Skip to content

Commit

Permalink
0006130: Too many "Expected but did not receive an ack for batch"
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 28, 2023
1 parent 8379735 commit d4d0b93
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void process() throws IOException {
writeLine(binaryLine);
writeLine(channelLine);
writeLine(line);
if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
Expand All @@ -157,7 +157,7 @@ public void process() throws IOException {
batchTableLines.clear();
if (batch != null) {
batch.setStatistics(batchStats);
if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
}
Expand Down Expand Up @@ -192,7 +192,7 @@ public void process() throws IOException {
debugLine(channelLine);
debugLine(line);
}
if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
Expand Down Expand Up @@ -262,7 +262,9 @@ public void process() throws IOException {
}
processInfo.setStatus(ProcessStatus.OK);
} catch (Exception ex) {
exception = ex;
if (exception == null) {
exception = ex;
}
if (resource != null) {
resource.delete();
}
Expand Down Expand Up @@ -311,7 +313,8 @@ protected void writeLine(String line) throws IOException {
writer.write(line);
writer.write("\n");
} else {
throw new ProtocolException("Batch data is corrupt because no batch ID is present");
exception = new ProtocolException("Batch data is corrupt because no batch ID was present for DML lines");
processInfo.setStatus(ProcessStatus.ERROR);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.mapper.NumberMapper;
Expand Down Expand Up @@ -267,37 +266,32 @@ public void checkMissingAck(List<BatchAck> acks, String queue) {
}
if (hasCorruptBatch && parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
OutgoingBatches batches = engine.getOutgoingBatchService().getOutgoingBatches(nodeId, queue, false);
if (batches.containsBatches()) {
Map<Long, OutgoingBatch> batchMap = new HashMap<Long, OutgoingBatch>();
for (OutgoingBatch batch : batches.getBatches()) {
if (batch.getStatus() == Status.LD) {
batchMap.put(batch.getBatchId(), batch);
}
}
for (BatchAck ack : acks) {
batchMap.remove(ack.getBatchId());
Iterator<OutgoingBatch> iter = batches.getBatches().iterator();
while (iter.hasNext()) {
OutgoingBatch batch = iter.next();
if (batch.getStatus() == Status.RQ || batch.getStatus() == Status.NE) {
iter.remove();
}
if (batchMap.size() > 0) {
for (OutgoingBatch batch : batchMap.values()) {
StringBuilder message = new StringBuilder(128);
message.append("Expected but did not receive an ack for batch ");
message.append(batch.getNodeBatchId()).append(". ");
if (!batch.isLoadFlag()) {
message.append("This could be because the batch is corrupt - removing the batch from staging.");
log.warn(message.toString());
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
} else {
log.warn("Unable to find outgoing staging file for node {} that has corrupt batch data", nodeId);
}
} else {
message.append(
"This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually.");
log.warn(message.toString());
}
}
if (batches.containsBatches()) {
OutgoingBatch batch = batches.getBatches().get(0);
StringBuilder message = new StringBuilder(128);
message.append("Expected but did not receive an ack for batch ");
message.append(batch.getNodeBatchId()).append(". ");
if (!batch.isLoadFlag()) {
message.append("This could be because the batch is corrupt - removing the batch from staging.");
log.warn(message.toString());
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
} else {
log.warn("Unable to find outgoing staging file for {} that has corrupt batch data", batch.getNodeBatchId());
}
} else {
message.append(
"This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually.");
log.warn(message.toString());
}
} else {
log.warn("Unable to find outgoing batch for node {} that has corrupt batch data", nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
}
}
List<IncomingBatch> batchesProcessed = listener.getBatchesProcessed();
if (error != null) {
if (error != null && error instanceof ProtocolException) {
batchesProcessed.add(new IncomingBatch());
}
return batchesProcessed;
Expand Down

0 comments on commit d4d0b93

Please sign in to comment.