Skip to content

Commit

Permalink
0004405: Initial Load: Retrieve active trigger histories after
Browse files Browse the repository at this point in the history
retrieving Sync Trigger lock
  • Loading branch information
philipmarzullo64 committed May 22, 2020
1 parent 90e1ab8 commit 21ce112
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
Expand Down Expand Up @@ -103,8 +104,11 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch

public String sendSQL(String nodeId, String sql);

public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests);
public Map<Integer, ExtractRequest> insertReloadEvents(
Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests,
ProcessInfo processInfo, List<TriggerRouter> triggerRouters,
Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator);

public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.job.OracleNoOrderHeartbeat;
import org.jumpmind.symmetric.job.PushHeartbeatListener;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
Expand Down Expand Up @@ -813,13 +814,22 @@ private String getReloadChannelIdForTrigger(Trigger trigger, Map<String, Channel

@Override
public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests) {
List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator)
{
if (engine.getClusterService().lock(ClusterConstants.SYNC_TRIGGERS)) {
try {
INodeService nodeService = engine.getNodeService();
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();

synchronized (triggerRouterService) {

List<TriggerHistory> activeHistories = null;
if (reloadGenerator == null) {
activeHistories = triggerRouterService.getActiveTriggerHistories();
} else {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

boolean isFullLoad = reloadRequests == null
|| (reloadRequests.size() == 1 && reloadRequests.get(0).isFullLoadRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
Expand Down Expand Up @@ -236,12 +235,8 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup = new HashMap<String, List<TriggerRouter>>();
Map<Integer, ExtractRequest> extractRequests = null;
List<TriggerHistory> activeHistories = null;

IReloadGenerator reloadGenerator = extensionService.getExtensionPoint(IReloadGenerator.class);
if (reloadGenerator == null) {
activeHistories = engine.getTriggerRouterService().getActiveTriggerHistories();
}

for (TableReloadRequest load : loadsToProcess) {
Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId(), true);
Expand All @@ -251,12 +246,9 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
fullLoad.add(load);
List<TriggerRouter> triggerRouters = getTriggerRoutersForNodeGroup(triggerRoutersByNodeGroup, targetNode.getNodeGroupId());

if (reloadGenerator != null) {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo, activeHistories,
triggerRouters, extractRequests);
extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo,
triggerRouters, extractRequests, reloadGenerator);

loadCountToProcess--;
if (++activeLoadCount >= maxLoadCount) {
Expand Down Expand Up @@ -302,12 +294,9 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
targetNode.getNodeGroupId());
triggerRoutersByTargetNodeGroupId.put(targetNode.getNodeGroupId(), triggerRouters);
}
if (reloadGenerator != null) {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo, activeHistories,
triggerRouters, extractRequests);
extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo,
triggerRouters, extractRequests, reloadGenerator);

loadCountToProcess--;
if (++activeLoadCount >= maxLoadCount) {
Expand Down Expand Up @@ -353,23 +342,23 @@ protected List<NodeSecurity> findNodesThatAreReadyForInitialLoad() {
return toReturn;
}

protected void sendReverseInitialLoad(ProcessInfo processInfo, List<TriggerHistory> activeHistories,
Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup) {
INodeService nodeService = engine.getNodeService();
boolean queuedLoad = false;
List<Node> nodes = new ArrayList<Node>();
nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.P));
nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.W));
for (Node node : nodes) {
List<TriggerRouter> triggerRouters = getTriggerRoutersForNodeGroup(triggerRoutersByNodeGroup, node.getNodeGroupId());
engine.getDataService().insertReloadEvents(node, true, null, processInfo, activeHistories, triggerRouters, null);
queuedLoad = true;
}

if (!queuedLoad) {
log.info("{} was enabled but no nodes were linked to load", ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED);
}
}
// protected void sendReverseInitialLoad(ProcessInfo processInfo, List<TriggerHistory> activeHistories,
// Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup) {
// INodeService nodeService = engine.getNodeService();
// boolean queuedLoad = false;
// List<Node> nodes = new ArrayList<Node>();
// nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.P));
// nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.W));
// for (Node node : nodes) {
// List<TriggerRouter> triggerRouters = getTriggerRoutersForNodeGroup(triggerRoutersByNodeGroup, node.getNodeGroupId());
// engine.getDataService().insertReloadEvents(node, true, null, processInfo, activeHistories, triggerRouters, null);
// queuedLoad = true;
// }
//
// if (!queuedLoad) {
// log.info("{} was enabled but no nodes were linked to load", ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED);
// }
// }

protected boolean isValidLoadTarget(String targetNodeId) {
boolean result = false;
Expand Down

0 comments on commit 21ce112

Please sign in to comment.