diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java index e077bb46c7..669aad8903 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java @@ -134,5 +134,13 @@ protected void close(ISqlTransaction transaction) { transaction.close(); } } + + protected String getRootMessage(Exception ex) { + Throwable cause = ExceptionUtils.getRootCause(ex); + if (cause == null) { + cause = ex; + } + return cause.getMessage(); + } } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 1fd0a73004..c32444619e 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -20,6 +20,7 @@ */ package org.jumpmind.symmetric.service.impl; +import java.io.EOFException; import java.io.OutputStream; import java.io.Writer; import java.sql.SQLException; @@ -31,6 +32,7 @@ import java.util.concurrent.Semaphore; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.ISqlReadCursor; import org.jumpmind.db.sql.ISqlRowMapper; @@ -357,7 +359,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, } } - } catch (Exception e) { + } catch (RuntimeException e) { SQLException se = unwrapSqlException(e); if (currentBatch != null) { statisticManager.incrementDataExtractedErrors(currentBatch.getChannelId(), 1); @@ -366,7 +368,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, currentBatch.setSqlCode(se.getErrorCode()); currentBatch.setSqlMessage(se.getMessage()); } else { - currentBatch.setSqlMessage(e.getMessage()); + currentBatch.setSqlMessage(getRootMessage(e)); } currentBatch.revertStatsOnError(); if (currentBatch.getStatus() != Status.IG) { @@ -374,19 +376,27 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, } currentBatch.setErrorFlag(true); outgoingBatchService.updateOutgoingBatch(currentBatch); + + if (isStreamClosedByClient(e)) { + log.warn("Failed to extract batch {}. The stream was closed by the client. There is a good chance that a previously sent batch errored out and the stream was closed. The error was: {}", currentBatch, getRootMessage(e)); + } else { + log.error("Failed to extract batch {}", currentBatch, e); + } } else { - log.error("Could not log the outgoing batch status because the batch was null.", e); - } - - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else { - throw new RuntimeException(e); + log.error("Could not log the outgoing batch status because the batch was null", e); } } } + + protected boolean isStreamClosedByClient(Exception ex) { + if (ExceptionUtils.indexOfType(ex, EOFException.class) >= 0) { + return true; + } else { + return false; + } + } final protected void changeBatchStatus(Status status, OutgoingBatch currentBatch) { if (currentBatch.getStatus() != Status.IG) {