Skip to content

Commit

Permalink
0003080: Show failed batch extract errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
gwilmer committed Apr 28, 2017
1 parent 0978636 commit 42da3a5
Showing 1 changed file with 35 additions and 31 deletions.
Expand Up @@ -703,41 +703,45 @@ public FutureOutgoingBatch call() throws Exception {
}
}
} catch (RuntimeException e) {
SQLException se = unwrapSqlException(e);
SQLException se = unwrapSqlException(e);
if (currentBatch != null) {
/* Reread batch in case the ignore flag has been set */
currentBatch = outgoingBatchService.
findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId());
statisticManager.incrementDataExtractedErrors(currentBatch.getChannelId(), 1);
if (se != null) {
currentBatch.setSqlState(se.getSQLState());
currentBatch.setSqlCode(se.getErrorCode());
currentBatch.setSqlMessage(se.getMessage());
} else {
currentBatch.setSqlMessage(getRootMessage(e));
}
currentBatch.revertStatsOnError();
if (currentBatch.getStatus() != Status.IG &&
currentBatch.getStatus() != Status.OK) {
currentBatch.setStatus(Status.ER);
currentBatch.setErrorFlag(true);
}
outgoingBatchService.updateOutgoingBatch(currentBatch);

if (!isStreamClosedByClient(e)) {
if (e instanceof ProtocolException) {
IStagedResource resource = getStagedResource(currentBatch);
if (resource != null) {
resource.delete();
}
}
if (e.getCause() instanceof InterruptedException) {
log.info("Extract of batch {} was interrupted", currentBatch);
try {
/* Reread batch in case the ignore flag has been set */
currentBatch = outgoingBatchService.
findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId());
statisticManager.incrementDataExtractedErrors(currentBatch.getChannelId(), 1);
if (se != null) {
currentBatch.setSqlState(se.getSQLState());
currentBatch.setSqlCode(se.getErrorCode());
currentBatch.setSqlMessage(se.getMessage());
} else {
log.error("Failed to extract batch {}", currentBatch, e);
currentBatch.setSqlMessage(getRootMessage(e));
}
currentBatch.revertStatsOnError();
if (currentBatch.getStatus() != Status.IG &&
currentBatch.getStatus() != Status.OK) {
currentBatch.setStatus(Status.ER);
currentBatch.setErrorFlag(true);
}
outgoingBatchService.updateOutgoingBatch(currentBatch);
} catch(Exception ex) {
log.error("Failed to update the outgoing batch status for failed batch {}", currentBatch, ex);
} finally {
if (!isStreamClosedByClient(e)) {
if (e instanceof ProtocolException) {
IStagedResource resource = getStagedResource(currentBatch);
if (resource != null) {
resource.delete();
}
}
if (e.getCause() instanceof InterruptedException) {
log.info("Extract of batch {} was interrupted", currentBatch);
} else {
log.error("Failed to extract batch {}", currentBatch, e);
}
}
processInfo.setStatus(ProcessInfo.Status.ERROR);
}
processInfo.setStatus(ProcessInfo.Status.ERROR);
} else {
log.error("Could not log the outgoing batch status because the batch was null",
e);
Expand Down

0 comments on commit 42da3a5

Please sign in to comment.