Skip to content

Commit

Permalink
0001350: Push batch not getting acked when multiple batches exist and…
Browse files Browse the repository at this point in the history
… it passed the transport.max.bytes.to.sync threshold
  • Loading branch information
chenson42 committed Jul 26, 2013
1 parent d3cd8d6 commit a46921f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -493,24 +493,24 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch,
dataWriter);
}

processedBatches.add(currentBatch);

if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch);

bytesSentCount += currentBatch.getByteCount();
batchesSentCount++;

if (bytesSentCount >= maxBytesToSync && i < activeBatches.size() - 1) {
if (bytesSentCount >= maxBytesToSync && processedBatches.size() < activeBatches.size()) {
log.info(
"Reached the total byte threshold after {} of {} batches were extracted for {}. The remaining batches will be extracted on a subsequent sync",
"Reached the total byte threshold after {} of {} batches were extracted for node '{}'. The remaining batches will be extracted on a subsequent sync",
new Object[] { batchesSentCount, activeBatches.size(),
targetNode.getNodeId() });
break;
}
}

processedBatches.add(currentBatch);
}
}

} catch (RuntimeException e) {
Expand Down Expand Up @@ -931,7 +931,7 @@ public List<String> getExtractRequestNodes() {

public List<ExtractRequest> getExtractRequestsForNode(String nodeId) {
return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"),
new ExtractRequestMapper(), nodeId, Status.NE.name());
new ExtractRequestMapper(), nodeId, ExtractRequest.ExtractStatus.NE.name());
}

protected void resetExtractRequest(OutgoingBatch batch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class StagingManager implements IStagingManager {

protected static final Logger log = LoggerFactory.getLogger(StagingManager.class);


protected File directory;

protected Map<String, IStagedResource> resourceList = new ConcurrentHashMap<String, IStagedResource>();
Expand Down

0 comments on commit a46921f

Please sign in to comment.