diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java index 8aca49f260..b90195f020 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/SimpleStagingDataWriter.java @@ -147,7 +147,7 @@ public void process() throws IOException { writeLine(channelLine); writeLine(line); - if (listeners != null) { + if (listeners != null && exception == null) { for (IProtocolDataWriterListener listener : listeners) { listener.start(context, batch); } @@ -163,7 +163,7 @@ public void process() throws IOException { if (batch != null) { batch.setStatistics(batchStats); - if (listeners != null) { + if (listeners != null && exception == null) { for (IProtocolDataWriterListener listener : listeners) { listener.end(context, batch, resource); } @@ -200,7 +200,7 @@ public void process() throws IOException { debugLine(line); } - if (listeners != null) { + if (listeners != null && exception == null) { for (IProtocolDataWriterListener listener : listeners) { listener.start(context, batch); } @@ -275,7 +275,9 @@ public void process() throws IOException { processInfo.setStatus(ProcessStatus.OK); } catch (Exception ex) { - exception = ex; + if (exception == null) { + exception = ex; + } if (resource != null) { resource.delete(); } @@ -327,7 +329,8 @@ protected void writeLine(String line) throws IOException { writer.write(line); writer.write("\n"); } else { - throw new ProtocolException("Batch data is corrupt because no batch ID is present"); + exception = new ProtocolException("Batch data is corrupt because no batch ID was present for DML lines"); + processInfo.setStatus(ProcessStatus.ERROR); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index 1d29abaf9b..70d0394168 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -22,9 +22,8 @@ import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; -import java.util.Map; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.mapper.NumberMapper; @@ -273,38 +272,32 @@ public void checkMissingAck(List acks, String queue) { } if (hasCorruptBatch && parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { OutgoingBatches batches = engine.getOutgoingBatchService().getOutgoingBatches(nodeId, queue, false); - if (batches.containsBatches()) { - Map batchMap = new HashMap(); - for (OutgoingBatch batch : batches.getBatches()) { - if (batch.getStatus() == Status.LD) { - batchMap.put(batch.getBatchId(), batch); - } + Iterator iter = batches.getBatches().iterator(); + while (iter.hasNext()) { + OutgoingBatch batch = iter.next(); + if (batch.getStatus() == Status.RQ || batch.getStatus() == Status.NE) { + iter.remove(); } - for (BatchAck ack : acks) { - batchMap.remove(ack.getBatchId()); - } - if (batchMap.size() > 0) { - for (OutgoingBatch batch : batchMap.values()) { - StringBuilder message = new StringBuilder(128); - message.append("Expected but did not receive an ack for batch "); - message.append(batch.getNodeBatchId()).append(". "); - - if (!batch.isLoadFlag()) { - message.append("This could be because the batch is corrupt - removing the batch from staging."); - log.warn(message.toString()); - IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING, - batch.getStagedLocation(), batch.getBatchId()); - if (resource != null) { - resource.delete(); - } else { - log.warn("Unable to find outgoing staging file for node {} that has corrupt batch data", nodeId); - } - } else { - message.append( - "This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually."); - log.warn(message.toString()); - } + } + if (batches.containsBatches()) { + OutgoingBatch batch = batches.getBatches().get(0); + StringBuilder message = new StringBuilder(128); + message.append("Expected but did not receive an ack for batch "); + message.append(batch.getNodeBatchId()).append(". "); + if (!batch.isLoadFlag()) { + message.append("This could be because the batch is corrupt - removing the batch from staging."); + log.warn(message.toString()); + IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING, + batch.getStagedLocation(), batch.getBatchId()); + if (resource != null) { + resource.delete(); + } else { + log.warn("Unable to find outgoing staging file for {} that has corrupt batch data", batch.getNodeBatchId()); } + } else { + message.append( + "This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually."); + log.warn(message.toString()); } } else { log.warn("Unable to find outgoing batch for node {} that has corrupt batch data", nodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index fa2ef348c8..5a4e6c5a3d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -676,7 +676,7 @@ protected IDataWriter chooseDataWriter(Batch batch) { } } List batchesProcessed = listener.getBatchesProcessed(); - if (error != null) { + if (error != null && error instanceof ProtocolException) { batchesProcessed.add(new IncomingBatch()); } return batchesProcessed;