Skip to content

Commit

Permalink
0005203: allow cancel of load from remote node
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jan 25, 2022
1 parent f6412f5 commit ac41008
Showing 1 changed file with 27 additions and 0 deletions.
Expand Up @@ -40,6 +40,7 @@
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterFilterAdapter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.TableReloadStatus;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
Expand All @@ -58,6 +59,7 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt
private static final String CTX_KEY_INITIAL_LOAD_LISTENER = "InitialLoadListener." + SUFFIX;
private static final String CTX_KEY_MY_NODE_ID = "MyNodeId." + SUFFIX;
private static final String CTX_KEY_MY_NODE_SECURITY = "MyNodeSecurity." + SUFFIX;
private static final String CTX_KEY_CANCEL_LOAD = "CancelLoad." + SUFFIX;
private ISymmetricEngine engine;
private ConfigurationChangedHelper helper;

Expand Down Expand Up @@ -130,6 +132,20 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) {
context.put(CTX_KEY_CHANGED_NODE_SECURITY, true);
}
if (matchesTable(table, TableConstants.SYM_TABLE_RELOAD_STATUS) && data.getDataEventType() == DataEventType.UPDATE) {
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
boolean isCancelled = "1".equals(newData.get("cancelled"));
String loadId = newData.get("load_id");
if (isCancelled && loadId != null) {
@SuppressWarnings("unchecked")
List<Long> loadIds = (List<Long>) context.get(CTX_KEY_CANCEL_LOAD);
if (loadIds == null) {
loadIds = new ArrayList<Long>();
context.put(CTX_KEY_CANCEL_LOAD, loadIds);
}
loadIds.add(Long.parseLong(loadId));
}
}
}

private boolean hasClientReloadListener(DataContext context) {
Expand Down Expand Up @@ -179,6 +195,17 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
}
}
}
@SuppressWarnings("unchecked")
List<Long> loadIds = (List<Long>) context.get(CTX_KEY_CANCEL_LOAD);
String identityId = (String) context.get(CTX_KEY_MY_NODE_ID);
if (loadIds != null && identityId != null) {
for (Long loadId : loadIds) {
TableReloadStatus status = engine.getDataService().getTableReloadStatusByLoadId(loadId);
if (status != null && identityId.equals(status.getSourceNodeId())) {
engine.getInitialLoadService().cancelLoad(status);
}
}
}
}

@Override
Expand Down

0 comments on commit ac41008

Please sign in to comment.