diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java index 6f6c097dae..095dfa5145 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java @@ -96,31 +96,35 @@ public synchronized void queueLoads(boolean force) { @Override public void cancelLoad(TableReloadStatus status) { - Node identity = engine.getNodeService().findIdentity(); - boolean isSourceNode = identity != null && identity.getNodeId().equals(status.getSourceNodeId()); - log.info("Cancelling {} load {} {} node {}", isSourceNode ? "outgoing" : "incoming", status.getLoadId(), - isSourceNode ? "for" : "from", isSourceNode ? status.getTargetNodeId() : status.getSourceNodeId()); - List infos = engine.getStatisticManager().getProcessInfos(); - for (ProcessInfo info : infos) { - if (info.getCurrentLoadId() == status.getLoadId()) { - log.info("Sending interrupt to " + info.getKey() + ",batchId=" + info.getCurrentBatchId()); - info.getThread().interrupt(); + TableReloadRequest reloadRequest = engine.getDataService().getTableReloadRequest(status.getLoadId()); + + if (status.isFullLoad() && (reloadRequest.getReloadSelect().isEmpty() || reloadRequest.getReloadSelect() == null)) { + Node identity = engine.getNodeService().findIdentity(); + boolean isSourceNode = identity != null && identity.getNodeId().equals(status.getSourceNodeId()); + log.info("Cancelling {} load {} {} node {}", isSourceNode ? "outgoing" : "incoming", status.getLoadId(), + isSourceNode ? "for" : "from", isSourceNode ? status.getTargetNodeId() : status.getSourceNodeId()); + List infos = engine.getStatisticManager().getProcessInfos(); + for (ProcessInfo info : infos) { + if (info.getCurrentLoadId() == status.getLoadId()) { + log.info("Sending interrupt to " + info.getKey() + ",batchId=" + info.getCurrentBatchId()); + info.getThread().interrupt(); + } } - } - if (isSourceNode) { - IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService(); - int count = engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId()); - log.info("Marked {} load requests as OK for node {}", count, status.getTargetNodeId()); - count = engine.getDataExtractorService().cancelExtractRequests(status.getLoadId()); - log.info("Marked {} extract requests as OK for node {}", count, status.getTargetNodeId()); - count = outgoingBatchService.cancelLoadBatches(status.getLoadId()); - log.info("Marked {} batches as OK or IG for node {}", count, status.getTargetNodeId()); - engine.getDataExtractorService().releaseMissedExtractRequests(); - if (status.isFullLoad()) { - engine.getNodeService().setInitialLoadEnded(null, status.getTargetNodeId()); + if (isSourceNode) { + IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService(); + int count = engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId()); + log.info("Marked {} load requests as OK for node {}", count, status.getTargetNodeId()); + count = engine.getDataExtractorService().cancelExtractRequests(status.getLoadId()); + log.info("Marked {} extract requests as OK for node {}", count, status.getTargetNodeId()); + count = outgoingBatchService.cancelLoadBatches(status.getLoadId()); + log.info("Marked {} batches as OK or IG for node {}", count, status.getTargetNodeId()); + engine.getDataExtractorService().releaseMissedExtractRequests(); + if (status.isFullLoad()) { + engine.getNodeService().setInitialLoadEnded(null, status.getTargetNodeId()); + } + } else { + engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId()); } - } else { - engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId()); } }