From 55465cecbe77f8dc9d17030bf54ad698216284b3 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Tue, 12 May 2020 10:59:02 -0400 Subject: [PATCH] 0004394: Extract statistics can be overwritten and inaccurrate --- .../service/impl/DataExtractorService.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 8ab3ad5a3e..a2b2ad75a5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -42,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -78,7 +79,6 @@ import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; -import org.jumpmind.db.sql.SqlException; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.AbstractSymmetricEngine; import org.jumpmind.symmetric.ISymmetricEngine; @@ -993,8 +993,11 @@ final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBa */ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch) { if (System.currentTimeMillis() - ts > MS_PASSED_BEFORE_BATCH_REQUERIED) { - currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(), + OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId()); + if (batch != null && !batch.getStatus().equals(currentBatch.getStatus())) { + currentBatch.setStatus(batch.getStatus()); + } } return currentBatch; } @@ -1070,7 +1073,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe } } extractTimeInMs = System.currentTimeMillis() - ts; - Statistics stats = getExtractStats(writer); + Statistics stats = getExtractStats(writer, currentBatch); if (stats != null) { transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS); currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT)); @@ -1264,7 +1267,7 @@ protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetN new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo, containsBigLob)); } - protected Statistics getExtractStats(IDataWriter writer) { + protected Statistics getExtractStats(IDataWriter writer, OutgoingBatch currentBatch) { Map statisticsMap = null; if (writer instanceof TransformWriter) { statisticsMap = ((TransformWriter) writer).getNestedWriter().getStatistics(); @@ -1272,10 +1275,13 @@ protected Statistics getExtractStats(IDataWriter writer) { statisticsMap = writer.getStatistics(); } if (statisticsMap.size() > 0) { - return statisticsMap.values().iterator().next(); - } else { - return null; + for (Entry entry : statisticsMap.entrySet()) { + if (entry.getKey().getBatchId() == currentBatch.getBatchId()) { + return entry.getValue(); + } + } } + return null; } protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter,