Skip to content

Commit

Permalink
0002911: Immediate re-pull when data was just pulled
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jun 22, 2017
1 parent f3fb7b7 commit aaa4f5d
Showing 1 changed file with 20 additions and 14 deletions.
Expand Up @@ -148,35 +148,41 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
!parameterService.isRegistrationServer()) {
try {
startTimesOfNodesBeingPushedTo.put(nodeCommunication.getIdentifier(), new Date());
long batchesProcessed = 0;
long dataProcessed = 0;
long reloadBatchesProcessed = 0;
long lastBatchesProcessed = 0;
long lastDataProcessed = 0;
long lastReloadBatchesProcessed = 0;
long cumulativeBatchesProcessed = 0;
long cumulativeDataProcessed = 0;
long cumulativeReloadBatchesProcessed = 0;
do {
if (batchesProcessed > 0) {
if (status.getReloadBatchesProcessed() > 0) {
if (lastBatchesProcessed > 0) {
if (lastReloadBatchesProcessed > 0) {
log.info("Pushing to {} again because the last push contained reload batches", node);
} else {
log.debug("Pushing to {} again because the last push contained batches", node);
}
}
log.debug("Push requested for node {} channel {}", node, nodeCommunication.getQueue());
pushToNode(node, status);
batchesProcessed = status.getBatchesProcessed() - batchesProcessed;
dataProcessed = status.getDataProcessed() - dataProcessed;
reloadBatchesProcessed = status.getReloadBatchesProcessed() - reloadBatchesProcessed;
if (!status.failed() && batchesProcessed > 0) {
lastBatchesProcessed = status.getBatchesProcessed() - cumulativeBatchesProcessed;
lastDataProcessed = status.getDataProcessed() - cumulativeDataProcessed;
lastReloadBatchesProcessed = status.getReloadBatchesProcessed() - cumulativeReloadBatchesProcessed;
if (!status.failed() && lastBatchesProcessed > 0) {
log.info(
"Pushed data to node {}. {} data and {} batches were processed",
new Object[] { node, dataProcessed,
batchesProcessed});
new Object[] { node, lastDataProcessed,
lastBatchesProcessed});
} else if (status.failed()) {
log.debug(
"There was a failure while pushing data to {}. {} data and {} batches were processed",
new Object[] { node, dataProcessed,
batchesProcessed});
new Object[] { node, lastDataProcessed,
lastBatchesProcessed});
}
log.debug("Push completed for {} channel {}", node, nodeCommunication.getQueue());
} while (((immediatePushIfDataFound && batchesProcessed > 0) || reloadBatchesProcessed > 0)
cumulativeReloadBatchesProcessed = status.getReloadBatchesProcessed();
cumulativeDataProcessed = status.getDataProcessed();
cumulativeBatchesProcessed = status.getBatchesProcessed();
} while (((immediatePushIfDataFound && lastBatchesProcessed > 0) || lastReloadBatchesProcessed > 0)
&& !status.failed());
} finally {
startTimesOfNodesBeingPushedTo.remove(node.getNodeId());
Expand Down

0 comments on commit aaa4f5d

Please sign in to comment.