0002577: Separate thread for transfer and loading or extract
erilong committed Apr 28, 2016
1 parent 78493cd commit 4945eb7
Showing 1 changed file with 65 additions and 26 deletions.
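The commit pipelines extraction and transfer: each extracted batch is handed to a single-threaded executor for sending while the calling thread moves on to extract the next batch, and the futures are drained afterward. Below is a minimal, self-contained sketch of that pattern; Batch, extractBatch, and sendBatch are hypothetical stand-ins rather than the SymmetricDS types, and the 12-hour await mirrors the timeout used in the diff.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PipelinedExtractSketch {

    // Hypothetical stand-in for an extracted batch.
    static class Batch {
    }

    // Placeholder for the extract step; runs on the calling thread.
    Batch extractBatch(int id) {
        return new Batch();
    }

    // Placeholder for the transfer step; runs on the single executor thread.
    Batch sendBatch(Batch batch) {
        return batch;
    }

    public List<Batch> extractAndSend(int batchCount) throws Exception {
        // One worker thread: while it transfers batch N, the caller extracts batch N + 1.
        ExecutorService executor = Executors.newFixedThreadPool(1);
        List<Future<Batch>> futures = new ArrayList<Future<Batch>>();
        try {
            for (int i = 0; i < batchCount; i++) {
                final Batch extracted = extractBatch(i);
                futures.add(executor.submit(new Callable<Batch>() {
                    public Batch call() throws Exception {
                        return sendBatch(extracted);
                    }
                }));
            }
        } finally {
            executor.shutdown();
            executor.awaitTermination(12, TimeUnit.HOURS);
        }
        List<Batch> sent = new ArrayList<Batch>();
        for (Future<Batch> future : futures) {
            // get() rethrows a transfer failure wrapped in ExecutionException.
            sent.add(future.get());
        }
        return sent;
    }
}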
@@ -35,7 +35,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.io.DatabaseXmlUtil;
@@ -509,10 +516,9 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
return extracted;
}

protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
List<OutgoingBatch> activeBatches, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
boolean streamToFileEnabled = parameterService
.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node targetNode,
List<OutgoingBatch> activeBatches, final IDataWriter dataWriter, final BufferedWriter writer, final ExtractMode mode) {
boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
List<OutgoingBatch> processedBatches = new ArrayList<OutgoingBatch>(activeBatches.size());
if (activeBatches.size() > 0) {
Set<String> channelsProcessed = new HashSet<String>();
@@ -522,8 +528,13 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,

long bytesSentCount = 0;
int batchesSentCount = 0;
long maxBytesToSync = parameterService
.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
ExecutorService executor = null;
List<Future<OutgoingBatch>> futures = null;
if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
futures = new ArrayList<Future<OutgoingBatch>>();
}

for (int i = 0; i < activeBatches.size(); i++) {
currentBatch = activeBatches.get(i);
@@ -565,33 +576,44 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
currentBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter,
currentBatch, streamToFileEnabled, true, mode);
}

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch,
dataWriter, writer, mode);
final OutgoingBatch sendBatch = currentBatch;
Callable<OutgoingBatch> callable = new Callable<OutgoingBatch>() {
public OutgoingBatch call() throws Exception {
return sendOutgoingBatch(processInfo, targetNode, sendBatch, dataWriter, writer, mode);
}
};
futures.add(executor.submit(callable));
} else {
if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch, mode);
}
}

processedBatches.add(currentBatch);

if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch, mode);

bytesSentCount += currentBatch.getByteCount();
batchesSentCount++;

if (bytesSentCount >= maxBytesToSync && processedBatches.size() < activeBatches.size()) {
log.info(
"Reached the total byte threshold after {} of {} batches were extracted for node '{}'. The remaining batches will be extracted on a subsequent sync",
new Object[] { batchesSentCount, activeBatches.size(),
targetNode.getNodeId() });
break;
}
}
bytesSentCount += currentBatch.getByteCount();
batchesSentCount++;

if (bytesSentCount >= maxBytesToSync && processedBatches.size() < activeBatches.size()) {
log.info(
"Reached the total byte threshold after {} of {} batches were extracted for node '{}'. The remaining batches will be extracted on a subsequent sync",
new Object[] { batchesSentCount, activeBatches.size(),
targetNode.getNodeId() });
break;
}
}

} catch (RuntimeException e) {
if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
executor.shutdown();
executor.awaitTermination(12, TimeUnit.HOURS);
for (Future<OutgoingBatch> future : futures) {
currentBatch = future.get();
}
}
} catch (Exception e) {
SQLException se = unwrapSqlException(e);
if (currentBatch != null) {
/* Reread batch in case the ignore flag has been set */
@@ -1825,4 +1847,21 @@ public String getInitialLoadSelect() {

}

class DataExtractorThreadFactory implements ThreadFactory {
AtomicInteger threadNumber = new AtomicInteger(1);
String namePrefix = parameterService.getEngineName().toLowerCase() + "-data-extractor-";

public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName(namePrefix + threadNumber.getAndIncrement());
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}

}
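The new DataExtractorThreadFactory names the transfer thread after the engine and keeps it a non-daemon, normal-priority thread, so it is easy to spot in a thread dump. Because the extract loop now waits on awaitTermination() and Future.get(), which throw checked exceptions, that is presumably why the catch in the diff is widened from RuntimeException to Exception. A small usage sketch follows; NamedThreadFactory and the "corp-000" engine name are hypothetical and only illustrate the pattern, not the actual SymmetricDS code.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedExecutorSketch {

    // Hypothetical factory in the same spirit as DataExtractorThreadFactory:
    // named, non-daemon, normal-priority threads.
    static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName(namePrefix + threadNumber.getAndIncrement());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }

    public static void main(String[] args) throws Exception {
        // "corp-000" stands in for parameterService.getEngineName().
        ExecutorService executor = Executors.newFixedThreadPool(1,
                new NamedThreadFactory("corp-000-data-extractor-"));
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() throws Exception {
                return "batch sent by " + Thread.currentThread().getName();
            }
        });
        executor.shutdown();
        // Both calls below throw checked exceptions, so a catch of
        // RuntimeException alone would no longer compile around them.
        executor.awaitTermination(1, TimeUnit.MINUTES);   // InterruptedException
        try {
            System.out.println(future.get());             // ExecutionException
        } catch (ExecutionException e) {
            // The cause is whatever the Callable threw on the executor thread.
            System.err.println("transfer failed: " + e.getCause());
        }
    }
}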
