From a95793ab265aded5959b277db9d1cb8b6bbb63f5 Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 19 May 2016 18:40:04 -0400 Subject: [PATCH] 0002603: Retry batches from staging instead of sending them again --- .../jumpmind/symmetric/model/BatchAck.java | 10 +++ .../symmetric/model/IncomingBatch.java | 16 ++++- .../symmetric/model/OutgoingBatch.java | 6 +- .../service/impl/AcknowledgeService.java | 6 +- .../service/impl/DataExtractorService.java | 16 +++-- .../service/impl/DataLoaderService.java | 33 +++++++-- .../service/impl/IncomingBatchService.java | 4 +- .../service/impl/OutgoingBatchService.java | 4 +- .../impl/OutgoingBatchServiceSqlMap.java | 4 +- .../transport/AbstractTransportManager.java | 5 +- .../jumpmind/symmetric/web/WebConstants.java | 2 + .../symmetric/io/data/CsvConstants.java | 2 + .../io/data/reader/ProtocolDataReader.java | 2 +- .../data/reader/SimpleStagingDataReader.java | 68 ++++++++++++------- .../data/writer/SimpleStagingDataWriter.java | 25 +++++-- 15 files changed, 150 insertions(+), 53 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java index 442db2c247..6880b78676 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java @@ -38,6 +38,8 @@ public class BatchAck implements Serializable { private boolean isOk; + private boolean isResend; + private long errorLine; private long networkMillis; @@ -79,6 +81,10 @@ public boolean isOk() { return isOk; } + public boolean isResend() { + return isResend; + } + public void setBatchId(long batchId) { this.batchId = batchId; } @@ -91,6 +97,10 @@ public void setOk(boolean isOk) { this.isOk = isOk; } + public void setResend(boolean isResend) { + this.isResend = isResend; + } + public long getByteCount() { return byteCount; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java index ab4093f14e..8d6ebf505e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.Date; +import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; @@ -33,7 +34,7 @@ public class IncomingBatch implements Serializable { private static final long serialVersionUID = 1L; public enum Status { - OK("Ok"), ER("Error"), LD("Loading"), IG("Ignored"), XX("Unknown"); + OK("Ok"), ER("Error"), LD("Loading"), RS("Resend"), IG("Ignored"), XX("Unknown"); private String description; @@ -362,4 +363,17 @@ public String toString() { return Long.toString(batchId); } + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof IncomingBatch)) { + return false; + } + IncomingBatch b = (IncomingBatch) o; + return batchId == b.batchId && StringUtils.equals(nodeId, b.nodeId); + } + + @Override + public int hashCode() { + return (String.valueOf(batchId) + "-" + nodeId).hashCode(); + } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java index 7db2d24807..ed791b6861 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java @@ -39,7 +39,8 @@ public class OutgoingBatch implements Serializable { private static final long serialVersionUID = 1L; public enum Status { - OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), XX("Unknown"); + OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), + RS("Resend"), XX("Unknown"); private String description; @@ -118,7 +119,7 @@ public String toString() { private Date createTime; private String createBy; - + private long oldDataEventCount = 0; private long oldByteCount = 0; private long oldFilterMillis = 0; @@ -494,5 +495,4 @@ public void setExtractJobFlag(boolean extractJobFlag) { public boolean isExtractJobFlag() { return extractJobFlag; } - } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index 51fe0103ef..3f60465ada 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -73,7 +73,7 @@ public BatchAckResult ack(final BatchAck batch) { } else { OutgoingBatch outgoingBatch = outgoingBatchService .findOutgoingBatch(batch.getBatchId(), batch.getNodeId()); - Status status = batch.isOk() ? Status.OK : Status.ER; + Status status = batch.isOk() ? Status.OK : batch.isResend() ? Status.RS : Status.ER; if (outgoingBatch != null) { // Allow an outside system/user to indicate that a batch // is OK. @@ -111,12 +111,14 @@ public BatchAckResult ack(final BatchAck batch) { if (status == Status.ER) { log.error( - "The outgoing batch {} failed{}", + "The outgoing batch {} failed: {}", outgoingBatch.getNodeBatchId(), batch.getSqlMessage() != null ? ". " + batch.getSqlMessage() : ""); RouterStats routerStats = engine.getStatisticManager().getRouterStatsByBatch(batch.getBatchId()); if (routerStats != null) { log.info("Router stats for batch " + outgoingBatch.getBatchId() + ": " + routerStats.toString()); } + } else if (status == Status.RS) { + log.info("The outgoing batch {} received resend request", outgoingBatch.getNodeBatchId()); } else if (!outgoingBatch.isCommonFlag()) { IStagedResource stagingResource = stagingManager.find( Constants.STAGING_CATEGORY_OUTGOING, outgoingBatch.getNodeId(), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 5a03092478..8bc43e375b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -619,7 +619,7 @@ public OutgoingBatch call() throws Exception { changeBatchStatus(Status.LD, currentBatch, mode); } } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); + throw new RuntimeException(e.getCause() != null ? e.getCause() : e); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (TimeoutException e) { @@ -887,11 +887,19 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) { } } + protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) { + IStagedResource previouslyExtracted = getStagedResource(currentBatch); + return previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE + && currentBatch.getStatus() != OutgoingBatch.Status.RS && remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0); + } + protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) { if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) { currentBatch.setSentCount(currentBatch.getSentCount() + 1); - changeBatchStatus(Status.SE, currentBatch, mode); + if (currentBatch.getStatus() != Status.RS) { + changeBatchStatus(Status.SE, currentBatch, mode); + } long ts = System.currentTimeMillis(); @@ -899,8 +907,8 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo if (extractedBatch != null) { if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) { DataContext ctx = new DataContext(); - SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, - currentBatch.getBatchId(), currentBatch.getNodeId(), extractedBatch, writer, ctx); + SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(), + currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx); dataReader.process(); } else { IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index d31715f689..0cdbb1df73 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -400,10 +401,13 @@ private void logDataReceivedFromPush(Node sourceNode, List batchL } if (okBatchesCount > 0) { - log.info( - "{} data and {} batches loaded during push request from {}. There were {} batches in error", - new Object[] { okDataCount, okBatchesCount, sourceNode.toString(), - errorBatchesCount }); + if (errorBatchesCount > 0) { + log.info("{} data and {} batches loaded during push request from {}. There were {} batches in error", + new Object[] { okDataCount, okBatchesCount, sourceNode.toString(), errorBatchesCount }); + } else { + log.info("{} data and {} batches loaded during push request from {}.", + new Object[] { okDataCount, okBatchesCount, sourceNode.toString() }); + } } } @@ -922,14 +926,29 @@ protected IDataWriter chooseDataWriter(Batch batch) { return incomingBatch; } }; - futures.add(executor.submit(loadBatchFromStage)); + + if (resource == null) { + IncomingBatch incomingBatch = new IncomingBatch(batch); + listener.getBatchesProcessed().add(incomingBatch); + if (incomingBatchService.acquireIncomingBatch(incomingBatch)) { + log.warn("Unable to retry batch {} because it's not in staging. Setting status to resend.", batch.getNodeBatchId()); + incomingBatch.setStatus(Status.RS); + incomingBatchService.updateIncomingBatch(incomingBatch); + } + } else { + futures.add(executor.submit(loadBatchFromStage)); + } } - public boolean isDone() throws Exception { + public boolean isDone() throws Throwable { boolean isDone = true; for (Future future : futures) { if (future.isDone()) { - future.get(); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause() != null ? e.getCause() : e; + } } else { isDone = false; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 2f8f8bcfa9..28f1a2318d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -207,8 +207,8 @@ public boolean acquireIncomingBatch(IncomingBatch batch) { } if (batch.isRetry()) { - if (existingBatch.getStatus() == Status.ER - || existingBatch.getStatus() == Status.LD + if (existingBatch.getStatus() == Status.ER || existingBatch.getStatus() == Status.LD + || existingBatch.getStatus() == Status.RS || !parameterService .is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) { okayToProcess = true; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index ad68c1f560..faeacfd4d3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -367,14 +367,14 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, b params = new Object[] { nodeId, channelThread, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(), OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(), OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(), - OutgoingBatch.Status.IG.name() }; + OutgoingBatch.Status.IG.name(), OutgoingBatch.Status.RS.name()}; } else { sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql"); params = new Object[] { nodeId, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(), OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(), OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(), - OutgoingBatch.Status.IG.name() }; + OutgoingBatch.Status.IG.name(), OutgoingBatch.Status.RS.name()}; } List list = (List) sqlTemplate.query( diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 5a4e227239..b92fd8064a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -64,10 +64,10 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, putSql("findOutgoingBatchByIdOnlySql", "where batch_id=? "); putSql("selectOutgoingBatchSql", "" - + "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc "); + + "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc "); putSql("selectOutgoingBatchChannelSql", "" - + " join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc "); + + " join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc "); putSql("selectOutgoingBatchRangeSql", "" + "where batch_id between ? and ? order by batch_id "); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java index 0befc0c58e..c3c4d2f7bd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java @@ -88,7 +88,9 @@ protected String getAcknowledgementData(boolean requires13Format, String nodeId, long batchId = batch.getBatchId(); Object value = null; if (batch.getStatus() == Status.OK) { - value = WebConstants.ACK_BATCH_OK; + value = WebConstants.ACK_BATCH_OK; + } else if (batch.getStatus() == Status.RS) { + value = WebConstants.ACK_BATCH_RESEND; } else { value = batch.getFailedRowNumber(); } @@ -175,6 +177,7 @@ private static BatchAck getBatchInfo(Map parameters, l batchInfo.setIgnored(getParamAsBoolean(parameters, WebConstants.ACK_IGNORE_COUNT + batchId)); String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "").trim(); batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK)); + batchInfo.setResend(status.equalsIgnoreCase(WebConstants.ACK_BATCH_RESEND)); if (!batchInfo.isOk()) { batchInfo.setErrorLine(NumberUtils.toLong(status)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java index 527b45147d..58d235fe06 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java @@ -63,6 +63,8 @@ public class WebConstants { public static final String ACK_BATCH_OK = "ok"; + public static final String ACK_BATCH_RESEND = "resend"; + public static final String ACK_NODE_ID = "nodeId-"; public static final String ACK_NETWORK_MILLIS = "network-"; diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvConstants.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvConstants.java index ca64ef64aa..10fbea6752 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvConstants.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/CsvConstants.java @@ -62,4 +62,6 @@ private CsvConstants() { public static final String CHANNEL = "channel"; public static final String IGNORE = "ignore"; + + public static final String RETRY = "retry"; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java index 16dd19b1bd..ad43492a16 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java @@ -193,7 +193,7 @@ public Object readNext() { tokens = null; return data; - } else if (tokens[0].equals(CsvConstants.BATCH)) { + } else if (tokens[0].equals(CsvConstants.BATCH) || tokens[0].equals(CsvConstants.RETRY)) { Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId, binaryEncoding, sourceNodeId, targetNodeId, false); statistics.put(batch, new DataReaderStatistics()); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java index e90f31e245..e5aa98943d 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java @@ -23,9 +23,9 @@ import java.io.BufferedReader; import java.io.BufferedWriter; -import org.apache.commons.io.IOUtils; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.io.data.Batch.BatchType; +import org.jumpmind.symmetric.io.data.CsvConstants; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.slf4j.Logger; @@ -39,47 +39,67 @@ public class SimpleStagingDataReader { protected BatchType batchType; protected long batchId; protected String targetNodeId; + protected boolean isRetry; protected IStagedResource stagedResource; protected BufferedWriter writer; - protected BufferedReader reader; protected DataContext context; - public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, IStagedResource stagedResource, - BufferedWriter writer, DataContext context) { + public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, boolean isRetry, + IStagedResource stagedResource, BufferedWriter writer, DataContext context) { this.batchType = batchType; + this.batchId = batchId; this.targetNodeId = targetNodeId; + this.isRetry = isRetry; this.stagedResource = stagedResource; this.writer = writer; this.context = context; } public void process() { - reader = stagedResource.getReader(); - char[] buffer = new char[MAX_WRITE_LENGTH]; - long totalCharsRead = 0; - int numCharsRead = 0; - long startTime = System.currentTimeMillis(), ts = startTime; - + BufferedReader reader = stagedResource.getReader(); try { - while ((numCharsRead = reader.read(buffer)) != -1) { - writer.write(buffer, 0, numCharsRead); - totalCharsRead += numCharsRead; - - if (Thread.currentThread().isInterrupted()) { - throw new IoException("This thread was interrupted"); + // Retry means we've sent this batch before, so let's ask to retry the batch from the target's staging + if (isRetry) { + String line = null; + while ((line = reader.readLine()) != null) { + if (line.startsWith(CsvConstants.BATCH)) { + writer.write(CsvConstants.RETRY + "," + batchId); + writer.newLine(); + writer.write(CsvConstants.COMMIT + "," + batchId); + writer.newLine(); + break; + } else { + writer.write(line); + writer.newLine(); + } } + } else { + char[] buffer = new char[MAX_WRITE_LENGTH]; + long totalCharsRead = 0; + int numCharsRead = 0; + long startTime = System.currentTimeMillis(), ts = startTime; - if (System.currentTimeMillis() - ts > 60000) { - log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. The following stats have been gathered: {}", - new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000, - "BYTES=" + totalCharsRead }); - ts = System.currentTimeMillis(); - } - } + while ((numCharsRead = reader.read(buffer)) != -1) { + writer.write(buffer, 0, numCharsRead); + totalCharsRead += numCharsRead; + + if (Thread.currentThread().isInterrupted()) { + throw new IoException("This thread was interrupted"); + } + + if (System.currentTimeMillis() - ts > 60000) { + log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " + + "The following stats have been gathered: {}", + new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000, + "BYTES=" + totalCharsRead }); + ts = System.currentTimeMillis(); + } + } + } } catch (Throwable t) { throw new RuntimeException(t); } finally { - IOUtils.closeQuietly(reader); + stagedResource.close(); } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java index 33a278dd93..43736b7618 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java @@ -133,9 +133,11 @@ public void process() throws IOException { } } } else if (line.startsWith(CsvConstants.COMMIT)) { - writeLine(line); - resource.close(); - resource.setState(State.READY); + if (writer != null) { + writeLine(line); + resource.close(); + resource.setState(State.READY); + } batchTableLines.clear(); if (listeners != null) { @@ -143,6 +145,21 @@ public void process() throws IOException { listener.end(context, batch, resource); } } + } else if (line.startsWith(CsvConstants.RETRY)) { + batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), + getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false); + String location = batch.getStagedLocation(); + resource = stagingManager.find(category, location, batch.getBatchId()); + if (resource == null || resource.getState() == State.CREATE) { + resource = null; + writer = null; + } + + if (listeners != null) { + for (IProtocolDataWriterListener listener : listeners) { + listener.start(context, batch); + } + } } else if (line.startsWith(CsvConstants.NODEID)) { nodeLine = line; } else if (line.startsWith(CsvConstants.BINARY)) { @@ -167,7 +184,7 @@ public void process() throws IOException { if (System.currentTimeMillis() - ts > 60000) { log.info("Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}", new Object[] { batch.getBatchId(), batch.getTargetNodeId(), (System.currentTimeMillis() - startTime) / 1000, - "LINES=" + lineCount + ", BYTES=" + resource.getSize() }); + "LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) }); ts = System.currentTimeMillis(); } }