Skip to content

Commit

Permalink
0001165: For an initial load on push, exit out of load loop if one of…
Browse files Browse the repository at this point in the history
… the batches failed
  • Loading branch information
chenson42 committed Apr 6, 2013
1 parent acdcb26 commit 2976893
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
Expand Up @@ -105,14 +105,18 @@ public void updateOutgoingStatus(List<OutgoingBatch> outgoingBatches, List<Batch
}
}
}

if (outgoingBatches != null) {
for (OutgoingBatch batch : outgoingBatches) {
batchesProcessed++;
dataProcessed += batch.totalEventCount();
if (Constants.CHANNEL_RELOAD.equals(batch.getChannelId())) {
reloadBatchesProcessed++;
}

if (batch.getStatus() == OutgoingBatch.Status.ER) {
status = Status.DATA_ERROR;
}
}
}

Expand Down
Expand Up @@ -144,27 +144,25 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
try {
startTimesOfNodesBeingPushedTo.put(node.getNodeId(), new Date());
long reloadBatchesProcessed = 0;
int pushCount = 0;
long lastBatchCount = 0;
do {
if (pushCount > 0) {
if (lastBatchCount > 0) {
log.info(
"Pushing to {} again because the last push contained reload batches",
node);
}
reloadBatchesProcessed = status.getReloadBatchesProcessed();
log.debug("Push requested for {}", node);
pushToNode(node, status);
if (status.getBatchesProcessed() > 0) {
if (status.getBatchesProcessed() > 0 && status.getBatchesProcessed() != lastBatchCount) {
log.info(
"Pushed data to {}. {} data and {} batches were processed",
"Pushed data to {}. {} data and {} batches were processed{}",
new Object[] { node, status.getDataProcessed(),
status.getBatchesProcessed() });
} else if (status.failed()) {
log.warn("There was an error while pushing data to the server");
status.getBatchesProcessed(), status.failed() ? ". There was at least one failure" : "" });
}
log.debug("Push completed for {}", node);
pushCount++;
} while (status.getReloadBatchesProcessed() > reloadBatchesProcessed);
lastBatchCount = status.getBatchesProcessed();
} while (status.getReloadBatchesProcessed() > reloadBatchesProcessed && !status.failed());
} finally {
startTimesOfNodesBeingPushedTo.remove(node.getNodeId());
}
Expand Down

0 comments on commit 2976893

Please sign in to comment.