diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index b46c482dbd..9c4f20117c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -148,12 +148,15 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status !parameterService.isRegistrationServer()) { try { startTimesOfNodesBeingPushedTo.put(nodeCommunication.getIdentifier(), new Date()); - long batchesProcessed = 0; - long dataProcessed = 0; - long reloadBatchesProcessed = 0; + long lastBatchesProcessed = 0; + long lastDataProcessed = 0; + long lastReloadBatchesProcessed = 0; + long cumulativeBatchesProcessed = 0; + long cumulativeDataProcessed = 0; + long cumulativeReloadBatchesProcessed = 0; do { - if (batchesProcessed > 0) { - if (status.getReloadBatchesProcessed() > 0) { + if (lastBatchesProcessed > 0) { + if (lastReloadBatchesProcessed > 0) { log.info("Pushing to {} again because the last push contained reload batches", node); } else { log.debug("Pushing to {} again because the last push contained batches", node); @@ -161,22 +164,25 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } log.debug("Push requested for node {} channel {}", node, nodeCommunication.getQueue()); pushToNode(node, status); - batchesProcessed = status.getBatchesProcessed() - batchesProcessed; - dataProcessed = status.getDataProcessed() - dataProcessed; - reloadBatchesProcessed = status.getReloadBatchesProcessed() - reloadBatchesProcessed; - if (!status.failed() && batchesProcessed > 0) { + lastBatchesProcessed = status.getBatchesProcessed() - cumulativeBatchesProcessed; + lastDataProcessed = status.getDataProcessed() - cumulativeDataProcessed; + lastReloadBatchesProcessed = status.getReloadBatchesProcessed() - cumulativeReloadBatchesProcessed; + if (!status.failed() && lastBatchesProcessed > 0) { log.info( "Pushed data to node {}. {} data and {} batches were processed", - new Object[] { node, dataProcessed, - batchesProcessed}); + new Object[] { node, lastDataProcessed, + lastBatchesProcessed}); } else if (status.failed()) { log.debug( "There was a failure while pushing data to {}. {} data and {} batches were processed", - new Object[] { node, dataProcessed, - batchesProcessed}); + new Object[] { node, lastDataProcessed, + lastBatchesProcessed}); } log.debug("Push completed for {} channel {}", node, nodeCommunication.getQueue()); - } while (((immediatePushIfDataFound && batchesProcessed > 0) || reloadBatchesProcessed > 0) + cumulativeReloadBatchesProcessed = status.getReloadBatchesProcessed(); + cumulativeDataProcessed = status.getDataProcessed(); + cumulativeBatchesProcessed = status.getBatchesProcessed(); + } while (((immediatePushIfDataFound && lastBatchesProcessed > 0) || lastReloadBatchesProcessed > 0) && !status.failed()); } finally { startTimesOfNodesBeingPushedTo.remove(node.getNodeId());