Skip to content

Commit

Permalink
0001689: While routing config changes sync triggers by id that haven'…
Browse files Browse the repository at this point in the history
…t already been sync'd versus calling syncTriggers()

0001607: Saving trigger link on GUI causes race with sync triggers
  • Loading branch information
chenson42 committed Apr 24, 2014
1 parent 84a5463 commit ad7524c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 65 deletions.
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -40,8 +41,11 @@
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadRequestKey;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.ITriggerRouterService;

public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter {

Expand Down Expand Up @@ -82,14 +86,17 @@ public ConfigurationChangedDataRouter(ISymmetricEngine engine) {
this.engine = engine;
}

@SuppressWarnings("unchecked")
public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Set<Node> possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) {
Set<Node> possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed,
TriggerRouter triggerRouter) {

// the list of nodeIds that we will return
Set<String> nodeIds = null;

// the inbound data
Map<String, String> columnValues = getDataMap(dataMetaData, engine != null ? engine.getSymmetricDialect() : null);
Map<String, String> columnValues = getDataMap(dataMetaData,
engine != null ? engine.getSymmetricDialect() : null);

Node me = findIdentity();

Expand All @@ -105,10 +112,11 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
String nodeIdInQuestion = columnValues.get("NODE_ID");
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo.getDeploymentType()) &&
!nodeThatMayBeRoutedTo.requires13Compatiblity() &&
isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode,
me, nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
Expand All @@ -124,9 +132,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
/*
* Don't route node security to it's own node. That node
* will get node security via registration and it will be
* updated by initial load. Otherwise, updates can be
* updated by initial load. Otherwise, updates can be
* unpredictable in the order they will be applied at the
* node because updates are on a different channel than reloads
* node because updates are on a different channel than
* reloads
*/
if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
if (nodeIds.contains(nodeIdInQuestion)) {
Expand All @@ -137,9 +146,11 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
ParameterConstants.INTITAL_LOAD_REVERSE_FIRST)
|| "0".equals(columnValues.get("INITIAL_LOAD_ENABLED"));
/*
* Only send the update if the client is going to be expected
* to queue up a reverse load. The trigger to do this is the arrival
* of sym_node_security with REV_INITIAL_LOAD_ENABLED set to 1.
* Only send the update if the client is
* going to be expected to queue up a
* reverse load. The trigger to do this is
* the arrival of sym_node_security with
* REV_INITIAL_LOAD_ENABLED set to 1.
*/
if (reverseLoadQueued) {
remove = false;
Expand All @@ -159,14 +170,14 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData

/*
* Don't route insert events for a node to itself. they will
* be loaded during registration. If we route them, then an old
* state can override the correct state
*
* Don't send deletes to a node. A node should be responsible for deleting
* itself.
* be loaded during registration. If we route them, then an
* old state can override the correct state
*
* Don't send deletes to a node. A node should be
* responsible for deleting itself.
*/
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT ||
dataMetaData.getData().getDataEventType() == DataEventType.DELETE) {
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT
|| dataMetaData.getData().getDataEventType() == DataEventType.DELETE) {
nodeIds.remove(nodeIdInQuestion);
}
}
Expand All @@ -175,7 +186,6 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
String reloadEnabled = columnValues.get("RELOAD_ENABLED");
if (me.getNodeId().equals(sourceNodeId)) {
if ("1".equals(reloadEnabled)) {
@SuppressWarnings("unchecked")
List<TableReloadRequestKey> list = (List<TableReloadRequestKey>) routingContext
.get(CTX_KEY_TABLE_RELOAD_NEEDED);
if (list == null) {
Expand All @@ -192,9 +202,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
}
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo.getDeploymentType()) &&
!nodeThatMayBeRoutedTo.requires13Compatiblity() &&
nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
Expand Down Expand Up @@ -223,61 +234,107 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
}
}

if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())
&& (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER)
|| tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_ROUTER) || tableMatches(
dataMetaData, TableConstants.SYM_NODE_GROUP_LINK))) {
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())) {
queueSyncTriggers(routingContext, dataMetaData, columnValues);
}

