Skip to content
Permalink
Browse files

0003913: Exception during keep alive can cause concurrent loading of …

…batches
  • Loading branch information...
mmichalek committed Apr 11, 2019
1 parent b3eb819 commit b7d38cb054cea50d1f509ee4081ab5eccc95f4cb
@@ -614,18 +614,19 @@ public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean for

OutputStreamWriter outWriter = null;
if (out != null) {
outWriter = new OutputStreamWriter(out, IoConstants.ENCODING);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) {
outWriter.write("1=1&");
outWriter.flush();
try {
outWriter = new OutputStreamWriter(out, IoConstants.ENCODING);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) {
outWriter.write("1=1&");
outWriter.flush();
}
} catch (Exception ex) {
log.warn("Failed to send keep alives to " + sourceNode + " " + ex.toString());
awaitTermination(executor);
}
} else {
long hours = 1;
while (!executor.awaitTermination(1, TimeUnit.HOURS)) {
log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours));
hours++;
}
awaitTermination(executor);
}

loadListener.isDone();
@@ -679,6 +680,14 @@ protected IDataWriter chooseDataWriter(Batch batch) {
return listener.getBatchesProcessed();
}

private void awaitTermination(ExecutorService executor) throws InterruptedException {
long hours = 1;
while (!executor.awaitTermination(1, TimeUnit.HOURS)) {
log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours));
hours++;
}
}

protected void logOrRethrow(Throwable ex) throws IOException {
// Throwing exception will mean acks are not sent, so only certain exceptions should be thrown
if (ex instanceof RegistrationRequiredException) {

0 comments on commit b7d38cb

Please sign in to comment.
You can’t perform that action at this time.