Skip to content

Commit

Permalink
0003748: Batch was not complete
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 12, 2018
1 parent f6c842a commit 9ea369d
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ private ErrorConstants() {
public static final String FK_VIOLATION_STATE = "FK";
public static final int FK_VIOLATION_CODE = -900;

public static final String PROTOCOL_VIOLATION_STATE = "PROTOCOL";
public static final int PROTOCOL_VIOLATION_CODE = -888;

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public void process() throws IOException {
getArgLine(nodeLine), targetNodeId, false);
processInfo.incrementBatchCount();
String location = batch.getStagedLocation();
if (resource != null) {
resource.close();
resource.setState(State.DONE);
}
resource = stagingManager.create(category, location, batch.getBatchId());
writer = resource.getWriter(memoryThresholdInBytes);
writeLine(nodeLine);
Expand Down Expand Up @@ -163,6 +167,10 @@ public void process() throws IOException {
getArgLine(nodeLine), targetNodeId, false);
processInfo.incrementBatchCount();
String location = batch.getStagedLocation();
if (resource != null) {
resource.close();
resource.setState(State.DONE);
}
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
if (resource != null) {
Expand All @@ -172,6 +180,13 @@ public void process() throws IOException {
writer = null;
}

if (log.isDebugEnabled()) {
debugLine(nodeLine);
debugLine(binaryLine);
debugLine(channelLine);
debugLine(line);
}

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
Expand Down Expand Up @@ -239,7 +254,12 @@ public void process() throws IOException {
ts = System.currentTimeMillis();
}
}


if (resource != null) {
resource.close();
resource.setState(State.DONE);
}

processInfo.setStatus(ProcessStatus.OK);
} catch (IOException ex) {
if (resource != null) {
Expand Down Expand Up @@ -299,6 +319,12 @@ protected void putStats(Statistics stats, String columnsString, String statsStri
}
}

protected void debugLine(String line) {
if (line != null) {
log.debug("Received: {}", line);
}
}

class TableLine {
String catalogLine;
String schemaLine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.BatchAck;
import org.jumpmind.symmetric.model.BatchAckResult;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IRegistrationService;
Expand Down Expand Up @@ -143,6 +144,20 @@ public BatchAckResult ack(final BatchAck batch) {
engine.getDataService().reloadMissingForeignKeyRows(outgoingBatch.getNodeId(), outgoingBatch.getFailedDataId());
}
}
if (outgoingBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE
&& ErrorConstants.PROTOCOL_VIOLATION_STATE.equals(outgoingBatch.getSqlState())) {
if (outgoingBatch.isLoadFlag()) {
log.info("The batch {} may be corrupt in staging. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually.",
outgoingBatch.getNodeBatchId());
} else {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
outgoingBatch.getStagedLocation(), outgoingBatch.getBatchId());
if (resource != null) {
log.info("The batch {} may be corrupt in staging, so removing it.", outgoingBatch.getNodeBatchId());
resource.delete();
}
}
}
} else if (status == Status.RS) {
log.info("The outgoing batch {} received resend request", outgoingBatch.getNodeBatchId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.jumpmind.exception.HttpException;
import org.jumpmind.exception.InvalidRetryException;
import org.jumpmind.exception.IoException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
Expand All @@ -75,6 +76,7 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.reader.DataReaderStatistics;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
Expand Down Expand Up @@ -1049,6 +1051,15 @@ protected IDataWriter chooseDataWriter(Batch batch) {
processor.setDataReader(buildDataReader(batchInStaging, resource));
processor.process(ctx);
} else {
if (e instanceof ParseException || e instanceof ProtocolException) {
log.info("The batch {} may be corrupt in staging, so removing it.", batchInStaging.getNodeBatchId());
resource.delete();
incomingBatch = listener.currentBatch;
incomingBatch.setStatus(Status.ER);
incomingBatch.setSqlCode(ErrorConstants.PROTOCOL_VIOLATION_CODE);
incomingBatch.setSqlState(ErrorConstants.PROTOCOL_VIOLATION_STATE);
incomingBatchService.updateIncomingBatch(incomingBatch);
}
isError = true;
throw e;
}
Expand Down

0 comments on commit 9ea369d

Please sign in to comment.