From 86ea19f8e9d1791fdddfb3ba49a3d1f6bee8cffa Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Thu, 22 Jun 2017 16:47:18 -0400 Subject: [PATCH] 0002911: Immediate re-pull when data was just pulled --- .../symmetric/service/impl/PullService.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java index 4ad51b67c4..3244b24476 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java @@ -116,15 +116,18 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status boolean immediatePullIfDataFound = parameterService.is(ParameterConstants.PULL_IMMEDIATE_IF_DATA_FOUND, false); if (StringUtils.isNotBlank(node.getSyncUrl()) || !parameterService.isRegistrationServer()) { + long lastBatchesProcessed = 0; + long lastDataProcessed = 0; + long cumulativeBatchesProcessed = 0; + long cumulativeDataProcessed = 0; do { - log.debug("Pull requested for {}", node.toString()); - if (status.getBatchesProcessed() > 0) { + log.debug("Pull requested for {}", node.toString()); + if (lastBatchesProcessed > 0) { if (nodeService.isDataLoadStarted()) { log.info("Immediate pull requested while in reload mode"); } else { log.debug("Immediate pull requested while data found"); } - status.resetCounts(); } try { @@ -134,25 +137,29 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status fireOffline(ex, node, status); } - if (!status.failed() && (status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) { + lastBatchesProcessed = status.getBatchesProcessed() - cumulativeBatchesProcessed; + lastDataProcessed = status.getDataProcessed() - cumulativeDataProcessed; + if (!status.failed() && (lastDataProcessed > 0 || lastDataProcessed > 0)) { log.info( "Pull data received from {} {}. {} rows and {} batches were processed", new Object[] { node.toString(), "on channel thread " + nodeCommunication.getQueue(), - status.getDataProcessed(), status.getBatchesProcessed() }); + lastDataProcessed, lastDataProcessed }); } else if (status.failed()) { log.debug( "There was a failure while pulling data from {} {}. {} rows and {} batches were processed", new Object[] { node.toString(), "on channel thread " + nodeCommunication.getQueue(), - status.getDataProcessed(), status.getBatchesProcessed() }); + lastDataProcessed, lastBatchesProcessed }); } /* * Re-pull immediately if we are in the middle of an initial * load so that the initial load completes as quickly as * possible. */ + cumulativeDataProcessed = status.getDataProcessed(); + cumulativeBatchesProcessed = status.getBatchesProcessed(); } while ((immediatePullIfDataFound || nodeService.isDataLoadStarted()) && !status.failed() - && status.getBatchesProcessed() > 0); + && lastBatchesProcessed > 0); } else { log.warn("Cannot pull node '{}' in the group '{}'. The sync url is blank", node.getNodeId(), node.getNodeGroupId());