Skip to content

Commit

Permalink
0003318: common batches can get corrupted because of 0003275 and 0003282
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Nov 16, 2017
1 parent 0de299c commit 215ebf5
Showing 1 changed file with 5 additions and 3 deletions.
Expand Up @@ -868,12 +868,13 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
} else if (!isPreviouslyExtracted(currentBatch, true)) {

BatchLock lock = null;
try {
log.debug("{} attempting to acquire lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
lock = acquireLock(currentBatch, useStagingDataWriter);

log.debug("{} acquired lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
if (!isPreviouslyExtracted(currentBatch, true)) {
log.debug("{} extracting batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);
if (updateBatchStatistics) {
changeBatchStatus(Status.QY, currentBatch, mode);
Expand Down Expand Up @@ -910,12 +911,13 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
resource.delete();
}
throw ex;
} finally {
} finally {
IStagedResource resource = getStagedResource(currentBatch);
if (resource != null) {
resource.setState(State.DONE);
}
releaseLock(lock, currentBatch, useStagingDataWriter);
log.debug("{} released lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
}
}

Expand Down

0 comments on commit 215ebf5

Please sign in to comment.