Skip to content

Commit

Permalink
fix for sqlite to use single thread
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 22, 2016
1 parent 1aa51b0 commit bf900aa
Showing 1 changed file with 28 additions and 10 deletions.
Expand Up @@ -624,6 +624,21 @@ public FutureOutgoingBatch call() throws Exception {
futures.add(executor.submit(callable));
}

if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) {
executor.shutdown();
boolean isProcessed = false;
while (!isProcessed) {
try {
isProcessed = executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (!isProcessed) {
writeKeepAliveAck(writer, sourceNode, streamToFileEnabled);
}
}
}

Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (Future<FutureOutgoingBatch> future : futures) {
currentBatch = activeBatchIter.next();
Expand All @@ -632,7 +647,6 @@ public FutureOutgoingBatch call() throws Exception {
try {
FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
currentBatch = extractBatch.getOutgoingBatch();


if (extractBatch.isExtractSkipped) {
break;
Expand Down Expand Up @@ -664,17 +678,10 @@ public FutureOutgoingBatch call() throws Exception {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
try {
if (writer != null && streamToFileEnabled) {
writer.write(CsvConstants.NODEID + "," + sourceNode.getNodeId());
writer.newLine();
writer.flush();
}
} catch (IOException ex) {
}
writeKeepAliveAck(writer, sourceNode, streamToFileEnabled);
}
}
}
}
} catch (RuntimeException e) {
SQLException se = unwrapSqlException(e);
if (currentBatch != null) {
Expand Down Expand Up @@ -740,6 +747,17 @@ public FutureOutgoingBatch call() throws Exception {
}
}

protected void writeKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) {
try {
if (writer != null && streamToFileEnabled) {
writer.write(CsvConstants.NODEID + "," + sourceNode.getNodeId());
writer.newLine();
writer.flush();
}
} catch (IOException ex) {
}
}

final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBatch, ExtractMode mode) {
if (currentBatch.getStatus() != Status.IG) {
currentBatch.setStatus(status);
Expand Down

0 comments on commit bf900aa

Please sign in to comment.