Skip to content

Commit

Permalink
[SYMMETRICDS-88] - Enable memory and file buffer for all synchronizat…
Browse files Browse the repository at this point in the history
…ions. The choice of memory vs. file system is dependent on the threshold setting.
  • Loading branch information
chenson42 committed May 17, 2009
1 parent 7364f72 commit 5880d1d
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 79 deletions.
Expand Up @@ -59,6 +59,7 @@ public class ParameterConstants {
public final static String AUTO_UPDATE_NODE_VALUES = "auto.update.node.values.from.properties";

public final static String STREAM_TO_FILE_ENABLED = "stream.to.file.enabled";
public final static String STREAM_TO_FILE_THRESHOLD = "stream.to.file.threshold.bytes";

public final static String PARAMETER_REFRESH_PERIOD_IN_MS = "parameter.reload.timeout.ms";

Expand Down
@@ -0,0 +1,115 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.io;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;

import org.jumpmind.symmetric.util.AppUtils;

/**
* Write to an internal buffer up until the threshold. When the threshold is
* reached, flush the buffer to the file and write to the file from that point
* forward.
*/
public class ThresholdFileWriter extends Writer {

File file;

String tempFileCategory;

BufferedWriter fileWriter;

StringBuilder buffer;

long threshhold;

/**
* @param threshold The number of bytes at which to start writing to a file
* @param file The file to write to after the threshold has been reached
*/
public ThresholdFileWriter(long threshold, File file) {
this.file = file;
this.buffer = new StringBuilder();
this.threshhold = threshold;
}

/**
* @param threshold The number of bytes at which to start writing to a file
* @param tempFileCategory uses {@link AppUtils#createTempFile(String)} with this argument as the parameter
* @see AppUtils#createTempFile(String)
*/
public ThresholdFileWriter(long threshold, String tempFileCategory) {
this.tempFileCategory = tempFileCategory;
this.buffer = new StringBuilder();
this.threshhold = threshold;
}

@Override
public void close() throws IOException {
if (fileWriter != null) {
fileWriter.close();
}
}

@Override
public void flush() throws IOException {
if (fileWriter != null) {
fileWriter.flush();
}
}

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
if (fileWriter != null) {
} else if (len + buffer.length() > threshhold) {
if (file == null) {
file = AppUtils.createTempFile(tempFileCategory == null ? "threshold.file.writer" : tempFileCategory);
}
fileWriter = new BufferedWriter(new FileWriter(file));
fileWriter.write(buffer.toString());
fileWriter.write(cbuf, off, len);
fileWriter.flush();
} else {
buffer.append(new String(cbuf), off, len);
}
}

public Reader getReader() throws IOException {
if (fileWriter != null) {
return new FileReader(file);
} else {
return new StringReader(buffer.toString());
}
}

public void delete() {
if (file != null && file.exists()) {
file.delete();
}
}

}
Expand Up @@ -27,8 +27,6 @@ public class DataLoaderStatistics implements IDataLoaderStatistics {

private Date startTime;

private long networkMillis;

private long filterMillis;

private long databaseMillis;
Expand Down Expand Up @@ -71,10 +69,6 @@ public long incrementStatementCount() {
return ++statementCount;
}

public void incrementNetworkMillis(long millis) {
networkMillis += millis;
}

public void incrementFilterMillis(long millis) {
filterMillis += millis;
}
Expand Down Expand Up @@ -159,14 +153,6 @@ public void setFilterMillis(long filterMillis) {
this.filterMillis = filterMillis;
}

public long getNetworkMillis() {
return networkMillis;
}

public void setNetworkMillis(long networkMillis) {
this.networkMillis = networkMillis;
}

public long getByteCount() {
return byteCount;
}
Expand Down
Expand Up @@ -27,8 +27,6 @@ public interface IDataLoaderStatistics {

public long getByteCount();

public long getNetworkMillis();

public long getFilterMillis();

public long getDatabaseMillis();
Expand Down
Expand Up @@ -37,6 +37,8 @@ public class IncomingBatchHistory implements Serializable {
public enum Status {
OK, ER, SK;
}

private IncomingBatch batch;

private long batchId;

Expand Down Expand Up @@ -82,7 +84,8 @@ public IncomingBatchHistory() {
this.hostName = thisHostName;
}

public IncomingBatchHistory(IDataLoaderContext context) {
public IncomingBatchHistory(IncomingBatch batch, IDataLoaderContext context) {
this.batch = batch;
batchId = context.getBatchId();
nodeId = context.getNodeId();
status = Status.OK;
Expand All @@ -92,7 +95,6 @@ public IncomingBatchHistory(IDataLoaderContext context) {

public void setValues(IDataLoaderStatistics statistics, boolean isSuccess) {
byteCount = statistics.getByteCount();
networkMillis = statistics.getNetworkMillis();
filterMillis = statistics.getFilterMillis();
databaseMillis = statistics.getDatabaseMillis();
statementCount = statistics.getStatementCount();
Expand Down Expand Up @@ -254,4 +256,7 @@ public void setSqlState(String sqlState) {
this.sqlState = sqlState;
}

public IncomingBatch getBatch() {
return batch;
}
}
Expand Up @@ -22,9 +22,9 @@
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -288,8 +288,8 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws Exc
if (batches != null && batches.size() > 0) {
FileOutgoingTransport fileTransport = null;

if (shouldStreamToFile(batches)) {
fileTransport = new FileOutgoingTransport();
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
fileTransport = new FileOutgoingTransport(parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), "extract");
}

ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor,
Expand All @@ -305,24 +305,16 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws Exc
}
}

protected boolean shouldStreamToFile(List<OutgoingBatch> batches) {
boolean stream2file = false;
for (OutgoingBatch outgoingBatch : batches) {
stream2file |= Constants.CHANNEL_RELOAD.equals(outgoingBatch.getChannelId());
}
return stream2file && parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
}

protected void copy(FileOutgoingTransport fileTransport, IOutgoingTransport targetTransport) throws IOException {
if (fileTransport != null) {
fileTransport.close();
FileReader reader = null;
Reader reader = null;
try {
reader = new FileReader(fileTransport.getFile());
reader = fileTransport.getReader();
IOUtils.copy(reader, targetTransport.open());
} finally {
IOUtils.closeQuietly(reader);
fileTransport.getFile().delete();
fileTransport.delete();
}
}
}
Expand All @@ -337,13 +329,15 @@ protected void extract(Node node, List<OutgoingBatch> batches, final IExtractLis
boolean initialized = false;
for (final OutgoingBatch batch : batches) {
history = new OutgoingBatchHistory(batch);
long ts = System.currentTimeMillis();
if (!initialized) {
handler.init();
initialized = true;
}
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);
history.setDatabaseMillis(System.currentTimeMillis()-ts);
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.insertOutgoingBatchHistory(history);
Expand Down
Expand Up @@ -22,8 +22,6 @@
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -44,6 +42,7 @@
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.io.ThresholdFileWriter;
import org.jumpmind.symmetric.load.IBatchListener;
import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoader;
Expand All @@ -67,7 +66,6 @@
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.file.FileIncomingTransport;
import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
import org.jumpmind.symmetric.util.AppUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
Expand Down Expand Up @@ -172,7 +170,7 @@ public IDataLoaderStatistics loadDataBatch(String batchData) throws IOException
try {
while (dataLoader.hasNext()) {
dataLoader.load();
IncomingBatchHistory history = new IncomingBatchHistory(dataLoader.getContext());
IncomingBatchHistory history = new IncomingBatchHistory(new IncomingBatch(dataLoader.getContext()), dataLoader.getContext());
history.setValues(dataLoader.getStatistics(), true);
fireBatchComplete(dataLoader, history);
}
Expand All @@ -197,17 +195,30 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
IncomingBatchHistory history = null;
IDataLoader dataLoader = null;
try {
if (shouldStreamToFile(transport)) {
long totalNetworkMillis = System.currentTimeMillis();
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
transport = writeToFile(transport);
}
totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
}
dataLoader = openDataLoader(transport.open());
while (dataLoader.hasNext()) {
status = new IncomingBatch(dataLoader.getContext());
history = new IncomingBatchHistory(dataLoader.getContext());
history = new IncomingBatchHistory(status, dataLoader.getContext());
list.add(history);
loadBatch(dataLoader, status, history);
status = null;
}

if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
estimateNetworkMillis(list, totalNetworkMillis);
}

for (IncomingBatchHistory incomingBatchHistory : list) {
if (incomingBatchHistory.getBatch().isPersistable()) {
incomingBatchService.insertIncomingBatchHistory(incomingBatchHistory);
}
}

} catch (RegistrationRequiredException ex) {
throw ex;
} catch (ConnectException ex) {
Expand Down Expand Up @@ -262,28 +273,30 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
}
return list;
}

/**
* Right now we will only write to a file first if the feature is enabled and we are in initial load
* state.
* TODO - Make sure statistics are captured correctly and stream to a file if the payload is > than
* a certain # of bytes.
*/
protected boolean shouldStreamToFile(IIncomingTransport transport) {
return parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED) && !nodeService.isDataLoadCompleted();
}

protected IIncomingTransport writeToFile(IIncomingTransport transport) throws IOException {
File file = AppUtils.createTempFile("load");
FileWriter writer = null;
protected void estimateNetworkMillis(List<IncomingBatchHistory> list, long totalNetworkMillis) {
long totalNumberOfBytes = 0;
for (IncomingBatchHistory incomingBatchHistory : list) {
totalNumberOfBytes += incomingBatchHistory.getByteCount();
}
for (IncomingBatchHistory incomingBatchHistory : list) {
if (totalNumberOfBytes > 0) {
double ratio = (double)incomingBatchHistory.getByteCount()/(double)totalNumberOfBytes;
incomingBatchHistory.setNetworkMillis((long)(totalNetworkMillis*ratio));
}
}
}

protected IIncomingTransport writeToFile(IIncomingTransport transport) throws IOException {
ThresholdFileWriter writer = null;
try {
writer = new FileWriter(file);
writer = new ThresholdFileWriter(parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), "load");
IOUtils.copy(transport.open(), writer);
} finally {
IOUtils.closeQuietly(writer);
transport.close();
}
return new FileIncomingTransport(file);
return new FileIncomingTransport(writer);
}

private void recordStatistics(List<IncomingBatchHistory> list) {
Expand Down Expand Up @@ -327,9 +340,6 @@ public void doInTransactionWithoutResult(TransactionStatus transactionstatus) {
}
history.setValues(dataLoader.getStatistics(), true);
fireBatchComplete(dataLoader, history);
if (status.isPersistable()) {
incomingBatchService.insertIncomingBatchHistory(history);
}
} catch (IOException e) {
throw new TransportException(e);
} finally {
Expand Down

0 comments on commit 5880d1d

Please sign in to comment.