diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 14b3e133a3..5ffed1d439 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -35,7 +35,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.jumpmind.db.io.DatabaseXmlUtil; @@ -509,10 +516,9 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ return extracted; } - protected List extract(ProcessInfo processInfo, Node targetNode, - List activeBatches, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { - boolean streamToFileEnabled = parameterService - .is(ParameterConstants.STREAM_TO_FILE_ENABLED); + protected List extract(final ProcessInfo processInfo, final Node targetNode, + List activeBatches, final IDataWriter dataWriter, final BufferedWriter writer, final ExtractMode mode) { + boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED); List processedBatches = new ArrayList(activeBatches.size()); if (activeBatches.size() > 0) { Set channelsProcessed = new HashSet(); @@ -522,8 +528,13 @@ protected List extract(ProcessInfo processInfo, Node targetNode, long bytesSentCount = 0; int batchesSentCount = 0; - long maxBytesToSync = parameterService - .getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); + long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); + ExecutorService executor = null; + List> futures = null; + if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) { + executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory()); + futures = new ArrayList>(); + } for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); @@ -565,33 +576,44 @@ protected List extract(ProcessInfo processInfo, Node targetNode, currentBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter, currentBatch, streamToFileEnabled, true, mode); } - + if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); - currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, - dataWriter, writer, mode); + final OutgoingBatch sendBatch = currentBatch; + Callable callable = new Callable() { + public OutgoingBatch call() throws Exception { + return sendOutgoingBatch(processInfo, targetNode, sendBatch, dataWriter, writer, mode); + } + }; + futures.add(executor.submit(callable)); + } else { + if (currentBatch.getStatus() != Status.OK) { + currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); + changeBatchStatus(Status.LD, currentBatch, mode); + } } processedBatches.add(currentBatch); - - if (currentBatch.getStatus() != Status.OK) { - currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); - changeBatchStatus(Status.LD, currentBatch, mode); - - bytesSentCount += currentBatch.getByteCount(); - batchesSentCount++; - - if (bytesSentCount >= maxBytesToSync && processedBatches.size() < activeBatches.size()) { - log.info( - "Reached the total byte threshold after {} of {} batches were extracted for node '{}'. The remaining batches will be extracted on a subsequent sync", - new Object[] { batchesSentCount, activeBatches.size(), - targetNode.getNodeId() }); - break; - } - } + bytesSentCount += currentBatch.getByteCount(); + batchesSentCount++; + + if (bytesSentCount >= maxBytesToSync && processedBatches.size() < activeBatches.size()) { + log.info( + "Reached the total byte threshold after {} of {} batches were extracted for node '{}'. The remaining batches will be extracted on a subsequent sync", + new Object[] { batchesSentCount, activeBatches.size(), + targetNode.getNodeId() }); + break; + } } - } catch (RuntimeException e) { + if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) { + executor.shutdown(); + executor.awaitTermination(12, TimeUnit.HOURS); + for (Future future : futures) { + currentBatch = future.get(); + } + } + } catch (Exception e) { SQLException se = unwrapSqlException(e); if (currentBatch != null) { /* Reread batch in case the ignore flag has been set */ @@ -1825,4 +1847,21 @@ public String getInitialLoadSelect() { } + class DataExtractorThreadFactory implements ThreadFactory { + AtomicInteger threadNumber = new AtomicInteger(1); + String namePrefix = parameterService.getEngineName().toLowerCase() + "-data-extractor-"; + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName(namePrefix + threadNumber.getAndIncrement()); + if (thread.isDaemon()) { + thread.setDaemon(false); + } + if (thread.getPriority() != Thread.NORM_PRIORITY) { + thread.setPriority(Thread.NORM_PRIORITY); + } + return thread; + } + } + }