Skip to content

Commit

Permalink
0004394: Extract statistics can be overwritten and inaccurrate
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 12, 2020
1 parent 4bf2580 commit 55465ce
Showing 1 changed file with 13 additions and 7 deletions.
Expand Up @@ -42,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -78,7 +79,6 @@
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.AbstractSymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
Expand Down Expand Up @@ -993,8 +993,11 @@ final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBa
*/
final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch) {
if (System.currentTimeMillis() - ts > MS_PASSED_BEFORE_BATCH_REQUERIED) {
currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(),
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(),
currentBatch.getNodeId());
if (batch != null && !batch.getStatus().equals(currentBatch.getStatus())) {
currentBatch.setStatus(batch.getStatus());
}
}
return currentBatch;
}
Expand Down Expand Up @@ -1070,7 +1073,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
}
}
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = getExtractStats(writer);
Statistics stats = getExtractStats(writer, currentBatch);
if (stats != null) {
transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
Expand Down Expand Up @@ -1264,18 +1267,21 @@ protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetN
new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo, containsBigLob));
}

protected Statistics getExtractStats(IDataWriter writer) {
protected Statistics getExtractStats(IDataWriter writer, OutgoingBatch currentBatch) {
Map<Batch, Statistics> statisticsMap = null;
if (writer instanceof TransformWriter) {
statisticsMap = ((TransformWriter) writer).getNestedWriter().getStatistics();
} else {
statisticsMap = writer.getStatistics();
}
if (statisticsMap.size() > 0) {
return statisticsMap.values().iterator().next();
} else {
return null;
for (Entry<Batch, Statistics> entry : statisticsMap.entrySet()) {
if (entry.getKey().getBatchId() == currentBatch.getBatchId()) {
return entry.getValue();
}
}
}
return null;
}

protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter,
Expand Down

0 comments on commit 55465ce

Please sign in to comment.