Skip to content

Commit

Permalink
0006129: Too many "Expected but did not receive an ack for batch"
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 28, 2023
1 parent 7cb6da3 commit fa09056
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 38 deletions.
Expand Up @@ -147,7 +147,7 @@ public void process() throws IOException {
writeLine(channelLine);
writeLine(line);

if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
Expand All @@ -163,7 +163,7 @@ public void process() throws IOException {

if (batch != null) {
batch.setStatistics(batchStats);
if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
}
Expand Down Expand Up @@ -200,7 +200,7 @@ public void process() throws IOException {
debugLine(line);
}

if (listeners != null) {
if (listeners != null && exception == null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
Expand Down Expand Up @@ -275,7 +275,9 @@ public void process() throws IOException {

processInfo.setStatus(ProcessStatus.OK);
} catch (Exception ex) {
exception = ex;
if (exception == null) {
exception = ex;
}
if (resource != null) {
resource.delete();
}
Expand Down Expand Up @@ -327,7 +329,8 @@ protected void writeLine(String line) throws IOException {
writer.write(line);
writer.write("\n");
} else {
throw new ProtocolException("Batch data is corrupt because no batch ID is present");
exception = new ProtocolException("Batch data is corrupt because no batch ID was present for DML lines");
processInfo.setStatus(ProcessStatus.ERROR);
}
}
}
Expand Down
Expand Up @@ -22,9 +22,8 @@

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.mapper.NumberMapper;
Expand Down Expand Up @@ -273,38 +272,32 @@ public void checkMissingAck(List<BatchAck> acks, String queue) {
}
if (hasCorruptBatch && parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
OutgoingBatches batches = engine.getOutgoingBatchService().getOutgoingBatches(nodeId, queue, false);
if (batches.containsBatches()) {
Map<Long, OutgoingBatch> batchMap = new HashMap<Long, OutgoingBatch>();
for (OutgoingBatch batch : batches.getBatches()) {
if (batch.getStatus() == Status.LD) {
batchMap.put(batch.getBatchId(), batch);
}
Iterator<OutgoingBatch> iter = batches.getBatches().iterator();
while (iter.hasNext()) {
OutgoingBatch batch = iter.next();
if (batch.getStatus() == Status.RQ || batch.getStatus() == Status.NE) {
iter.remove();
}
for (BatchAck ack : acks) {
batchMap.remove(ack.getBatchId());
}
if (batchMap.size() > 0) {
for (OutgoingBatch batch : batchMap.values()) {
StringBuilder message = new StringBuilder(128);
message.append("Expected but did not receive an ack for batch ");
message.append(batch.getNodeBatchId()).append(". ");

if (!batch.isLoadFlag()) {
message.append("This could be because the batch is corrupt - removing the batch from staging.");
log.warn(message.toString());
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
} else {
log.warn("Unable to find outgoing staging file for node {} that has corrupt batch data", nodeId);
}
} else {
message.append(
"This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually.");
log.warn(message.toString());
}
}
if (batches.containsBatches()) {
OutgoingBatch batch = batches.getBatches().get(0);
StringBuilder message = new StringBuilder(128);
message.append("Expected but did not receive an ack for batch ");
message.append(batch.getNodeBatchId()).append(". ");
if (!batch.isLoadFlag()) {
message.append("This could be because the batch is corrupt - removing the batch from staging.");
log.warn(message.toString());
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
batch.getStagedLocation(), batch.getBatchId());
if (resource != null) {
resource.delete();
} else {
log.warn("Unable to find outgoing staging file for {} that has corrupt batch data", batch.getNodeBatchId());
}
} else {
message.append(
"This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually.");
log.warn(message.toString());
}
} else {
log.warn("Unable to find outgoing batch for node {} that has corrupt batch data", nodeId);
Expand Down
Expand Up @@ -676,7 +676,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
}
}
List<IncomingBatch> batchesProcessed = listener.getBatchesProcessed();
if (error != null) {
if (error != null && error instanceof ProtocolException) {
batchesProcessed.add(new IncomingBatch());
}
return batchesProcessed;
Expand Down

0 comments on commit fa09056

Please sign in to comment.