Skip to content

Commit

Permalink
0002577: Separate thread for transfer and loading or extract
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Jun 30, 2016
1 parent 30ac850 commit b4133eb
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,14 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
Set<String> channelsProcessed = new HashSet<String>();
long batchesSelectedAtMs = System.currentTimeMillis();
OutgoingBatch currentBatch = null;
ExecutorService executor = null;
try {
final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
ExecutorService executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

for (int i = 0; i < activeBatches.size(); i++) {
Expand Down Expand Up @@ -677,6 +678,10 @@ public FutureOutgoingBatch call() throws Exception {
log.error("Could not log the outgoing batch status because the batch was null",
e);
}
} finally {
if (executor != null) {
executor.shutdown();
}
}

// Next, we update the node channel controls to the
Expand Down

0 comments on commit b4133eb

Please sign in to comment.