Skip to content

Commit

Permalink
0002574: Improve performance of the transfer to and from staging
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Apr 28, 2016
1 parent 0a03d49 commit 9644ef8
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 25 deletions.
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.Writer;
import java.sql.SQLException;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.reader.SimpleStagingDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
Expand Down Expand Up @@ -413,7 +415,7 @@ public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo,
useDelimiterIdentifiers, symmetricDialect.getBinaryEncoding(),
useJdbcTimestampFormat, useUpsertStatements);
List<OutgoingBatch> extractedBatches = extract(processInfo, targetNode,
activeBatches, writer, ExtractMode.FOR_PAYLOAD_CLIENT);
activeBatches, writer, null, ExtractMode.FOR_PAYLOAD_CLIENT);

List<OutgoingBatchWithPayload> batchesWithPayload = new ArrayList<OutgoingBatchWithPayload>();
for (OutgoingBatch batch : extractedBatches) {
Expand Down Expand Up @@ -466,10 +468,11 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str
List<OutgoingBatch> activeBatches = filterBatchesForExtraction(batches, channelMap);

if (activeBatches.size() > 0) {
BufferedWriter writer = transport.openWriter();
IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
transport.openWriter(), targetNode.requires13Compatiblity());
writer, targetNode.requires13Compatiblity());

return extract(processInfo, targetNode, activeBatches, dataWriter,
return extract(processInfo, targetNode, activeBatches, dataWriter, writer,
ExtractMode.FOR_SYM_CLIENT);
}

Expand Down Expand Up @@ -498,7 +501,7 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
writer, targetNode.requires13Compatiblity());
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(1);
batches.add(batch);
batches = extract(new ProcessInfo(), targetNode, batches, dataWriter,
batches = extract(new ProcessInfo(), targetNode, batches, dataWriter, null,
ExtractMode.EXTRACT_ONLY);
extracted = batches.size() > 0;
}
Expand All @@ -507,7 +510,7 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
}

protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
List<OutgoingBatch> activeBatches, IDataWriter dataWriter, ExtractMode mode) {
List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
boolean streamToFileEnabled = parameterService
.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
List<OutgoingBatch> processedBatches = new ArrayList<OutgoingBatch>(activeBatches.size());
Expand Down Expand Up @@ -566,7 +569,7 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch,
dataWriter, mode);
dataWriter, writer, mode);
}

processedBatches.add(currentBatch);
Expand Down Expand Up @@ -842,7 +845,7 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
}

protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
OutgoingBatch currentBatch, IDataWriter dataWriter, ExtractMode mode) {
OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
currentBatch.setSentCount(currentBatch.getSentCount() + 1);
changeBatchStatus(Status.SE, currentBatch, mode);
Expand All @@ -851,25 +854,31 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo

IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
currentBatch.getNodeId(), extractedBatch);

DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, new ProcessInfoDataWriter(dataWriter, processInfo), "send from stage")
.process(ctx);
if (dataWriter.getStatistics().size() > 0) {
Statistics stats = dataWriter.getStatistics().values().iterator().next();
statisticManager.incrementDataSent(currentBatch.getChannelId(),
stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
long byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount);
if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT,
currentBatch.getBatchId(), currentBatch.getNodeId(), extractedBatch, writer, ctx);
dataReader.process();
} else {
log.warn("Could not find recorded statistics for batch {}",
currentBatch.getNodeBatchId());
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
currentBatch.getNodeId(), extractedBatch);

DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, new ProcessInfoDataWriter(dataWriter, processInfo), "send from stage")
.process(ctx);
if (dataWriter.getStatistics().size() > 0) {
Statistics stats = dataWriter.getStatistics().values().iterator().next();
statisticManager.incrementDataSent(currentBatch.getChannelId(),
stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
long byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
statisticManager.incrementDataBytesSent(currentBatch.getChannelId(), byteCount);
} else {
log.warn("Could not find recorded statistics for batch {}",
currentBatch.getNodeBatchId());
}
}

} else {
throw new IllegalStateException(String.format(
"Could not find the staged resource for batch %s",
Expand Down
@@ -0,0 +1,86 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.data.reader;

import java.io.BufferedReader;
import java.io.BufferedWriter;

import org.apache.commons.io.IOUtils;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streams an extracted batch file from the staging area straight to the
 * outgoing transport writer as raw characters, bypassing the CSV protocol
 * parse/re-serialize cycle that ProtocolDataReader/DataProcessor would do.
 * Used only for the FOR_SYM_CLIENT "send from stage" path where the staged
 * bytes are already in wire format.
 */
public class SimpleStagingDataReader {
    /** Chunk size (in chars) copied per read from staging to the transport. */
    final static int MAX_WRITE_LENGTH = 32768;

    protected final Logger log = LoggerFactory.getLogger(getClass());

    protected BatchType batchType;
    protected long batchId;
    protected String targetNodeId;
    protected IStagedResource stagedResource;
    protected BufferedWriter writer;
    protected BufferedReader reader;
    protected DataContext context;

    public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, IStagedResource stagedResource,
            BufferedWriter writer, DataContext context) {
        this.batchType = batchType;
        // BUG FIX: batchId was never assigned, so the progress log below
        // always reported "Batch '0'" regardless of the batch being sent.
        this.batchId = batchId;
        this.targetNodeId = targetNodeId;
        this.stagedResource = stagedResource;
        this.writer = writer;
        this.context = context;
    }

    /**
     * Copies the staged resource to the writer in MAX_WRITE_LENGTH chunks.
     * Aborts with an IoException if the thread is interrupted, and logs a
     * progress message once a minute for long-running transfers. The reader
     * is always closed; the writer is left open for the caller to flush/close.
     *
     * @throws RuntimeException wrapping any failure during the copy
     */
    public void process() {
        reader = stagedResource.getReader();
        char[] buffer = new char[MAX_WRITE_LENGTH];
        long totalCharsRead = 0;
        int numCharsRead = 0;
        long startTime = System.currentTimeMillis(), ts = startTime;

        try {
            while ((numCharsRead = reader.read(buffer)) != -1) {
                writer.write(buffer, 0, numCharsRead);
                totalCharsRead += numCharsRead;

                // Check the interrupt flag each chunk so a cancelled push/pull
                // stops promptly instead of streaming the whole batch.
                if (Thread.currentThread().isInterrupted()) {
                    throw new IoException("This thread was interrupted");
                }

                // Periodic progress for large batches (once per minute).
                if (System.currentTimeMillis() - ts > 60000) {
                    log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. The following stats have been gathered: {}",
                            new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000,
                                    "BYTES=" + totalCharsRead });
                    ts = System.currentTimeMillis();
                }
            }
        } catch (Throwable t) {
            // NOTE(review): this also re-wraps the IoException thrown above;
            // preserved as-is since callers may rely on RuntimeException here.
            throw new RuntimeException(t);
        } finally {
            IOUtils.closeQuietly(reader);
        }
    }

}
Expand Up @@ -40,7 +40,7 @@

public class SimpleStagingDataWriter {

final static int MAX_WRITE_LENGTH = 262144;
final static int MAX_WRITE_LENGTH = 32768;

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down

0 comments on commit 9644ef8

Please sign in to comment.