diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index dc822dadef..a3baafa354 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -37,6 +37,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.jumpmind.db.model.Table; @@ -461,10 +467,15 @@ protected List loadDataFromTransport(final ProcessInfo processInf String targetNodeId = nodeService.findIdentityNodeId(); if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); + ExecutorService executor = Executors.newFixedThreadPool(1, new DataLoaderThreadFactory()); LoadIntoDatabaseOnArrivalListener loadListener = new LoadIntoDatabaseOnArrivalListener(processInfo, - sourceNode.getNodeId(), listener); + sourceNode.getNodeId(), listener, executor); new SimpleStagingDataWriter(transport.openReader(), stagingManager, Constants.STAGING_CATEGORY_INCOMING, memoryThresholdInBytes, BatchType.LOAD, targetNodeId, ctx, loadListener).process(); + + while (!loadListener.isDone()) { + Thread.sleep(1000); + } } else { DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD, targetNodeId, transport.openReader()), null, listener, "data load") { @@ -818,6 +829,23 @@ public IncomingError mapRow(Row rs) { } } + class DataLoaderThreadFactory implements ThreadFactory { + AtomicInteger threadNumber = new AtomicInteger(1); + String namePrefix = parameterService.getEngineName().toLowerCase() + "-data-loader-"; + + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName(namePrefix + threadNumber.getAndIncrement()); + if (thread.isDaemon()) { + thread.setDaemon(false); + } + if (thread.getPriority() != Thread.NORM_PRIORITY) { + thread.setPriority(Thread.NORM_PRIORITY); + } + return thread; + } + } + class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener { private ManageIncomingBatchListener listener; @@ -827,12 +855,19 @@ class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener { private String sourceNodeId; private ProcessInfo processInfo; + + private ExecutorService executor; + + private List> futures = new ArrayList>(); + + private boolean isError; public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceNodeId, - ManageIncomingBatchListener listener) { + ManageIncomingBatchListener listener, ExecutorService executor) { this.sourceNodeId = sourceNodeId; this.listener = listener; this.processInfo = processInfo; + this.executor = executor; } public void start(DataContext ctx, Batch batch) { @@ -840,31 +875,55 @@ public void start(DataContext ctx, Batch batch) { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); } - public void end(DataContext ctx, Batch batch, IStagedResource resource) { - - long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs; + public void end(final DataContext ctx, final Batch batch, final IStagedResource resource) { + final long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs; - try { - processInfo.setStatus(ProcessInfo.Status.LOADING); - DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD, - batch.getTargetNodeId(), resource), null, listener, "data load from stage") { - @Override - protected IDataWriter chooseDataWriter(Batch batch) { - return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), - batch.getBatchId()); - } - }; - - processor.process(ctx); - } finally { - if (listener.currentBatch != null) { - listener.currentBatch.setNetworkMillis(networkMillis); - if (batch.isIgnored()) { - listener.currentBatch.incrementIgnoreCount(); + Callable loadBatchFromStage = new Callable() { + public IncomingBatch call() throws Exception { + IncomingBatch incomingBatch = null; + if (!isError) { + try { + processInfo.setStatus(ProcessInfo.Status.LOADING); + DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD, + batch.getTargetNodeId(), resource), null, listener, "data load from stage") { + @Override + protected IDataWriter chooseDataWriter(Batch batch) { + return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(), + batch.getBatchId()); + } + }; + + processor.process(ctx); + } catch (Exception e) { + isError = true; + throw e; + } finally { + incomingBatch = listener.currentBatch; + if (incomingBatch != null) { + incomingBatch.setNetworkMillis(networkMillis); + if (batch.isIgnored()) { + incomingBatch.incrementIgnoreCount(); + } + } + resource.setState(State.DONE); + } } + return incomingBatch; + } + }; + futures.add(executor.submit(loadBatchFromStage)); + } + + public boolean isDone() throws Exception { + boolean isDone = true; + for (Future future : futures) { + if (future.isDone()) { + future.get(); + } else { + isDone = false; } - resource.setState(State.DONE); } + return isDone; } }