diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 7bb52f34d8..94acb4c298 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -607,6 +607,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics, ExtractMode mode) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) { + Node sourceNode = nodeService.findIdentity(); TransformWriter transformExtractWriter = null; @@ -649,7 +650,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe } finally { transformExtractWriter.close(); } - } else if (!isPreviouslyExtracted(currentBatch)) { + } else if (!isPreviouslyExtracted(currentBatch)) { int maxPermits = parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS); String semaphoreKey = useStagingDataWriter ? Long.toString(currentBatch .getBatchId()) : currentBatch.getNodeBatchId(); @@ -670,6 +671,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe synchronized (lock) { if (!isPreviouslyExtracted(currentBatch)) { + currentBatch.resetStats(); currentBatch.setExtractCount(currentBatch.getExtractCount() + 1); if (updateBatchStatistics) { changeBatchStatus(Status.QY, currentBatch, mode);