if (tableMatches(dataMetaData, TableConstants.SYM_CHANNEL)) {
routingContext.put(CTX_KEY_FLUSH_CHANNELS_NEEDED,
Boolean.TRUE);
routingContext.put(CTX_KEY_FLUSH_CHANNELS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_CONFLICT)) {
routingContext.put(CTX_KEY_FLUSH_CONFLICTS_NEEDED,
Boolean.TRUE);
routingContext.put(CTX_KEY_FLUSH_CONFLICTS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_LOAD_FILTER)) {
routingContext.put(CTX_KEY_FLUSH_LOADFILTERS_NEEDED,
Boolean.TRUE);
routingContext.put(CTX_KEY_FLUSH_LOADFILTERS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) {
routingContext.put(CTX_KEY_FLUSH_PARAMETERS_NEEDED,
Boolean.TRUE);

if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId()) &&
(dataMetaData.getData().getRowData() != null
&& dataMetaData.getData().getRowData().contains("job."))) {
routingContext.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED,
Boolean.TRUE);
routingContext.put(CTX_KEY_FLUSH_PARAMETERS_NEEDED, Boolean.TRUE);

if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())
&& (dataMetaData.getData().getRowData() != null && dataMetaData
.getData().getRowData().contains("job."))) {
routingContext.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED, Boolean.TRUE);
}

}

if (tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_COLUMN)
|| tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_TABLE)) {
routingContext.put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED,
Boolean.TRUE);
routingContext.put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED, Boolean.TRUE);
}
}
}

return nodeIds;
}

@SuppressWarnings("unchecked")
protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Map<String, String> columnValues) {
if ((tableMatches(dataMetaData, TableConstants.SYM_TRIGGER) || tableMatches(dataMetaData,
TableConstants.SYM_TRIGGER_ROUTER))) {
Object needResync = routingContext.get(CTX_KEY_RESYNC_NEEDED);
if (needResync == null || needResync instanceof Set) {
if (needResync == null) {
needResync = new HashSet<Trigger>();
routingContext.put(CTX_KEY_RESYNC_NEEDED, needResync);
}

ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();
Trigger trigger = null;
Date lastUpdateTime = null;
String triggerId = columnValues.get("TRIGGER_ID");
if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)) {
String routerId = columnValues.get("ROUTER_ID");
TriggerRouter tr = triggerRouterService.findTriggerRouterById(triggerId,
routerId);
if (tr != null) {
trigger = tr.getTrigger();
lastUpdateTime = tr.getLastUpdateTime();
}
} else {
trigger = triggerRouterService.getTriggerById(triggerId);
if (trigger != null) {
lastUpdateTime = trigger.getLastUpdateTime();
}
}
if (trigger != null) {
List<TriggerHistory> histories = triggerRouterService
.getActiveTriggerHistories(trigger);
boolean sync = false;
if (histories != null && histories.size() > 0) {
for (TriggerHistory triggerHistory : histories) {
if (triggerHistory.getCreateTime().before(lastUpdateTime)) {
sync = true;
}
}
} else {
sync = true;
}

if (sync) {
((Set<Trigger>) needResync).add(trigger);
}
}
}
} else if (tableMatches(dataMetaData, TableConstants.SYM_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}

}

protected Node findIdentity() {
return engine.getNodeService().findIdentity();
}

