Skip to content

Commit

Permalink
Merge branch 'woehrl01-patch-9' into 3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 21, 2016
2 parents cbae48c + f109dca commit f6048ff
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Expand Up @@ -69,6 +69,7 @@ private ParameterConstants() {
public final static String PULL_THREAD_COUNT_PER_SERVER = "pull.thread.per.server.count";
public final static String PULL_MINIMUM_PERIOD_MS = "pull.period.minimum.ms";
public final static String PULL_LOCK_TIMEOUT_MS = "pull.lock.timeout.ms";
public final static String PULL_IMMEDIATE_IF_DATA_FOUND = "pull.immediate.if.data.found";

public final static String PUSH_THREAD_COUNT_PER_SERVER = "push.thread.per.server.count";
public final static String PUSH_MINIMUM_PERIOD_MS = "push.period.minimum.ms";
Expand Down Expand Up @@ -386,4 +387,4 @@ public static Set<String> getAllParameterTags() {
}
return tags;
}
}
}
Expand Up @@ -111,6 +111,9 @@ synchronized public RemoteNodeStatuses pullData(boolean force) {

public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status) {
Node node = nodeCommunication.getNode();

boolean immediatePullIfDataFound = parameterService.is(ParameterConstants.PULL_IMMEDIATE_IF_DATA_FOUND, false);

if (StringUtils.isNotBlank(node.getSyncUrl()) || !parameterService.isRegistrationServer()) {
int pullCount = 0;
long batchesProcessedCount = 0;
Expand All @@ -119,7 +122,11 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
pullCount++;
log.debug("Pull requested for {}", node.toString());
if (pullCount > 1) {
log.info("Immediate pull requested while in reload mode");
if(immediatePullIfDataFound && pullCount > 2){
log.info("Immediate pull requested while data found");
}else{
log.info("Immediate pull requested while in reload mode");
}
}

try {
Expand All @@ -129,7 +136,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
fireOffline(ex, node, status);
}

if (!status.failed() && (status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) {
if (!status.failed() && ((status.getDataProcessed() > 0 && !immediatePullIfDataFound) || status.getBatchesProcessed() > batchesProcessedCount)) {
log.info(
"Pull data received from {} {}. {} rows and {} batches were processed",
new Object[] { node.toString(), "on channel thread " + nodeCommunication.getQueue(),
Expand All @@ -146,7 +153,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
* load so that the initial load completes as quickly as
* possible.
*/
} while (nodeService.isDataLoadStarted() && !status.failed()
} while ((immediatePullIfDataFound || nodeService.isDataLoadStarted()) && !status.failed()
&& status.getBatchesProcessed() > batchesProcessedCount);
} else {
log.warn("Cannot pull node '{}' in the group '{}'. The sync url is blank",
Expand Down

0 comments on commit f6048ff

Please sign in to comment.