Skip to content

Commit

Permalink
0002911: Immediate re-pull when data was just pulled
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jun 22, 2017
1 parent aaa4f5d commit 86ea19f
Showing 1 changed file with 14 additions and 7 deletions.
Expand Up @@ -116,15 +116,18 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
boolean immediatePullIfDataFound = parameterService.is(ParameterConstants.PULL_IMMEDIATE_IF_DATA_FOUND, false);

if (StringUtils.isNotBlank(node.getSyncUrl()) || !parameterService.isRegistrationServer()) {
long lastBatchesProcessed = 0;
long lastDataProcessed = 0;
long cumulativeBatchesProcessed = 0;
long cumulativeDataProcessed = 0;
do {
log.debug("Pull requested for {}", node.toString());
if (status.getBatchesProcessed() > 0) {
log.debug("Pull requested for {}", node.toString());
if (lastBatchesProcessed > 0) {
if (nodeService.isDataLoadStarted()) {
log.info("Immediate pull requested while in reload mode");
} else {
log.debug("Immediate pull requested while data found");
}
status.resetCounts();
}

try {
Expand All @@ -134,25 +137,29 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
fireOffline(ex, node, status);
}

if (!status.failed() && (status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) {
lastBatchesProcessed = status.getBatchesProcessed() - cumulativeBatchesProcessed;
lastDataProcessed = status.getDataProcessed() - cumulativeDataProcessed;
if (!status.failed() && (lastDataProcessed > 0 || lastDataProcessed > 0)) {
log.info(
"Pull data received from {} {}. {} rows and {} batches were processed",
new Object[] { node.toString(), "on channel thread " + nodeCommunication.getQueue(),
status.getDataProcessed(), status.getBatchesProcessed() });
lastDataProcessed, lastDataProcessed });

} else if (status.failed()) {
log.debug(
"There was a failure while pulling data from {} {}. {} rows and {} batches were processed",
new Object[] { node.toString(), "on channel thread " + nodeCommunication.getQueue(),
status.getDataProcessed(), status.getBatchesProcessed() });
lastDataProcessed, lastBatchesProcessed });
}
/*
* Re-pull immediately if we are in the middle of an initial
* load so that the initial load completes as quickly as
* possible.
*/
cumulativeDataProcessed = status.getDataProcessed();
cumulativeBatchesProcessed = status.getBatchesProcessed();
} while ((immediatePullIfDataFound || nodeService.isDataLoadStarted()) && !status.failed()
&& status.getBatchesProcessed() > 0);
&& lastBatchesProcessed > 0);
} else {
log.warn("Cannot pull node '{}' in the group '{}'. The sync url is blank",
node.getNodeId(), node.getNodeGroupId());
Expand Down

0 comments on commit 86ea19f

Please sign in to comment.