Skip to content

Commit

Permalink
0002917: Don't update the status of an outgoing batch to SE until it has
Browse files Browse the repository at this point in the history
been sending for outgoing.batches.update.status.millis
  • Loading branch information
chenson42 committed Nov 23, 2016
1 parent c7211f6 commit c8bf633
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 144 deletions.
Expand Up @@ -22,10 +22,12 @@

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
Expand Down Expand Up @@ -64,6 +66,7 @@
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.AbstractSymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
Expand All @@ -85,7 +88,6 @@
import org.jumpmind.symmetric.io.data.reader.ExtractDataReader;
import org.jumpmind.symmetric.io.data.reader.IExtractDataReaderSource;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.reader.SimpleStagingDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
Expand Down Expand Up @@ -988,9 +990,6 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
currentBatch.setSentCount(currentBatch.getSentCount() + 1);
if (currentBatch.getStatus() != Status.RS) {
changeBatchStatus(Status.SE, currentBatch, mode);
}

long ts = System.currentTimeMillis();

Expand Down Expand Up @@ -1024,9 +1023,8 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo

Channel channel = configurationService.getChannel(currentBatch.getChannelId());
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
currentBatch.getNodeId(), isRetry, extractedBatch, writer, ctx, channel.getMaxKBytesPerSecond());
dataReader.process();
transferFromStaging(mode, BatchType.EXTRACT, currentBatch, isRetry, extractedBatch, writer, ctx,
channel.getMaxKBytesPerSecond());
} else {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
currentBatch.getNodeId(), extractedBatch);
Expand Down Expand Up @@ -1060,6 +1058,90 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
return currentBatch;

}

protected void transferFromStaging(ExtractMode mode, BatchType batchType, OutgoingBatch batch, boolean isRetry, IStagedResource stagedResource,
BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) {
final int MAX_WRITE_LENGTH = 32768;
BufferedReader reader = stagedResource.getReader();
try {
// Retry means we've sent this batch before, so let's ask to
// retry the batch from the target's staging
if (isRetry) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.startsWith(CsvConstants.BATCH)) {
writer.write(CsvConstants.RETRY + "," + batch.getBatchId());
writer.newLine();
writer.write(CsvConstants.COMMIT + "," + batch.getBatchId());
writer.newLine();
break;
} else {
writer.write(line);
writer.newLine();
}
}
} else {
long totalCharsRead = 0, totalBytesRead = 0;
int numCharsRead = 0, numBytesRead = 0;
long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime;
boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0;
long totalThrottleTime = 0;
int bufferSize = MAX_WRITE_LENGTH;

if (isThrottled) {
bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue();
}
char[] buffer = new char[bufferSize];

while ((numCharsRead = reader.read(buffer)) != -1) {
writer.write(buffer, 0, numCharsRead);
totalCharsRead += numCharsRead;

if (Thread.currentThread().isInterrupted()) {
throw new IoException("This thread was interrupted");
}

long batchStatusUpdateMillis = parameterService.getLong(ParameterConstants.OUTGOING_BATCH_UPDATE_STATUS_MILLIS);
if (System.currentTimeMillis() - ts > batchStatusUpdateMillis && batch.getStatus() != Status.SE && batch.getStatus() != Status.RS) {
changeBatchStatus(Status.SE, batch, mode);
}
if (System.currentTimeMillis() - ts > 60000) {
log.info(
"Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. "
+ "The following stats have been gathered: {}",
new Object[] { batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime) / 1000,
"CHARS=" + totalCharsRead });
ts = System.currentTimeMillis();
}

if (isThrottled) {
numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length;
totalBytesRead += numBytesRead;
if (numBytesRead >= bufferSize) {
long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000);
long actualMillis = System.currentTimeMillis() - bts;
if (actualMillis < expectedMillis) {
totalThrottleTime += expectedMillis - actualMillis;
Thread.sleep(expectedMillis - actualMillis);
}
numBytesRead = 0;
bts = System.currentTimeMillis();
}
}
}
if (log.isDebugEnabled() && totalThrottleTime > 0) {
log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s",
batch.getBatchId(), batch.getNodeId(), (System.currentTimeMillis() - startTime), totalBytesRead,
totalThrottleTime, maxKBytesPerSec);
}
}
} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
stagedResource.close();
}
}


public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId,
long endBatchId) {
Expand Down Expand Up @@ -1953,4 +2035,5 @@ public boolean isRetry() {
}
}


}

This file was deleted.

Expand Up @@ -31,7 +31,6 @@
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.transport.ConcurrentConnectionManager;
import org.jumpmind.symmetric.transport.IConcurrentConnectionManager;
import org.jumpmind.symmetric.transport.IConcurrentConnectionManager.ReservationType;
import org.slf4j.Logger;
Expand Down

0 comments on commit c8bf633

Please sign in to comment.