Skip to content

Commit

Permalink
0002603: Retry batches from staging instead of sending them again
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 19, 2016
1 parent 268ccff commit a95793a
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 53 deletions.
Expand Up @@ -38,6 +38,8 @@ public class BatchAck implements Serializable {

private boolean isOk;

private boolean isResend;

private long errorLine;

private long networkMillis;
Expand Down Expand Up @@ -79,6 +81,10 @@ public boolean isOk() {
return isOk;
}

public boolean isResend() {
return isResend;
}

public void setBatchId(long batchId) {
this.batchId = batchId;
}
Expand All @@ -91,6 +97,10 @@ public void setOk(boolean isOk) {
this.isOk = isOk;
}

public void setResend(boolean isResend) {
this.isResend = isResend;
}

public long getByteCount() {
return byteCount;
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.io.Serializable;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
Expand All @@ -33,7 +34,7 @@ public class IncomingBatch implements Serializable {
private static final long serialVersionUID = 1L;

public enum Status {
OK("Ok"), ER("Error"), LD("Loading"), IG("Ignored"), XX("Unknown");
OK("Ok"), ER("Error"), LD("Loading"), RS("Resend"), IG("Ignored"), XX("Unknown");

private String description;

Expand Down Expand Up @@ -362,4 +363,17 @@ public String toString() {
return Long.toString(batchId);
}

@Override
public boolean equals(Object o) {
if (o == null || !(o instanceof IncomingBatch)) {
return false;
}
IncomingBatch b = (IncomingBatch) o;
return batchId == b.batchId && StringUtils.equals(nodeId, b.nodeId);
}

@Override
public int hashCode() {
return (String.valueOf(batchId) + "-" + nodeId).hashCode();
}
}
Expand Up @@ -39,7 +39,8 @@ public class OutgoingBatch implements Serializable {
private static final long serialVersionUID = 1L;

public enum Status {
OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), XX("Unknown");
OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"),
RS("Resend"), XX("Unknown");

private String description;

Expand Down Expand Up @@ -118,7 +119,7 @@ public String toString() {
private Date createTime;

private String createBy;

private long oldDataEventCount = 0;
private long oldByteCount = 0;
private long oldFilterMillis = 0;
Expand Down Expand Up @@ -494,5 +495,4 @@ public void setExtractJobFlag(boolean extractJobFlag) {
public boolean isExtractJobFlag() {
return extractJobFlag;
}

}
Expand Up @@ -73,7 +73,7 @@ public BatchAckResult ack(final BatchAck batch) {
} else {
OutgoingBatch outgoingBatch = outgoingBatchService
.findOutgoingBatch(batch.getBatchId(), batch.getNodeId());
Status status = batch.isOk() ? Status.OK : Status.ER;
Status status = batch.isOk() ? Status.OK : batch.isResend() ? Status.RS : Status.ER;
if (outgoingBatch != null) {
// Allow an outside system/user to indicate that a batch
// is OK.
Expand Down Expand Up @@ -111,12 +111,14 @@ public BatchAckResult ack(final BatchAck batch) {

if (status == Status.ER) {
log.error(
"The outgoing batch {} failed{}",
"The outgoing batch {} failed: {}",
outgoingBatch.getNodeBatchId(), batch.getSqlMessage() != null ? ". " + batch.getSqlMessage() : "");
RouterStats routerStats = engine.getStatisticManager().getRouterStatsByBatch(batch.getBatchId());
if (routerStats != null) {
log.info("Router stats for batch " + outgoingBatch.getBatchId() + ": " + routerStats.toString());
}
} else if (status == Status.RS) {
log.info("The outgoing batch {} received resend request", outgoingBatch.getNodeBatchId());
} else if (!outgoingBatch.isCommonFlag()) {
IStagedResource stagingResource = stagingManager.find(
Constants.STAGING_CATEGORY_OUTGOING, outgoingBatch.getNodeId(),
Expand Down
Expand Up @@ -619,7 +619,7 @@ public OutgoingBatch call() throws Exception {
changeBatchStatus(Status.LD, currentBatch, mode);
}
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
Expand Down Expand Up @@ -887,20 +887,28 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
}
}

protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
IStagedResource previouslyExtracted = getStagedResource(currentBatch);
return previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE
&& currentBatch.getStatus() != OutgoingBatch.Status.RS && remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0);
}

protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
OutgoingBatch currentBatch, IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
currentBatch.setSentCount(currentBatch.getSentCount() + 1);
changeBatchStatus(Status.SE, currentBatch, mode);
if (currentBatch.getStatus() != Status.RS) {
changeBatchStatus(Status.SE, currentBatch, mode);
}

long ts = System.currentTimeMillis();

IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT,
currentBatch.getBatchId(), currentBatch.getNodeId(), extractedBatch, writer, ctx);
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx);
dataReader.process();
} else {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -400,10 +401,13 @@ private void logDataReceivedFromPush(Node sourceNode, List<IncomingBatch> batchL
}

if (okBatchesCount > 0) {
log.info(
"{} data and {} batches loaded during push request from {}. There were {} batches in error",
new Object[] { okDataCount, okBatchesCount, sourceNode.toString(),
errorBatchesCount });
if (errorBatchesCount > 0) {
log.info("{} data and {} batches loaded during push request from {}. There were {} batches in error",
new Object[] { okDataCount, okBatchesCount, sourceNode.toString(), errorBatchesCount });
} else {
log.info("{} data and {} batches loaded during push request from {}.",
new Object[] { okDataCount, okBatchesCount, sourceNode.toString() });
}
}
}

Expand Down Expand Up @@ -922,14 +926,29 @@ protected IDataWriter chooseDataWriter(Batch batch) {
return incomingBatch;
}
};
futures.add(executor.submit(loadBatchFromStage));

if (resource == null) {
IncomingBatch incomingBatch = new IncomingBatch(batch);
listener.getBatchesProcessed().add(incomingBatch);
if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
log.warn("Unable to retry batch {} because it's not in staging. Setting status to resend.", batch.getNodeBatchId());
incomingBatch.setStatus(Status.RS);
incomingBatchService.updateIncomingBatch(incomingBatch);
}
} else {
futures.add(executor.submit(loadBatchFromStage));
}
}

