diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java index f8329db713..71ad6306b9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java @@ -28,18 +28,17 @@ import org.apache.commons.lang.StringUtils; import org.jumpmind.db.util.BinaryEncoding; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.csv.CsvReader; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.Batch.BatchType; -import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener; import org.jumpmind.symmetric.io.data.CsvConstants; import org.jumpmind.symmetric.io.data.DataContext; -import org.jumpmind.symmetric.io.stage.IStagedResource; +import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener; import org.jumpmind.symmetric.io.stage.IStagedResource.State; -import org.jumpmind.util.Statistics; -import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus; +import org.jumpmind.util.Statistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ public class SimpleStagingDataWriter { protected ProcessInfo processInfo; protected BufferedWriter writer; protected Batch batch; + protected long invalidLineCount; public SimpleStagingDataWriter(ProcessInfo processInfo, BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes, BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) { @@ -154,13 +154,15 @@ public void process() throws IOException { } batchTableLines.clear(); - batch.setStatistics(batchStats); - batchStats = null; - if (listeners != null) { - for (IProtocolDataWriterListener listener : listeners) { - listener.end(context, batch, resource); + if (batch != null) { + batch.setStatistics(batchStats); + if (listeners != null) { + for (IProtocolDataWriterListener listener : listeners) { + listener.end(context, batch, resource); + } } } + batchStats = null; resource = null; } else if (line.startsWith(CsvConstants.RETRY)) { batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine), @@ -204,10 +206,9 @@ public void process() throws IOException { batchStatsLine = line; batchStats = new Statistics(); putStats(batchStats, batchStatsColumnsLine, batchStatsLine); + } else if (writer == null) { + invalidLineCount++; } else { - if (writer == null) { - throw new IllegalStateException("Invalid batch data was received: " + line); - } TableLine batchLine = batchTableLines.get(tableLine); if (batchLine == null || (batchLine != null && batchLine.columnsLine == null)) { TableLine syncLine = syncTableLines.get(tableLine); @@ -261,7 +262,7 @@ public void process() throws IOException { } processInfo.setStatus(ProcessStatus.OK); - } catch (IOException ex) { + } catch (Exception ex) { if (resource != null) { resource.delete(); } @@ -271,7 +272,13 @@ public void process() throws IOException { /* * Just log an error here. We want batches that come before us to continue to process and to be acknowledged */ - log.error("Failed to process batch. Context: " + context.getContext() , ex); + + log.error("Failed to write batch into staging from {}. {}: {}", context.getContext().get(Constants.DATA_CONTEXT_SOURCE_NODE).toString(), + ex.getClass().getName(), ex.getMessage()); + } finally { + if (invalidLineCount > 0) { + log.warn("Found {} invalid lines that could not be written to a batch", invalidLineCount); + } } }