diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index c1aaa12ab7..2272589eb9 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -59,6 +59,7 @@ public class ParameterConstants { public final static String AUTO_UPDATE_NODE_VALUES = "auto.update.node.values.from.properties"; public final static String STREAM_TO_FILE_ENABLED = "stream.to.file.enabled"; + public final static String STREAM_TO_FILE_THRESHOLD = "stream.to.file.threshold.bytes"; public final static String PARAMETER_REFRESH_PERIOD_IN_MS = "parameter.reload.timeout.ms"; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/io/ThresholdFileWriter.java b/symmetric/src/main/java/org/jumpmind/symmetric/io/ThresholdFileWriter.java new file mode 100644 index 0000000000..f01022fa66 --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/io/ThresholdFileWriter.java @@ -0,0 +1,115 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ +package org.jumpmind.symmetric.io; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.io.Writer; + +import org.jumpmind.symmetric.util.AppUtils; + +/** + * Write to an internal buffer up until the threshold. When the threshold is + * reached, flush the buffer to the file and write to the file from that point + * forward. + */ +public class ThresholdFileWriter extends Writer { + + File file; + + String tempFileCategory; + + BufferedWriter fileWriter; + + StringBuilder buffer; + + long threshhold; + + /** + * @param threshold The number of bytes at which to start writing to a file + * @param file The file to write to after the threshold has been reached + */ + public ThresholdFileWriter(long threshold, File file) { + this.file = file; + this.buffer = new StringBuilder(); + this.threshhold = threshold; + } + + /** + * @param threshold The number of bytes at which to start writing to a file + * @param tempFileCategory uses {@link AppUtils#createTempFile(String)} with this argument as the parameter + * @see AppUtils#createTempFile(String) + */ + public ThresholdFileWriter(long threshold, String tempFileCategory) { + this.tempFileCategory = tempFileCategory; + this.buffer = new StringBuilder(); + this.threshhold = threshold; + } + + @Override + public void close() throws IOException { + if (fileWriter != null) { + fileWriter.close(); + } + } + + @Override + public void flush() throws IOException { + if (fileWriter != null) { + fileWriter.flush(); + } + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + if (fileWriter != null) { + } else if (len + buffer.length() > threshhold) { + if (file == null) { + file = AppUtils.createTempFile(tempFileCategory == null ? "threshold.file.writer" : tempFileCategory); + } + fileWriter = new BufferedWriter(new FileWriter(file)); + fileWriter.write(buffer.toString()); + fileWriter.write(cbuf, off, len); + fileWriter.flush(); + } else { + buffer.append(new String(cbuf), off, len); + } + } + + public Reader getReader() throws IOException { + if (fileWriter != null) { + return new FileReader(file); + } else { + return new StringReader(buffer.toString()); + } + } + + public void delete() { + if (file != null && file.exists()) { + file.delete(); + } + } + +} diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/DataLoaderStatistics.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/DataLoaderStatistics.java index 0fcb54ca3b..9f181b6e31 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/DataLoaderStatistics.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/DataLoaderStatistics.java @@ -27,8 +27,6 @@ public class DataLoaderStatistics implements IDataLoaderStatistics { private Date startTime; - private long networkMillis; - private long filterMillis; private long databaseMillis; @@ -71,10 +69,6 @@ public long incrementStatementCount() { return ++statementCount; } - public void incrementNetworkMillis(long millis) { - networkMillis += millis; - } - public void incrementFilterMillis(long millis) { filterMillis += millis; } @@ -159,14 +153,6 @@ public void setFilterMillis(long filterMillis) { this.filterMillis = filterMillis; } - public long getNetworkMillis() { - return networkMillis; - } - - public void setNetworkMillis(long networkMillis) { - this.networkMillis = networkMillis; - } - public long getByteCount() { return byteCount; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoaderStatistics.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoaderStatistics.java index fd7795340b..cd1bd06f15 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoaderStatistics.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/IDataLoaderStatistics.java @@ -27,8 +27,6 @@ public interface IDataLoaderStatistics { public long getByteCount(); - public long getNetworkMillis(); - public long getFilterMillis(); public long getDatabaseMillis(); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatchHistory.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatchHistory.java index 13cc4ec456..99c591dda3 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatchHistory.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/IncomingBatchHistory.java @@ -37,6 +37,8 @@ public class IncomingBatchHistory implements Serializable { public enum Status { OK, ER, SK; } + + private IncomingBatch batch; private long batchId; @@ -82,7 +84,8 @@ public IncomingBatchHistory() { this.hostName = thisHostName; } - public IncomingBatchHistory(IDataLoaderContext context) { + public IncomingBatchHistory(IncomingBatch batch, IDataLoaderContext context) { + this.batch = batch; batchId = context.getBatchId(); nodeId = context.getNodeId(); status = Status.OK; @@ -92,7 +95,6 @@ public IncomingBatchHistory(IDataLoaderContext context) { public void setValues(IDataLoaderStatistics statistics, boolean isSuccess) { byteCount = statistics.getByteCount(); - networkMillis = statistics.getNetworkMillis(); filterMillis = statistics.getFilterMillis(); databaseMillis = statistics.getDatabaseMillis(); statementCount = statistics.getStatementCount(); @@ -254,4 +256,7 @@ public void setSqlState(String sqlState) { this.sqlState = sqlState; } + public IncomingBatch getBatch() { + return batch; + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index fe65cd8d52..e519f82d46 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -22,9 +22,9 @@ package org.jumpmind.symmetric.service.impl; import java.io.BufferedWriter; -import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; +import java.io.Reader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -288,8 +288,8 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws Exc if (batches != null && batches.size() > 0) { FileOutgoingTransport fileTransport = null; - if (shouldStreamToFile(batches)) { - fileTransport = new FileOutgoingTransport(); + if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { + fileTransport = new FileOutgoingTransport(parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), "extract"); } ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, @@ -305,24 +305,16 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws Exc } } - protected boolean shouldStreamToFile(List batches) { - boolean stream2file = false; - for (OutgoingBatch outgoingBatch : batches) { - stream2file |= Constants.CHANNEL_RELOAD.equals(outgoingBatch.getChannelId()); - } - return stream2file && parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); - } - protected void copy(FileOutgoingTransport fileTransport, IOutgoingTransport targetTransport) throws IOException { if (fileTransport != null) { fileTransport.close(); - FileReader reader = null; + Reader reader = null; try { - reader = new FileReader(fileTransport.getFile()); + reader = fileTransport.getReader(); IOUtils.copy(reader, targetTransport.open()); } finally { IOUtils.closeQuietly(reader); - fileTransport.getFile().delete(); + fileTransport.delete(); } } } @@ -337,6 +329,7 @@ protected void extract(Node node, List batches, final IExtractLis boolean initialized = false; for (final OutgoingBatch batch : batches) { history = new OutgoingBatchHistory(batch); + long ts = System.currentTimeMillis(); if (!initialized) { handler.init(); initialized = true; @@ -344,6 +337,7 @@ protected void extract(Node node, List batches, final IExtractLis handler.startBatch(batch); selectEventDataToExtract(handler, batch); handler.endBatch(batch); + history.setDatabaseMillis(System.currentTimeMillis()-ts); history.setStatus(OutgoingBatchHistory.Status.SE); history.setEndTime(new Date()); outgoingBatchService.insertOutgoingBatchHistory(history); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 2c9d61007d..f053279436 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -22,8 +22,6 @@ package org.jumpmind.symmetric.service.impl; import java.io.BufferedReader; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -44,6 +42,7 @@ import org.jumpmind.symmetric.common.ErrorConstants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.io.ThresholdFileWriter; import org.jumpmind.symmetric.load.IBatchListener; import org.jumpmind.symmetric.load.IColumnFilter; import org.jumpmind.symmetric.load.IDataLoader; @@ -67,7 +66,6 @@ import org.jumpmind.symmetric.transport.TransportException; import org.jumpmind.symmetric.transport.file.FileIncomingTransport; import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport; -import org.jumpmind.symmetric.util.AppUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -172,7 +170,7 @@ public IDataLoaderStatistics loadDataBatch(String batchData) throws IOException try { while (dataLoader.hasNext()) { dataLoader.load(); - IncomingBatchHistory history = new IncomingBatchHistory(dataLoader.getContext()); + IncomingBatchHistory history = new IncomingBatchHistory(new IncomingBatch(dataLoader.getContext()), dataLoader.getContext()); history.setValues(dataLoader.getStatistics(), true); fireBatchComplete(dataLoader, history); } @@ -197,17 +195,30 @@ protected List loadDataAndReturnBatches(IIncomingTransport IncomingBatchHistory history = null; IDataLoader dataLoader = null; try { - if (shouldStreamToFile(transport)) { + long totalNetworkMillis = System.currentTimeMillis(); + if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { transport = writeToFile(transport); - } + totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis; + } dataLoader = openDataLoader(transport.open()); while (dataLoader.hasNext()) { status = new IncomingBatch(dataLoader.getContext()); - history = new IncomingBatchHistory(dataLoader.getContext()); + history = new IncomingBatchHistory(status, dataLoader.getContext()); list.add(history); loadBatch(dataLoader, status, history); status = null; } + + if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { + estimateNetworkMillis(list, totalNetworkMillis); + } + + for (IncomingBatchHistory incomingBatchHistory : list) { + if (incomingBatchHistory.getBatch().isPersistable()) { + incomingBatchService.insertIncomingBatchHistory(incomingBatchHistory); + } + } + } catch (RegistrationRequiredException ex) { throw ex; } catch (ConnectException ex) { @@ -262,28 +273,30 @@ protected List loadDataAndReturnBatches(IIncomingTransport } return list; } - - /** - * Right now we will only write to a file first if the feature is enabled and we are in initial load - * state. - * TODO - Make sure statistics are captured correctly and stream to a file if the payload is > than - * a certain # of bytes. - */ - protected boolean shouldStreamToFile(IIncomingTransport transport) { - return parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED) && !nodeService.isDataLoadCompleted(); - } - protected IIncomingTransport writeToFile(IIncomingTransport transport) throws IOException { - File file = AppUtils.createTempFile("load"); - FileWriter writer = null; + protected void estimateNetworkMillis(List list, long totalNetworkMillis) { + long totalNumberOfBytes = 0; + for (IncomingBatchHistory incomingBatchHistory : list) { + totalNumberOfBytes += incomingBatchHistory.getByteCount(); + } + for (IncomingBatchHistory incomingBatchHistory : list) { + if (totalNumberOfBytes > 0) { + double ratio = (double)incomingBatchHistory.getByteCount()/(double)totalNumberOfBytes; + incomingBatchHistory.setNetworkMillis((long)(totalNetworkMillis*ratio)); + } + } + } + + protected IIncomingTransport writeToFile(IIncomingTransport transport) throws IOException { + ThresholdFileWriter writer = null; try { - writer = new FileWriter(file); + writer = new ThresholdFileWriter(parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), "load"); IOUtils.copy(transport.open(), writer); } finally { IOUtils.closeQuietly(writer); transport.close(); } - return new FileIncomingTransport(file); + return new FileIncomingTransport(writer); } private void recordStatistics(List list) { @@ -327,9 +340,6 @@ public void doInTransactionWithoutResult(TransactionStatus transactionstatus) { } history.setValues(dataLoader.getStatistics(), true); fireBatchComplete(dataLoader, history); - if (status.isPersistable()) { - incomingBatchService.insertIncomingBatchHistory(history); - } } catch (IOException e) { throw new TransportException(e); } finally { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileIncomingTransport.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileIncomingTransport.java index faafc44376..a725c0ddd0 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileIncomingTransport.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileIncomingTransport.java @@ -1,10 +1,28 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ package org.jumpmind.symmetric.transport.file; import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; import java.io.IOException; +import org.jumpmind.symmetric.io.ThresholdFileWriter; import org.jumpmind.symmetric.transport.IIncomingTransport; /** @@ -12,20 +30,20 @@ */ public class FileIncomingTransport implements IIncomingTransport { - File file; + ThresholdFileWriter fileWriter; BufferedReader reader; - public FileIncomingTransport(File file) { - this.file = file; + public FileIncomingTransport(ThresholdFileWriter fileWriter) { + this.fileWriter = fileWriter; } public void close() throws IOException { if (reader != null) { reader.close(); } - if (file != null) { - file.delete(); + if (fileWriter != null) { + fileWriter.delete(); } } @@ -34,7 +52,7 @@ public boolean isOpen() { } public BufferedReader open() throws IOException { - reader = new BufferedReader(new FileReader(file)); + reader = new BufferedReader(fileWriter.getReader()); return reader; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java index 0822f6c745..c534cfc8f6 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java @@ -1,32 +1,46 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ package org.jumpmind.symmetric.transport.file; import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; +import java.io.Reader; +import org.jumpmind.symmetric.io.ThresholdFileWriter; import org.jumpmind.symmetric.transport.IOutgoingTransport; -import org.jumpmind.symmetric.util.AppUtils; /** * An outgoing transport that writes to the file system */ public class FileOutgoingTransport implements IOutgoingTransport { - File file; - BufferedWriter out; - public FileOutgoingTransport(File file) throws IOException { - this.file = file; - } + ThresholdFileWriter fileWriter; - public FileOutgoingTransport() throws IOException { - this.file = AppUtils.createTempFile("extract"); + public FileOutgoingTransport(long threshold, String tempFileCategory) throws IOException { + this.fileWriter = new ThresholdFileWriter(threshold, tempFileCategory); } public BufferedWriter open() throws IOException { - out = new BufferedWriter(new FileWriter(file)); + out = new BufferedWriter(fileWriter); return out; } @@ -38,10 +52,15 @@ public void close() throws IOException { out.close(); out = null; } - - public File getFile() { - return file; + + public Reader getReader() throws IOException { + return this.fileWriter.getReader(); } + public void delete() { + if (fileWriter != null) { + fileWriter.delete(); + } + } } diff --git a/symmetric/src/main/resources/symmetric-default.properties b/symmetric/src/main/resources/symmetric-default.properties index 8dff45c3b0..095e1a9601 100644 --- a/symmetric/src/main/resources/symmetric-default.properties +++ b/symmetric/src/main/resources/symmetric-default.properties @@ -127,6 +127,14 @@ initial.load.delete.first=false # This property can be overridden in the database stream.to.file.enabled=true +# If stream.to.file.enabled is true, then the threshold number of bytes at which a file +# will be written is controlled by this property. Note that for a synchronization the +# entire payload of the synchronization will be buffered in memory up to this number (at +# which point it will be written and continue to stream to disk) +# +# This property can be overridden in the database +stream.to.file.threshold.bytes=20480 + # Set this if tables should be created prior to an initial load # # This property can be overridden in the database diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/io/ThresholdFileWriterUnitTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/io/ThresholdFileWriterUnitTest.java new file mode 100644 index 0000000000..205d1bc7b3 --- /dev/null +++ b/symmetric/src/test/java/org/jumpmind/symmetric/io/ThresholdFileWriterUnitTest.java @@ -0,0 +1,39 @@ +package org.jumpmind.symmetric.io; + +import java.io.File; + +import org.apache.commons.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +public class ThresholdFileWriterUnitTest { + + final String TEST_STR = "The quick brown fox jumped over the lazy dog"; + + @Test + public void testNoWriteToFile() throws Exception { + File file = getTestFile(); + ThresholdFileWriter writer = new ThresholdFileWriter(TEST_STR.length() + 1, file); + writer.write(TEST_STR); + Assert.assertFalse(file.exists()); + Assert.assertEquals(TEST_STR, IOUtils.toString(writer.getReader())); + file.delete(); + } + + @Test + public void testWriteToFile() throws Exception { + File file = getTestFile(); + ThresholdFileWriter writer = new ThresholdFileWriter(TEST_STR.length() - 1, file); + writer.write(TEST_STR); + Assert.assertTrue(file.exists()); + Assert.assertEquals(TEST_STR, IOUtils.toString(writer.getReader())); + file.delete(); + } + + private File getTestFile() { + File file = new File("target/test/buffered.file.writer.tst"); + file.getParentFile().mkdirs(); + file.delete(); + return file; + } +}