diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 4003b3bc19..24131eefd9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -530,7 +530,7 @@ protected List extract(final ProcessInfo processInfo, final Node Node sourceNode = nodeService.findIdentity(); final FutureExtractStatus status = new FutureExtractStatus(); ExecutorService executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory()); - List> futures = new ArrayList>(); + List> futures = new ArrayList>(); for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); @@ -570,13 +570,13 @@ protected List extract(final ProcessInfo processInfo, final Node } else { processInfo.setStatus(ProcessInfo.Status.EXTRACTING); final OutgoingBatch extractBatch = currentBatch; - Callable callable = new Callable() { - public OutgoingBatch call() throws Exception { - OutgoingBatch outgoingBatch = extractBatch; + Callable callable = new Callable() { + public FutureOutgoingBatch call() throws Exception { + FutureOutgoingBatch outgoingBatch = new FutureOutgoingBatch(extractBatch, false); if (!status.shouldExtractSkip) { try { - outgoingBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter, outgoingBatch, - streamToFileEnabled, true, mode); + outgoingBatch = new FutureOutgoingBatch(extractOutgoingBatch(processInfo, targetNode, + dataWriter, extractBatch, streamToFileEnabled, true, mode), isRetry(extractBatch, targetNode)); status.batchExtractCount++; status.byteExtractCount += extractBatch.getByteCount(); @@ -599,16 +599,18 @@ public OutgoingBatch call() throws Exception { } Iterator activeBatchIter = activeBatches.iterator(); - for (Future future : futures) { + for (Future future : futures) { currentBatch = activeBatchIter.next(); boolean isSent = false; while (!isSent) { try { - currentBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS); + FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS); + currentBatch = extractBatch.getOutgoingBatch(); if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); - currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, dataWriter, writer, mode); + currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, extractBatch.isRetry(), + dataWriter, writer, mode); } processedBatches.add(currentBatch); @@ -894,7 +896,7 @@ protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) { } protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, - OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { + OutgoingBatch currentBatch, boolean isRetry, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) { currentBatch.setSentCount(currentBatch.getSentCount() + 1); if (currentBatch.getStatus() != Status.RS) { @@ -909,8 +911,7 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo Channel channel = configurationService.getChannel(currentBatch.getChannelId()); DataContext ctx = new DataContext(); SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(), - currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx, - channel.getMaxKBytesPerSecond()); + currentBatch.getNodeId(), isRetry, extractedBatch, writer, ctx, channel.getMaxKBytesPerSecond()); dataReader.process(); } else { IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT, @@ -1901,4 +1902,22 @@ class FutureExtractStatus { int byteExtractCount; } + class FutureOutgoingBatch { + OutgoingBatch outgoingBatch; + boolean isRetry; + + public FutureOutgoingBatch(OutgoingBatch outgoingBatch, boolean isRetry) { + this.outgoingBatch = outgoingBatch; + this.isRetry = isRetry; + } + + public OutgoingBatch getOutgoingBatch() { + return outgoingBatch; + } + + public boolean isRetry() { + return isRetry; + } + } + }