Skip to content

Commit

Permalink
0005636: Only cancel a full load in progress if the reload select is
Browse files Browse the repository at this point in the history
empty
  • Loading branch information
joshahicks committed Dec 15, 2022
1 parent cc6df13 commit 4521d85
Showing 1 changed file with 27 additions and 23 deletions.
Expand Up @@ -96,31 +96,35 @@ public synchronized void queueLoads(boolean force) {

@Override
public void cancelLoad(TableReloadStatus status) {
Node identity = engine.getNodeService().findIdentity();
boolean isSourceNode = identity != null && identity.getNodeId().equals(status.getSourceNodeId());
log.info("Cancelling {} load {} {} node {}", isSourceNode ? "outgoing" : "incoming", status.getLoadId(),
isSourceNode ? "for" : "from", isSourceNode ? status.getTargetNodeId() : status.getSourceNodeId());
List<ProcessInfo> infos = engine.getStatisticManager().getProcessInfos();
for (ProcessInfo info : infos) {
if (info.getCurrentLoadId() == status.getLoadId()) {
log.info("Sending interrupt to " + info.getKey() + ",batchId=" + info.getCurrentBatchId());
info.getThread().interrupt();
TableReloadRequest reloadRequest = engine.getDataService().getTableReloadRequest(status.getLoadId());

if (status.isFullLoad() && (reloadRequest.getReloadSelect().isEmpty() || reloadRequest.getReloadSelect() == null)) {
Node identity = engine.getNodeService().findIdentity();
boolean isSourceNode = identity != null && identity.getNodeId().equals(status.getSourceNodeId());
log.info("Cancelling {} load {} {} node {}", isSourceNode ? "outgoing" : "incoming", status.getLoadId(),
isSourceNode ? "for" : "from", isSourceNode ? status.getTargetNodeId() : status.getSourceNodeId());
List<ProcessInfo> infos = engine.getStatisticManager().getProcessInfos();
for (ProcessInfo info : infos) {
if (info.getCurrentLoadId() == status.getLoadId()) {
log.info("Sending interrupt to " + info.getKey() + ",batchId=" + info.getCurrentBatchId());
info.getThread().interrupt();
}
}
}
if (isSourceNode) {
IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService();
int count = engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId());
log.info("Marked {} load requests as OK for node {}", count, status.getTargetNodeId());
count = engine.getDataExtractorService().cancelExtractRequests(status.getLoadId());
log.info("Marked {} extract requests as OK for node {}", count, status.getTargetNodeId());
count = outgoingBatchService.cancelLoadBatches(status.getLoadId());
log.info("Marked {} batches as OK or IG for node {}", count, status.getTargetNodeId());
engine.getDataExtractorService().releaseMissedExtractRequests();
if (status.isFullLoad()) {
engine.getNodeService().setInitialLoadEnded(null, status.getTargetNodeId());
if (isSourceNode) {
IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService();
int count = engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId());
log.info("Marked {} load requests as OK for node {}", count, status.getTargetNodeId());
count = engine.getDataExtractorService().cancelExtractRequests(status.getLoadId());
log.info("Marked {} extract requests as OK for node {}", count, status.getTargetNodeId());
count = outgoingBatchService.cancelLoadBatches(status.getLoadId());
log.info("Marked {} batches as OK or IG for node {}", count, status.getTargetNodeId());
engine.getDataExtractorService().releaseMissedExtractRequests();
if (status.isFullLoad()) {
engine.getNodeService().setInitialLoadEnded(null, status.getTargetNodeId());
}
} else {
engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId());
}
} else {
engine.getDataService().updateTableReloadRequestsCancelled(status.getLoadId());
}
}

Expand Down

0 comments on commit 4521d85

Please sign in to comment.