From ad7524cb66496c15f24439dcd37a8e59500ef6a7 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Thu, 24 Apr 2014 13:41:19 +0000 Subject: [PATCH] 0001689: While routing config changes sync triggers by id that haven't already been sync'd versus calling syncTriggers() 0001607: Saving trigger link on GUI causes race with sync triggers --- .../route/ConfigurationChangedDataRouter.java | 195 ++++++++++++------ .../service/ITriggerRouterService.java | 4 +- .../service/impl/TriggerRouterService.java | 2 +- 3 files changed, 136 insertions(+), 65 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index 40f6d30a99..48ad882c59 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -21,6 +21,7 @@ package org.jumpmind.symmetric.route; import java.util.ArrayList; +import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,8 +41,11 @@ import org.jumpmind.symmetric.model.NodeGroupLink; import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.TableReloadRequestKey; +import org.jumpmind.symmetric.model.Trigger; +import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.service.IConfigurationService; +import org.jumpmind.symmetric.service.ITriggerRouterService; public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter { @@ -82,14 +86,17 @@ public ConfigurationChangedDataRouter(ISymmetricEngine engine) { this.engine = engine; } + @SuppressWarnings("unchecked") public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData, - Set possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) { + Set possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed, + TriggerRouter triggerRouter) { // the list of nodeIds that we will return Set nodeIds = null; // the inbound data - Map columnValues = getDataMap(dataMetaData, engine != null ? engine.getSymmetricDialect() : null); + Map columnValues = getDataMap(dataMetaData, + engine != null ? engine.getSymmetricDialect() : null); Node me = findIdentity(); @@ -105,10 +112,11 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData String nodeIdInQuestion = columnValues.get("NODE_ID"); List nodeGroupLinks = getNodeGroupLinksFromContext(routingContext); for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) { - if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo.getDeploymentType()) && - !nodeThatMayBeRoutedTo.requires13Compatiblity() && - isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me, - nodeGroupLinks) + if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo + .getDeploymentType()) + && !nodeThatMayBeRoutedTo.requires13Compatiblity() + && isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, + me, nodeGroupLinks) && !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode, me) || (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) { @@ -124,9 +132,10 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData /* * Don't route node security to it's own node. That node * will get node security via registration and it will be - * updated by initial load. Otherwise, updates can be + * updated by initial load. Otherwise, updates can be * unpredictable in the order they will be applied at the - * node because updates are on a different channel than reloads + * node because updates are on a different channel than + * reloads */ if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) { if (nodeIds.contains(nodeIdInQuestion)) { @@ -137,9 +146,11 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData ParameterConstants.INTITAL_LOAD_REVERSE_FIRST) || "0".equals(columnValues.get("INITIAL_LOAD_ENABLED")); /* - * Only send the update if the client is going to be expected - * to queue up a reverse load. The trigger to do this is the arrival - * of sym_node_security with REV_INITIAL_LOAD_ENABLED set to 1. + * Only send the update if the client is + * going to be expected to queue up a + * reverse load. The trigger to do this is + * the arrival of sym_node_security with + * REV_INITIAL_LOAD_ENABLED set to 1. */ if (reverseLoadQueued) { remove = false; @@ -159,14 +170,14 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData /* * Don't route insert events for a node to itself. they will - * be loaded during registration. If we route them, then an old - * state can override the correct state - * - * Don't send deletes to a node. A node should be responsible for deleting - * itself. + * be loaded during registration. If we route them, then an + * old state can override the correct state + * + * Don't send deletes to a node. A node should be + * responsible for deleting itself. */ - if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT || - dataMetaData.getData().getDataEventType() == DataEventType.DELETE) { + if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT + || dataMetaData.getData().getDataEventType() == DataEventType.DELETE) { nodeIds.remove(nodeIdInQuestion); } } @@ -175,7 +186,6 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData String reloadEnabled = columnValues.get("RELOAD_ENABLED"); if (me.getNodeId().equals(sourceNodeId)) { if ("1".equals(reloadEnabled)) { - @SuppressWarnings("unchecked") List list = (List) routingContext .get(CTX_KEY_TABLE_RELOAD_NEEDED); if (list == null) { @@ -192,9 +202,10 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData } } else { for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) { - if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo.getDeploymentType()) && - !nodeThatMayBeRoutedTo.requires13Compatiblity() && - nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) { + if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo + .getDeploymentType()) + && !nodeThatMayBeRoutedTo.requires13Compatiblity() + && nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) { if (nodeIds == null) { nodeIds = new HashSet(); } @@ -223,46 +234,36 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData } } - if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId()) - && (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER) - || tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER) - || tableMatches(dataMetaData, TableConstants.SYM_ROUTER) || tableMatches( - dataMetaData, TableConstants.SYM_NODE_GROUP_LINK))) { - routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE); + if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())) { + queueSyncTriggers(routingContext, dataMetaData, columnValues); } if (tableMatches(dataMetaData, TableConstants.SYM_CHANNEL)) { - routingContext.put(CTX_KEY_FLUSH_CHANNELS_NEEDED, - Boolean.TRUE); + routingContext.put(CTX_KEY_FLUSH_CHANNELS_NEEDED, Boolean.TRUE); } if (tableMatches(dataMetaData, TableConstants.SYM_CONFLICT)) { - routingContext.put(CTX_KEY_FLUSH_CONFLICTS_NEEDED, - Boolean.TRUE); + routingContext.put(CTX_KEY_FLUSH_CONFLICTS_NEEDED, Boolean.TRUE); } if (tableMatches(dataMetaData, TableConstants.SYM_LOAD_FILTER)) { - routingContext.put(CTX_KEY_FLUSH_LOADFILTERS_NEEDED, - Boolean.TRUE); + routingContext.put(CTX_KEY_FLUSH_LOADFILTERS_NEEDED, Boolean.TRUE); } if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) { - routingContext.put(CTX_KEY_FLUSH_PARAMETERS_NEEDED, - Boolean.TRUE); - - if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId()) && - (dataMetaData.getData().getRowData() != null - && dataMetaData.getData().getRowData().contains("job."))) { - routingContext.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED, - Boolean.TRUE); + routingContext.put(CTX_KEY_FLUSH_PARAMETERS_NEEDED, Boolean.TRUE); + + if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId()) + && (dataMetaData.getData().getRowData() != null && dataMetaData + .getData().getRowData().contains("job."))) { + routingContext.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED, Boolean.TRUE); } } if (tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_COLUMN) || tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_TABLE)) { - routingContext.put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED, - Boolean.TRUE); + routingContext.put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED, Boolean.TRUE); } } } @@ -270,14 +271,70 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData return nodeIds; } + @SuppressWarnings("unchecked") + protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaData dataMetaData, + Map columnValues) { + if ((tableMatches(dataMetaData, TableConstants.SYM_TRIGGER) || tableMatches(dataMetaData, + TableConstants.SYM_TRIGGER_ROUTER))) { + Object needResync = routingContext.get(CTX_KEY_RESYNC_NEEDED); + if (needResync == null || needResync instanceof Set) { + if (needResync == null) { + needResync = new HashSet(); + routingContext.put(CTX_KEY_RESYNC_NEEDED, needResync); + } + + ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); + Trigger trigger = null; + Date lastUpdateTime = null; + String triggerId = columnValues.get("TRIGGER_ID"); + if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)) { + String routerId = columnValues.get("ROUTER_ID"); + TriggerRouter tr = triggerRouterService.findTriggerRouterById(triggerId, + routerId); + if (tr != null) { + trigger = tr.getTrigger(); + lastUpdateTime = tr.getLastUpdateTime(); + } + } else { + trigger = triggerRouterService.getTriggerById(triggerId); + if (trigger != null) { + lastUpdateTime = trigger.getLastUpdateTime(); + } + } + if (trigger != null) { + List histories = triggerRouterService + .getActiveTriggerHistories(trigger); + boolean sync = false; + if (histories != null && histories.size() > 0) { + for (TriggerHistory triggerHistory : histories) { + if (triggerHistory.getCreateTime().before(lastUpdateTime)) { + sync = true; + } + } + } else { + sync = true; + } + + if (sync) { + ((Set) needResync).add(trigger); + } + } + } + } else if (tableMatches(dataMetaData, TableConstants.SYM_ROUTER) + || tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) { + routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE); + } + + } + protected Node findIdentity() { return engine.getNodeService().findIdentity(); } @SuppressWarnings("unchecked") protected List getNodeGroupLinksFromContext(SimpleRouterContext routingContext) { - List list = (List) routingContext.get( - NodeGroupLink.class.getName()); + List list = (List) routingContext.get(NodeGroupLink.class + .getName()); if (list == null) { list = engine.getConfigurationService().getNodeGroupLinks(false); routingContext.put(NodeGroupLink.class.getName(), list); @@ -286,8 +343,7 @@ protected List getNodeGroupLinksFromContext(SimpleRouterContext r } protected NetworkedNode getRootNetworkNodeFromContext(SimpleRouterContext routingContext) { - NetworkedNode root = (NetworkedNode) routingContext.get( - NetworkedNode.class.getName()); + NetworkedNode root = (NetworkedNode) routingContext.get(NetworkedNode.class.getName()); if (root == null) { root = engine.getNodeService().getRootNetworkedNode(); routingContext.put(NetworkedNode.class.getName(), root); @@ -297,8 +353,10 @@ protected NetworkedNode getRootNetworkNodeFromContext(SimpleRouterContext routin private boolean isSameNumberOfLinksAwayFromRoot(Node nodeThatCouldBeRoutedTo, NetworkedNode root, Node me) { - return me != null && root != null && root.getNumberOfLinksAwayFromRoot(nodeThatCouldBeRoutedTo.getNodeId()) == root - .getNumberOfLinksAwayFromRoot(me.getNodeId()); + return me != null + && root != null + && root.getNumberOfLinksAwayFromRoot(nodeThatCouldBeRoutedTo.getNodeId()) == root + .getNumberOfLinksAwayFromRoot(me.getNodeId()); } private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo, @@ -378,11 +436,24 @@ public void contextCommitted(SimpleRouterContext routingContext) { engine.getConfigurationService().clearCache(); } - if (routingContext.get(CTX_KEY_RESYNC_NEEDED) != null + Object needsSynced = routingContext.get(CTX_KEY_RESYNC_NEEDED); + if (needsSynced != null && engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS) - && engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED) ) { - log.info("About to syncTriggers because new configuration came through the data router"); - engine.getTriggerRouterService().syncTriggers(); + && engine.getParameterService().is( + ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED)) { + if (Boolean.TRUE.equals(needsSynced)) { + log.info("About to syncTriggers because new configuration came through the data router"); + engine.getTriggerRouterService().syncTriggers(); + } else if (needsSynced instanceof Set) { + @SuppressWarnings("unchecked") + Set triggers = (Set) needsSynced; + for (Trigger trigger : triggers) { + log.info("About to sync the " + + trigger.getTriggerId() + + " trigger because a change was detected by the config data router"); + engine.getTriggerRouterService().syncTrigger(trigger, null, false); + } + } } if (routingContext.get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null) { @@ -419,9 +490,10 @@ protected void insertReloadEvents(SimpleRouterContext routingContext) { .get(CTX_KEY_TABLE_RELOAD_NEEDED); if (reloadRequestKeys != null) { for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) { - TableReloadRequest request = engine.getDataService() - .getTableReloadRequest(reloadRequestKey); - if (engine.getDataService().insertReloadEvent(request, reloadRequestKey.getReceivedFromNodeId() != null)) { + TableReloadRequest request = engine.getDataService().getTableReloadRequest( + reloadRequestKey); + if (engine.getDataService().insertReloadEvent(request, + reloadRequestKey.getReceivedFromNodeId() != null)) { log.info( "Inserted table reload request from config data router for node {} and trigger {}", reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId()); @@ -432,16 +504,13 @@ protected void insertReloadEvents(SimpleRouterContext routingContext) { } private String tableName(String tableName) { - return TableConstants.getTableName(engine != null ? engine.getTablePrefix() - : "sym", tableName); + return TableConstants.getTableName(engine != null ? engine.getTablePrefix() : "sym", + tableName); } private boolean tableMatches(DataMetaData dataMetaData, String tableName) { boolean matches = false; - if (dataMetaData - .getTable() - .getName() - .equalsIgnoreCase(tableName(tableName))) { + if (dataMetaData.getTable().getName().equalsIgnoreCase(tableName(tableName))) { matches = true; } return matches; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java index 4b3be19bbe..d9eee8fabb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ITriggerRouterService.java @@ -39,7 +39,9 @@ public interface ITriggerRouterService { public boolean refreshFromDatabase(); - public List getActiveTriggerHistories(); + public List getActiveTriggerHistories(); + + public List getActiveTriggerHistories(Trigger trigger); public List getActiveTriggerHistories(String tableName); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index ed4c1a6ac0..8e795fdff9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -289,7 +289,7 @@ public TriggerHistory getTriggerHistory(int histId) { return history; } - protected List getActiveTriggerHistories(Trigger trigger) { + public List getActiveTriggerHistories(Trigger trigger) { List active = getActiveTriggerHistories(); List list = new ArrayList(); for (TriggerHistory triggerHistory : active) {