0002577: Separate thread for transfer and loading
erilong committed Apr 27, 2016
1 parent c3b0c22 commit 761b081
Showing 1 changed file with 82 additions and 23 deletions.
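The change keeps the transfer thread streaming incoming batches into the staging area while a single-threaded executor loads each fully received batch into the database; the caller then polls the arrival listener until every queued load has finished (or a failure is re-thrown). Below is a minimal, self-contained sketch of that handoff, using hypothetical stand-in names (TransferAndLoadSketch, a String result in place of IncomingBatch) rather than the real SymmetricDS classes:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the transfer/load handoff: batches are "staged" by the
// main thread and loaded by a single worker thread, so transfer of later batches
// overlaps with loading of earlier ones while load order is preserved.
public class TransferAndLoadSketch {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        List<Future<String>> futures = new ArrayList<Future<String>>();

        // "Transfer" loop: as each batch finishes arriving, submit its load task.
        for (int batchId = 1; batchId <= 3; batchId++) {
            final int id = batchId;
            futures.add(executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    Thread.sleep(200); // stands in for loading a staged batch
                    return "batch-" + id + " loaded";
                }
            }));
        }

        // Wait for the worker to drain the queue, like the polling loop added here.
        while (!isDone(futures)) {
            Thread.sleep(100);
        }
        executor.shutdown();
        System.out.println("all " + futures.size() + " batches loaded");
    }

    // Mirrors the idea of LoadIntoDatabaseOnArrivalListener.isDone(): call get() on
    // finished futures so any exception thrown while loading reaches the caller.
    static boolean isDone(List<Future<String>> futures) throws Exception {
        boolean done = true;
        for (Future<String> future : futures) {
            if (future.isDone()) {
                future.get();
            } else {
                done = false;
            }
        }
        return done;
    }
}

A pool of exactly one thread keeps batches loading in arrival order while still letting the transfer of later batches overlap with the loading of earlier ones; the isError flag in the real listener additionally skips loading once a batch has failed.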
@@ -37,6 +37,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.jumpmind.db.model.Table;
@@ -461,10 +467,15 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInfo
         String targetNodeId = nodeService.findIdentityNodeId();
         if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
             processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
+            ExecutorService executor = Executors.newFixedThreadPool(1, new DataLoaderThreadFactory());
             LoadIntoDatabaseOnArrivalListener loadListener = new LoadIntoDatabaseOnArrivalListener(processInfo,
-                    sourceNode.getNodeId(), listener);
+                    sourceNode.getNodeId(), listener, executor);
             new SimpleStagingDataWriter(transport.openReader(), stagingManager, Constants.STAGING_CATEGORY_INCOMING,
                     memoryThresholdInBytes, BatchType.LOAD, targetNodeId, ctx, loadListener).process();
+
+            while (!loadListener.isDone()) {
+                Thread.sleep(1000);
+            }
         } else {
             DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
                     targetNodeId, transport.openReader()), null, listener, "data load") {
@@ -818,6 +829,23 @@ public IncomingError mapRow(Row rs) {
         }
     }
 
+    class DataLoaderThreadFactory implements ThreadFactory {
+        AtomicInteger threadNumber = new AtomicInteger(1);
+        String namePrefix = parameterService.getEngineName().toLowerCase() + "-data-loader-";
+
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable);
+            thread.setName(namePrefix + threadNumber.getAndIncrement());
+            if (thread.isDaemon()) {
+                thread.setDaemon(false);
+            }
+            if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                thread.setPriority(Thread.NORM_PRIORITY);
+            }
+            return thread;
+        }
+    }
+
     class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener {
 
         private ManageIncomingBatchListener listener;
@@ -827,44 +855,75 @@ class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener {
         private String sourceNodeId;
 
         private ProcessInfo processInfo;
 
+        private ExecutorService executor;
+
+        private List<Future<IncomingBatch>> futures = new ArrayList<Future<IncomingBatch>>();
+
+        private boolean isError;
+
         public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceNodeId,
-                ManageIncomingBatchListener listener) {
+                ManageIncomingBatchListener listener, ExecutorService executor) {
             this.sourceNodeId = sourceNodeId;
             this.listener = listener;
             this.processInfo = processInfo;
+            this.executor = executor;
         }
 
         public void start(DataContext ctx, Batch batch) {
             batchStartsToArriveTimeInMs = System.currentTimeMillis();
             processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
         }
 
-        public void end(DataContext ctx, Batch batch, IStagedResource resource) {
-
-            long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs;
+        public void end(final DataContext ctx, final Batch batch, final IStagedResource resource) {
+            final long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs;
 
-            try {
-                processInfo.setStatus(ProcessInfo.Status.LOADING);
-                DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
-                        batch.getTargetNodeId(), resource), null, listener, "data load from stage") {
-                    @Override
-                    protected IDataWriter chooseDataWriter(Batch batch) {
-                        return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(),
-                                batch.getBatchId());
-                    }
-                };
+            Callable<IncomingBatch> loadBatchFromStage = new Callable<IncomingBatch>() {
+                public IncomingBatch call() throws Exception {
+                    IncomingBatch incomingBatch = null;
+                    if (!isError) {
+                        try {
+                            processInfo.setStatus(ProcessInfo.Status.LOADING);
+                            DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
+                                    batch.getTargetNodeId(), resource), null, listener, "data load from stage") {
+                                @Override
+                                protected IDataWriter chooseDataWriter(Batch batch) {
+                                    return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(),
+                                            batch.getBatchId());
+                                }
+                            };
 
-                processor.process(ctx);
-            } finally {
-                if (listener.currentBatch != null) {
-                    listener.currentBatch.setNetworkMillis(networkMillis);
-                    if (batch.isIgnored()) {
-                        listener.currentBatch.incrementIgnoreCount();
+                            processor.process(ctx);
+                        } catch (Exception e) {
+                            isError = true;
+                            throw e;
+                        } finally {
+                            incomingBatch = listener.currentBatch;
+                            if (incomingBatch != null) {
+                                incomingBatch.setNetworkMillis(networkMillis);
+                                if (batch.isIgnored()) {
+                                    incomingBatch.incrementIgnoreCount();
+                                }
+                            }
+                            resource.setState(State.DONE);
+                        }
                     }
+                    return incomingBatch;
                 }
-                resource.setState(State.DONE);
-            }
+            };
+            futures.add(executor.submit(loadBatchFromStage));
+        }
+
+        public boolean isDone() throws Exception {
+            boolean isDone = true;
+            for (Future<IncomingBatch> future : futures) {
+                if (future.isDone()) {
+                    future.get();
+                } else {
+                    isDone = false;
+                }
+            }
+            return isDone;
         }
     }
 
