Skip to content

Commit

Permalink
0004538: Suppress first deadlock, FK, or protocol violation error to
Browse files Browse the repository at this point in the history
allow retry
  • Loading branch information
erilong committed Sep 14, 2020
1 parent d06144c commit 1e3d8dc
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 59 deletions.
Expand Up @@ -51,8 +51,7 @@ public class AcknowledgeService extends AbstractService implements IAcknowledgeS
public AcknowledgeService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
this.engine = engine;
setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens()));
}

public BatchAckResult ack(final BatchAck batch) {
Expand All @@ -70,30 +69,22 @@ public BatchAckResult ack(final BatchAck batch) {
registrationService.markNodeAsRegistered(batch.getNodeId());
}
} else {
OutgoingBatch outgoingBatch = outgoingBatchService
.findOutgoingBatch(batch.getBatchId(), batch.getNodeId());
OutgoingBatch outgoingBatch = outgoingBatchService.findOutgoingBatch(batch.getBatchId(), batch.getNodeId());
Status status = batch.isResend() ? Status.RS : batch.isOk() ? Status.OK : Status.ER;
Status oldStatus = null;

if (outgoingBatch != null && outgoingBatch.getStatus() != Status.RQ) {
// Allow an outside system/user to indicate that a batch
// is OK.
if (outgoingBatch.getStatus() != Status.OK &&
outgoingBatch.getStatus() != Status.IG) {
outgoingBatch.setStatus(status);
outgoingBatch.setErrorFlag(!batch.isOk());
} else if (outgoingBatch.getStatus() != Status.OK) {
// clearing the error flag in case the user set the batch
// status to OK
oldStatus = outgoingBatch.getStatus();
outgoingBatch.setStatus(Status.OK);
outgoingBatch.setErrorFlag(false);
status = Status.OK;
log.info("Batch {} for node {} was set to {}. Updating the status to OK.",
new Object[] { batch.getBatchId(), batch.getNodeId(), oldStatus.name() });
}
if (batch.isIgnored()) {
outgoingBatch.incrementIgnoreCount();

// Allow an outside system/user to indicate that a batch is OK
if (outgoingBatch.getStatus() == Status.IG && status == Status.OK) {
log.info("Ignoring batch {}", outgoingBatch.getNodeBatchId());
} else if (outgoingBatch.getStatus() == Status.OK && status != Status.OK) {
log.info("Setting status to ignore for batch {} because status was set to OK by user", outgoingBatch.getNodeBatchId());
status = Status.IG;
}

boolean isFirstTimeAsOkStatus = outgoingBatch.getStatus() != Status.OK && status == Status.OK;
outgoingBatch.setStatus(status);
outgoingBatch.setErrorFlag(status == Status.ER);
outgoingBatch.setNetworkMillis(batch.getNetworkMillis());
outgoingBatch.setFilterMillis(batch.getFilterMillis());
outgoingBatch.setLoadMillis(batch.getLoadMillis());
Expand All @@ -111,9 +102,16 @@ public BatchAckResult ack(final BatchAck batch) {
outgoingBatch.setIgnoreRowCount(batch.getIgnoreRowCount());
outgoingBatch.setMissingDeleteCount(batch.getMissingDeleteCount());
outgoingBatch.setSkipCount(batch.getSkipCount());
if (batch.isIgnored()) {
outgoingBatch.incrementIgnoreCount();
}
if (status == Status.OK) {
outgoingBatch.setFailedDataId(0);
outgoingBatch.setFailedLineNumber(0);
}

boolean isNewError = false;
if (!batch.isOk() && batch.getErrorLine() != 0) {
if (status == Status.ER && batch.getErrorLine() != 0) {
if (outgoingBatch.isLoadFlag()) {
isNewError = outgoingBatch.getSentCount() == 1;
} else if (batch.getErrorLine() != outgoingBatch.getFailedLineNumber()){
Expand All @@ -127,27 +125,25 @@ public BatchAckResult ack(final BatchAck batch) {
List<Number> ids = sqlTemplateDirty.query(sql, new NumberMapper(), outgoingBatch.getBatchId());
if (ids.size() >= batch.getErrorLine()) {
long failedDataId = ids.get((int) batch.getErrorLine() - 1).longValue();
if (outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId) {
isNewError = true;
}
isNewError = outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId;
outgoingBatch.setFailedDataId(failedDataId);
}
outgoingBatch.setFailedLineNumber(batch.getErrorLine());
}
}

if (status == Status.ER) {
boolean suppressLogError = false;
boolean suppressError = false;
if (isNewError) {
engine.getStatisticManager().incrementDataLoadedOutgoingErrors(outgoingBatch.getChannelId(), 1);
}
if (isNewError && outgoingBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE) {
if (!outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION)) {
engine.getDataService().reloadMissingForeignKeyRows(outgoingBatch.getNodeId(), outgoingBatch.getFailedDataId());
suppressLogError = true;
suppressError = true;
}
if (outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION_REVERSE_RELOAD)) {
suppressLogError = true;
suppressError = true;
}
}
if (outgoingBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE
Expand All @@ -161,14 +157,17 @@ public BatchAckResult ack(final BatchAck batch) {
if (resource != null) {
log.info("The batch {} may be corrupt in staging, so removing it.", outgoingBatch.getNodeBatchId());
resource.delete();
suppressLogError = isNewError;
suppressError = isNewError;
}
}
}
if (isNewError && outgoingBatch.getSqlCode() == ErrorConstants.DEADLOCK_CODE) {
suppressLogError = true;
suppressError = true;
}
if (!suppressLogError) {

if (suppressError) {
outgoingBatch.setErrorFlag(false);
} else {
log.error("The outgoing batch {} failed: {}{}", outgoingBatch.getNodeBatchId(),
(batch.getSqlCode() != 0 ? "[" + batch.getSqlState() + "," + batch.getSqlCode() + "] " : ""), batch.getSqlMessage());
RouterStats routerStats = engine.getStatisticManager().getRouterStatsByBatch(batch.getBatchId());
Expand All @@ -181,8 +180,9 @@ public BatchAckResult ack(final BatchAck batch) {
}

outgoingBatchService.updateOutgoingBatch(outgoingBatch);

if (status == Status.OK) {
if (!Status.OK.equals(oldStatus)) {
if (isFirstTimeAsOkStatus) {
if (outgoingBatch.getLoadId() > 0) {
engine.getDataExtractorService().updateExtractRequestLoadTime(new Date(), outgoingBatch);
}
Expand Down
Expand Up @@ -50,7 +50,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipException;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
Expand All @@ -62,7 +61,6 @@
import org.jumpmind.exception.HttpException;
import org.jumpmind.exception.InvalidRetryException;
import org.jumpmind.exception.IoException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
Expand All @@ -78,7 +76,6 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
Expand Down Expand Up @@ -1105,21 +1102,16 @@ protected IDataWriter chooseDataWriter(Batch batch) {
isError = true;
incomingBatch = listener.currentBatch;
incomingBatch.setStatus(Status.ER);
incomingBatch.setErrorFlag(true);
incomingBatchService.updateIncomingBatch(incomingBatch);
throw e;
}
} else {
isError = true;
if (e instanceof ParseException || e instanceof ProtocolException || e.getCause() instanceof ZipException) {
log.warn("The batch {} may be corrupt in staging, so removing it.", batchInStaging.getNodeBatchId());
if (listener.currentBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE) {
log.info("The batch {} may be corrupt in staging, so removing it.", batchInStaging.getNodeBatchId());
resource.delete();
incomingBatch = listener.currentBatch;
if (incomingBatch != null) {
incomingBatch.setStatus(Status.ER);
incomingBatch.setSqlCode(ErrorConstants.PROTOCOL_VIOLATION_CODE);
incomingBatch.setSqlState(ErrorConstants.PROTOCOL_VIOLATION_STATE);
incomingBatchService.updateIncomingBatch(incomingBatch);
}
} else {
throw e;
}
Expand Down
Expand Up @@ -321,10 +321,9 @@ public int updateIncomingBatch(IncomingBatch batch) {
public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch) {
int count = 0;
if (batch.isPersistable()) {
if (batch.getStatus() == IncomingBatch.Status.ER) {
batch.setErrorFlag(true);
} else if (batch.getStatus() == IncomingBatch.Status.OK) {
if (batch.getStatus() == IncomingBatch.Status.OK) {
batch.setErrorFlag(false);
batch.setFailedDataId(0);
}
batch.setLastUpdatedHostName(clusterService.getServerId());
count = transaction.prepareAndExecute(getSql("updateIncomingBatchSql"),
Expand Down
Expand Up @@ -27,13 +27,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.zip.ZipException;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.TableNotFoundException;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.exception.IoException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
Expand All @@ -43,6 +45,7 @@
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.ConflictException;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
Expand Down Expand Up @@ -162,6 +165,9 @@ public void batchSuccessful(DataContext context) {
try {
this.currentBatch.setStatus(Status.OK);
if (incomingBatchService.isRecordOkBatchesEnabled()) {
if (this.currentBatch.getIgnoreCount() > 0) {
log.info("Ignoring batch {}", this.currentBatch.getNodeBatchId());
}
incomingBatchService.updateIncomingBatch(this.currentBatch);
} else if (this.currentBatch.isRetry()) {
incomingBatchService.deleteIncomingBatch(this.currentBatch);
Expand Down Expand Up @@ -202,7 +208,12 @@ public void batchInError(DataContext context, Throwable ex) {
* Reread batch to make sure it wasn't set to IG or OK
*/
engine.getIncomingBatchService().refreshIncomingBatch(currentBatch);


if (currentBatch.getStatus() != Status.OK && currentBatch.getStatus() != Status.IG) {
currentBatch.setStatus(IncomingBatch.Status.ER);
currentBatch.setErrorFlag(true);
}

Batch batch = context.getBatch();
isNewErrorForCurrentBatch = batch.getLineCount() != currentBatch.getFailedLineNumber();

Expand All @@ -222,13 +233,21 @@ public void batchInError(DataContext context, Throwable ex) {
}

enableSyncTriggers(context);

if (ex instanceof CancellationException) {
log.info("Cancelling batch " + this.currentBatch.getNodeBatchId());
} else if (ex instanceof IOException || ex instanceof TransportException
|| ex instanceof IoException) {
log.warn("Failed to load batch " + this.currentBatch.getNodeBatchId(), ex);
this.currentBatch.setSqlMessage(ex.getMessage());
} else if (ex instanceof ParseException || ex instanceof ProtocolException || ex.getCause() instanceof ZipException) {
this.currentBatch.setSqlCode(ErrorConstants.PROTOCOL_VIOLATION_CODE);
this.currentBatch.setSqlState(ErrorConstants.PROTOCOL_VIOLATION_STATE);
if (isNewErrorForCurrentBatch) {
this.currentBatch.setErrorFlag(false);
} else {
log.error(String.format("Failed to parse batch %s", this.currentBatch.getNodeBatchId()), ex);
}
} else {
SQLException se = ExceptionUtils.unwrapSqlException(ex);
if (ex instanceof ConflictException) {
Expand Down Expand Up @@ -263,21 +282,17 @@ public void batchInError(DataContext context, Throwable ex) {

if (ex instanceof TableNotFoundException) {
log.error(String.format("The incoming batch %s failed: %s", this.currentBatch.getNodeBatchId(), ex.getMessage()));
} else if ((this.currentBatch.getSqlCode() != ErrorConstants.FK_VIOLATION_CODE && this.currentBatch.getSqlCode() != ErrorConstants.DEADLOCK_CODE)
|| !isNewErrorForCurrentBatch) {
} else if (isNewErrorForCurrentBatch && (this.currentBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE
|| this.currentBatch.getSqlCode() == ErrorConstants.DEADLOCK_CODE)) {
this.currentBatch.setErrorFlag(false);
} else {
log.error(String.format("Failed to load batch %s", this.currentBatch.getNodeBatchId()), ex);
}
}

ISqlTransaction transaction = context.findSymmetricTransaction(engine.getTablePrefix());

// If we were in the process of skipping or ignoring a batch
// then its status would have been OK. We should not
// set the status to ER.
if (this.currentBatch.getStatus() != Status.OK &&
this.currentBatch.getStatus() != Status.IG) {

this.currentBatch.setStatus(IncomingBatch.Status.ER);
if (currentBatch.getStatus() == Status.ER) {
if (context.getTable() != null && context.getData() != null) {
try {
IncomingError error = new IncomingError();
Expand Down

0 comments on commit 1e3d8dc

Please sign in to comment.