@SuppressWarnings("unchecked")
protected List<NodeGroupLink> getNodeGroupLinksFromContext(SimpleRouterContext routingContext) {
List<NodeGroupLink> list = (List<NodeGroupLink>) routingContext.get(
NodeGroupLink.class.getName());
List<NodeGroupLink> list = (List<NodeGroupLink>) routingContext.get(NodeGroupLink.class
.getName());
if (list == null) {
list = engine.getConfigurationService().getNodeGroupLinks(false);
routingContext.put(NodeGroupLink.class.getName(), list);
Expand All @@ -286,8 +343,7 @@ protected List<NodeGroupLink> getNodeGroupLinksFromContext(SimpleRouterContext r
}

protected NetworkedNode getRootNetworkNodeFromContext(SimpleRouterContext routingContext) {
NetworkedNode root = (NetworkedNode) routingContext.get(
NetworkedNode.class.getName());
NetworkedNode root = (NetworkedNode) routingContext.get(NetworkedNode.class.getName());
if (root == null) {
root = engine.getNodeService().getRootNetworkedNode();
routingContext.put(NetworkedNode.class.getName(), root);
Expand All @@ -297,8 +353,10 @@ protected NetworkedNode getRootNetworkNodeFromContext(SimpleRouterContext routin

private boolean isSameNumberOfLinksAwayFromRoot(Node nodeThatCouldBeRoutedTo,
NetworkedNode root, Node me) {
return me != null && root != null && root.getNumberOfLinksAwayFromRoot(nodeThatCouldBeRoutedTo.getNodeId()) == root
.getNumberOfLinksAwayFromRoot(me.getNodeId());
return me != null
&& root != null
&& root.getNumberOfLinksAwayFromRoot(nodeThatCouldBeRoutedTo.getNodeId()) == root
.getNumberOfLinksAwayFromRoot(me.getNodeId());
}

private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo,
Expand Down Expand Up @@ -378,11 +436,24 @@ public void contextCommitted(SimpleRouterContext routingContext) {
engine.getConfigurationService().clearCache();
}

if (routingContext.get(CTX_KEY_RESYNC_NEEDED) != null
Object needsSynced = routingContext.get(CTX_KEY_RESYNC_NEEDED);
if (needsSynced != null
&& engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS)
&& engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED) ) {
log.info("About to syncTriggers because new configuration came through the data router");
engine.getTriggerRouterService().syncTriggers();
&& engine.getParameterService().is(
ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED)) {
if (Boolean.TRUE.equals(needsSynced)) {
log.info("About to syncTriggers because new configuration came through the data router");
engine.getTriggerRouterService().syncTriggers();
} else if (needsSynced instanceof Set) {
@SuppressWarnings("unchecked")
Set<Trigger> triggers = (Set<Trigger>) needsSynced;
for (Trigger trigger : triggers) {
log.info("About to sync the "
+ trigger.getTriggerId()
+ " trigger because a change was detected by the config data router");
engine.getTriggerRouterService().syncTrigger(trigger, null, false);
}
}
}

if (routingContext.get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null) {
Expand Down Expand Up @@ -419,9 +490,10 @@ protected void insertReloadEvents(SimpleRouterContext routingContext) {
.get(CTX_KEY_TABLE_RELOAD_NEEDED);
if (reloadRequestKeys != null) {
for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) {
TableReloadRequest request = engine.getDataService()
.getTableReloadRequest(reloadRequestKey);
if (engine.getDataService().insertReloadEvent(request, reloadRequestKey.getReceivedFromNodeId() != null)) {
TableReloadRequest request = engine.getDataService().getTableReloadRequest(
reloadRequestKey);
if (engine.getDataService().insertReloadEvent(request,
reloadRequestKey.getReceivedFromNodeId() != null)) {
log.info(
"Inserted table reload request from config data router for node {} and trigger {}",
reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId());
Expand All @@ -432,16 +504,13 @@ protected void insertReloadEvents(SimpleRouterContext routingContext) {
}

private String tableName(String tableName) {
return TableConstants.getTableName(engine != null ? engine.getTablePrefix()
: "sym", tableName);
return TableConstants.getTableName(engine != null ? engine.getTablePrefix() : "sym",
tableName);
}

private boolean tableMatches(DataMetaData dataMetaData, String tableName) {
boolean matches = false;
if (dataMetaData
.getTable()
.getName()
.equalsIgnoreCase(tableName(tableName))) {
if (dataMetaData.getTable().getName().equalsIgnoreCase(tableName(tableName))) {
matches = true;
}
return matches;
Expand Down
Expand Up @@ -39,7 +39,9 @@ public interface ITriggerRouterService {

public boolean refreshFromDatabase();

public List<TriggerHistory> getActiveTriggerHistories();
public List<TriggerHistory> getActiveTriggerHistories();

public List<TriggerHistory> getActiveTriggerHistories(Trigger trigger);

public List<TriggerHistory> getActiveTriggerHistories(String tableName);

Expand Down

0 comments on commit ad7524c

Please sign in to comment.