public boolean isDone() throws Exception {
public boolean isDone() throws Throwable {
boolean isDone = true;
for (Future<IncomingBatch> future : futures) {
if (future.isDone()) {
future.get();
try {
future.get();
} catch (ExecutionException e) {
throw e.getCause() != null ? e.getCause() : e;
}
} else {
isDone = false;
}
Expand Down
Expand Up @@ -207,8 +207,8 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
}

if (batch.isRetry()) {
if (existingBatch.getStatus() == Status.ER
|| existingBatch.getStatus() == Status.LD
if (existingBatch.getStatus() == Status.ER || existingBatch.getStatus() == Status.LD
|| existingBatch.getStatus() == Status.RS
|| !parameterService
.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
Expand Down
Expand Up @@ -367,14 +367,14 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, b
params = new Object[] { nodeId, channelThread, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
OutgoingBatch.Status.IG.name() };
OutgoingBatch.Status.IG.name(), OutgoingBatch.Status.RS.name()};
}
else {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql");
params = new Object[] { nodeId, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
OutgoingBatch.Status.IG.name() };
OutgoingBatch.Status.IG.name(), OutgoingBatch.Status.RS.name()};
}

List<OutgoingBatch> list = (List<OutgoingBatch>) sqlTemplate.query(
Expand Down
Expand Up @@ -64,10 +64,10 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
putSql("findOutgoingBatchByIdOnlySql", "where batch_id=? ");

putSql("selectOutgoingBatchSql", ""
+ "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");
+ "where node_id = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");

putSql("selectOutgoingBatchChannelSql", ""
+ " join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");
+ " join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");

putSql("selectOutgoingBatchRangeSql", ""
+ "where batch_id between ? and ? order by batch_id ");
Expand Down
Expand Up @@ -88,7 +88,9 @@ protected String getAcknowledgementData(boolean requires13Format, String nodeId,
long batchId = batch.getBatchId();
Object value = null;
if (batch.getStatus() == Status.OK) {
value = WebConstants.ACK_BATCH_OK;
value = WebConstants.ACK_BATCH_OK;
} else if (batch.getStatus() == Status.RS) {
value = WebConstants.ACK_BATCH_RESEND;
} else {
value = batch.getFailedRowNumber();
}
Expand Down Expand Up @@ -175,6 +177,7 @@ private static BatchAck getBatchInfo(Map<String, ? extends Object> parameters, l
batchInfo.setIgnored(getParamAsBoolean(parameters, WebConstants.ACK_IGNORE_COUNT + batchId));
String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "").trim();
batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK));
batchInfo.setResend(status.equalsIgnoreCase(WebConstants.ACK_BATCH_RESEND));

if (!batchInfo.isOk()) {
batchInfo.setErrorLine(NumberUtils.toLong(status));
Expand Down
Expand Up @@ -63,6 +63,8 @@ public class WebConstants {

public static final String ACK_BATCH_OK = "ok";

public static final String ACK_BATCH_RESEND = "resend";

public static final String ACK_NODE_ID = "nodeId-";

public static final String ACK_NETWORK_MILLIS = "network-";
Expand Down
Expand Up @@ -62,4 +62,6 @@ private CsvConstants() {
public static final String CHANNEL = "channel";

public static final String IGNORE = "ignore";

public static final String RETRY = "retry";
}
Expand Up @@ -193,7 +193,7 @@ public Object readNext() {
tokens = null;
return data;

} else if (tokens[0].equals(CsvConstants.BATCH)) {
} else if (tokens[0].equals(CsvConstants.BATCH) || tokens[0].equals(CsvConstants.RETRY)) {
Batch batch = new Batch(batchType, Long.parseLong(tokens[1]), channelId,
binaryEncoding, sourceNodeId, targetNodeId, false);
statistics.put(batch, new DataReaderStatistics());
Expand Down

0 comments on commit a95793a

Please sign in to comment.