Skip to content
Permalink
Browse files

0003913: Exception during keep alive can cause concurrent loading of …

…batches
  • Loading branch information...
mmichalek committed Apr 11, 2019
1 parent 2139c4d commit 6d82ba32553cfb14eb2f0195a7f275ef0bb1b483
@@ -576,18 +576,19 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for

OutputStreamWriter outWriter = null;
if (out != null) {
outWriter = new OutputStreamWriter(out, IoConstants.ENCODING);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) {
outWriter.write("1=1&");
outWriter.flush();
try {
outWriter = new OutputStreamWriter(out, IoConstants.ENCODING);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) {
outWriter.write("1=1&");
outWriter.flush();
}
} catch (Exception ex) {
log.warn("Failed to send keep alives to " + sourceNode + " " + ex.toString());
awaitTermination(executor);
}
} else {
long hours = 1;
while (!executor.awaitTermination(1, TimeUnit.HOURS)) {
log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours));
hours++;
}
awaitTermination(executor);
}

loadListener.isDone();
@@ -633,6 +634,14 @@ protected IDataWriter chooseDataWriter(Batch batch) {
return listener.getBatchesProcessed();
}

private void awaitTermination(ExecutorService executor) throws InterruptedException {
long hours = 1;
while (!executor.awaitTermination(1, TimeUnit.HOURS)) {
log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours));
hours++;
}
}

protected void logOrRethrow(Throwable ex) throws IOException {
if (ex instanceof RegistrationRequiredException) {
throw (RegistrationRequiredException) ex;

0 comments on commit 6d82ba3

Please sign in to comment.
You can’t perform that action at this time.