From 6d82ba32553cfb14eb2f0195a7f275ef0bb1b483 Mon Sep 17 00:00:00 2001 From: Mark Michalek Date: Thu, 11 Apr 2019 13:12:54 -0400 Subject: [PATCH] 0003913: Exception during keep alive can cause concurrent loading of batches --- .../service/impl/DataLoaderService.java | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index f23d310294..54fadaf561 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -576,18 +576,19 @@ protected List loadDataFromTransport(final ProcessInfo transferIn OutputStreamWriter outWriter = null; if (out != null) { - outWriter = new OutputStreamWriter(out, IoConstants.ENCODING); - long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); - while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) { - outWriter.write("1=1&"); - outWriter.flush(); + try { + outWriter = new OutputStreamWriter(out, IoConstants.ENCODING); + long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); + while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) { + outWriter.write("1=1&"); + outWriter.flush(); + } + } catch (Exception ex) { + log.warn("Failed to send keep alives to " + sourceNode + " " + ex.toString()); + awaitTermination(executor); } } else { - long hours = 1; - while (!executor.awaitTermination(1, TimeUnit.HOURS)) { - log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours)); - hours++; - } + awaitTermination(executor); } loadListener.isDone(); @@ -633,6 +634,14 @@ protected IDataWriter chooseDataWriter(Batch batch) { return listener.getBatchesProcessed(); } + private void awaitTermination(ExecutorService executor) throws InterruptedException { + long hours = 1; + while (!executor.awaitTermination(1, TimeUnit.HOURS)) { + log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours)); + hours++; + } + } + protected void logOrRethrow(Throwable ex) throws IOException { if (ex instanceof RegistrationRequiredException) { throw (RegistrationRequiredException) ex;