From bf900aafe930a76af6fee69a4cb5ffe1c81ca5ba Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 22 Sep 2016 19:52:31 -0400 Subject: [PATCH] fix for sqlite to use single thread --- .../service/impl/DataExtractorService.java | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 97abb0db74..61f6aadc80 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -624,6 +624,21 @@ public FutureOutgoingBatch call() throws Exception { futures.add(executor.submit(callable)); } + if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) { + executor.shutdown(); + boolean isProcessed = false; + while (!isProcessed) { + try { + isProcessed = executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (!isProcessed) { + writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); + } + } + } + Iterator activeBatchIter = activeBatches.iterator(); for (Future future : futures) { currentBatch = activeBatchIter.next(); @@ -632,7 +647,6 @@ public FutureOutgoingBatch call() throws Exception { try { FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS); currentBatch = extractBatch.getOutgoingBatch(); - if (extractBatch.isExtractSkipped) { break; @@ -664,17 +678,10 @@ public FutureOutgoingBatch call() throws Exception { } catch (InterruptedException e) { throw new RuntimeException(e); } catch (TimeoutException e) { - try { - if (writer != null && streamToFileEnabled) { - writer.write(CsvConstants.NODEID + "," + sourceNode.getNodeId()); - writer.newLine(); - writer.flush(); - } - } catch (IOException ex) { - } + writeKeepAliveAck(writer, sourceNode, streamToFileEnabled); } } - } + } } catch (RuntimeException e) { SQLException se = unwrapSqlException(e); if (currentBatch != null) { @@ -740,6 +747,17 @@ public FutureOutgoingBatch call() throws Exception { } } + protected void writeKeepAliveAck(BufferedWriter writer, Node sourceNode, boolean streamToFileEnabled) { + try { + if (writer != null && streamToFileEnabled) { + writer.write(CsvConstants.NODEID + "," + sourceNode.getNodeId()); + writer.newLine(); + writer.flush(); + } + } catch (IOException ex) { + } + } + final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBatch, ExtractMode mode) { if (currentBatch.getStatus() != Status.IG) { currentBatch.setStatus(status);