Skip to content

Commit

Permalink
More testing of configuration routing.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 31, 2011
1 parent f37a8fe commit 6234754
Show file tree
Hide file tree
Showing 22 changed files with 154 additions and 90 deletions.
Expand Up @@ -29,7 +29,7 @@
import org.springframework.jdbc.core.JdbcTemplate;

/*
*
* Synchronization support for the H2 database platform.
*/
public class H2DbDialect extends AbstractEmbeddedDbDialect implements IDbDialect {

Expand Down
Expand Up @@ -34,7 +34,7 @@
*/
public class DefaultOfflineClientListener implements IOfflineClientListener, IBuiltInExtensionPoint {

protected final ILog log = LogFactory.getLog(getClass());
protected ILog log = LogFactory.getLog(getClass());
protected IParameterService parameterService;
protected INodeService nodeService;

Expand Down Expand Up @@ -63,7 +63,6 @@ public void syncDisabled(Node remoteNode) {

public void registrationRequired(Node remoteNode) {
log.warn("RegistrationRequired");
nodeService.deleteIdentity();
}

public boolean isAutoRegister() {
Expand All @@ -75,6 +74,7 @@ public void setNodeService(NodeService nodeService) {
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
this.parameterService = parameterService;
this.log = LogFactory.getLog(parameterService);
}
}
Expand Down
Expand Up @@ -101,7 +101,8 @@ private void recordTransformFlushNeeded(IDataLoaderContext context) {
private boolean isSyncTriggersNeeded(IDataLoaderContext context) {
return matchesTable(context, TableConstants.SYM_TRIGGER)
|| matchesTable(context, TableConstants.SYM_ROUTER)
|| matchesTable(context, TableConstants.SYM_TRIGGER_ROUTER);
|| matchesTable(context, TableConstants.SYM_TRIGGER_ROUTER)
|| matchesTable(context, TableConstants.SYM_NODE_GROUP_LINK);
}

private boolean isChannelFlushNeeded(IDataLoaderContext context) {
Expand Down
Expand Up @@ -52,6 +52,34 @@ public NetworkedNode getParent() {
return parent;
}

public int getNumberOfLinksAwayFromRoot(String nodeId) {
int numberOfLinksAwayFromRoot = getRoot().getNumberOfLinksAwayFromMe(nodeId, 0);
if (numberOfLinksAwayFromRoot == 0 && !node.getNodeId().equals(nodeId)) {
return -1;
} else {
return numberOfLinksAwayFromRoot;
}
}

protected int getNumberOfLinksAwayFromMe(String nodeId, int numberOfLinksIAmFromRoot) {
if (!node.getNodeId().equals(nodeId)) {
if (children != null) {
for (NetworkedNode child : children) {
if (child.getNode().getNodeId().equals(nodeId)) {
return numberOfLinksIAmFromRoot + 1;
} else {
int numberOfLinksAwayFromMe = child.getNumberOfLinksAwayFromMe(nodeId,
numberOfLinksIAmFromRoot + 1);
if (numberOfLinksAwayFromMe > numberOfLinksIAmFromRoot) {
return numberOfLinksAwayFromMe;
}
}
}
}
}
return numberOfLinksIAmFromRoot;
}

public NetworkedNode findNetworkedNode(String nodeId) {
if (this.node.getNodeId().equals(nodeId)) {
return this;
Expand Down Expand Up @@ -116,20 +144,23 @@ public boolean isInChildHierarchy(String nodeId) {

public void addParents(Map<String, Node> nodes, Map<String, NetworkedNode> leaves) {
String parentNodeId = node.getCreatedAtNodeId();
NetworkedNode parentNetworkedNode = leaves.get(parentNodeId);
if (parentNetworkedNode == null) {
Node parentNode = nodes.get(parentNodeId);
if (parentNode != null) {
parentNetworkedNode = new NetworkedNode(parentNode);
parentNetworkedNode.addParents(nodes, leaves);
leaves.put(parentNodeId, parentNetworkedNode);
if (parentNodeId != null && !parentNodeId.equals(node.getNodeId())) {
NetworkedNode parentNetworkedNode = leaves.get(parentNodeId);
if (parentNetworkedNode == null) {
Node parentNode = nodes.get(parentNodeId);
if (parentNode != null) {
parentNetworkedNode = new NetworkedNode(parentNode);
parentNetworkedNode.addParents(nodes, leaves);
leaves.put(parentNodeId, parentNetworkedNode);
}
}
}

if (parentNetworkedNode != null) {
parentNetworkedNode.addChild(this);
if (parentNetworkedNode != null) {
parentNetworkedNode.addChild(this);
}
this.parent = parentNetworkedNode;
}
this.parent = parentNetworkedNode;

}

public NetworkedNode getRoot() {
Expand Down
Expand Up @@ -47,7 +47,7 @@ public class BshDataRouter extends AbstractDataRouter {

final String INTERPRETER_KEY = String.format("%d.BshInterpreter", hashCode());

public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
public Set<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
try {
long ts = System.currentTimeMillis();
Expand Down Expand Up @@ -76,11 +76,11 @@ protected Interpreter getInterpreter(IRouterContext context) {
return interpreter;
}

protected Collection<String> eval(Object value, Set<Node> nodes, Set<String> targetNodes) {
protected Set<String> eval(Object value, Set<Node> nodes, Set<String> targetNodes) {
if (targetNodes.size() > 0) {
return targetNodes;
} else if (value instanceof Collection<?>) {
Collection<?> values = (Collection<?>) value;
} else if (value instanceof Set<?>) {
Set<?> values = (Set<?>) value;
Set<String> nodeIds = new HashSet<String>(values.size());
for (Object v : values) {
if (v != null) {
Expand Down
Expand Up @@ -21,7 +21,6 @@
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -80,7 +79,7 @@ public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRo
final static String EXPRESSION_KEY = String.format("%s.Expression.", ColumnMatchDataRouter.class
.getName());

public Collection<String> routeToNodes(IRouterContext routingContext,
public Set<String> routeToNodes(IRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
Set<String> nodeIds = null;
List<Expression> expressions = getExpressions(dataMetaData.getTriggerRouter().getRouter(), routingContext);
Expand Down
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.route;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -38,16 +37,16 @@
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.transform.ITransformService;

public class ConfigurationChangedRouter extends AbstractDataRouter implements IDataRouter {
public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter {

final String CTX_KEY_RESYNC_NEEDED = "Resync."
+ ConfigurationChangedRouter.class.getSimpleName() + hashCode();
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_CHANNELS_NEEDED = "FlushChannels."
+ ConfigurationChangedRouter.class.getSimpleName() + hashCode();
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_TRANSFORMS_NEEDED = "FlushTransforms."
+ ConfigurationChangedRouter.class.getSimpleName() + hashCode();
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand All @@ -60,10 +59,10 @@ public class ConfigurationChangedRouter extends AbstractDataRouter implements ID
protected ITriggerRouterService triggerRouterService;

protected IParameterService parameterService;

protected ITransformService transformService;

public Collection<String> routeToNodes(IRouterContext routingContext,
public Set<String> routeToNodes(IRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> possibleTargetNodes, boolean initialLoad) {

// the list of nodeIds that we will return
Expand All @@ -72,30 +71,41 @@ public Collection<String> routeToNodes(IRouterContext routingContext,
// the inbound data
Map<String, String> columnValues = getDataMap(dataMetaData);

Node me = findIdentity();
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);

// if this is sym_node or sym_node_security determine which nodes it
// goes to.
if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {

String nodeIdInQuestion = columnValues.get("NODE_ID");
Node me = findIdentity();
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)) {
nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}
} else {
nodeIds = toNodeIds(possibleTargetNodes, nodeIds);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode, me)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}

if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER)
|| tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_ROUTER)) {
|| tableMatches(dataMetaData, TableConstants.SYM_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.getContextCache().put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}

Expand Down Expand Up @@ -137,6 +147,12 @@ protected NetworkedNode getRootNetworkNodeFromContext(IRouterContext routingCont
return root;
}

private boolean isSameNumberOfLinksAwayFromRoot(Node nodeThatCouldBeRoutedTo,
NetworkedNode root, Node me) {
return root.getNumberOfLinksAwayFromRoot(nodeThatCouldBeRoutedTo.getNodeId()) == root
.getNumberOfLinksAwayFromRoot(me.getNodeId());
}

private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo,
NetworkedNode root, Node me, List<NodeGroupLink> allLinks) {
if (!nodeIdInQuestion.equals(nodeThatCouldBeRoutedTo.getNodeId())) {
Expand All @@ -149,8 +165,11 @@ private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo,
// always route changes to parent nodes
return true;
}


String createdAtNodeId = networkedNodeInQuestion.getNode().getCreatedAtNodeId();
if (createdAtNodeId != null && !createdAtNodeId.equals(me.getNodeId())) {
if (createdAtNodeId != null && !createdAtNodeId.equals(me.getNodeId()) &&
!networkedNodeInQuestion.getNode().getNodeId().equals(me.getNodeId())) {
if (createdAtNodeId.equals(nodeThatCouldBeRoutedTo.getNodeId())) {
return true;
} else {
Expand Down Expand Up @@ -223,7 +242,7 @@ public void setTriggerRouterService(ITriggerRouterService triggerRouterService)
public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}

public void setTransformService(ITransformService transformService) {
this.transformService = transformService;
}
Expand Down
Expand Up @@ -20,7 +20,6 @@

package org.jumpmind.symmetric.route;

import java.util.Collection;
import java.util.Set;

import org.jumpmind.symmetric.model.DataMetaData;
Expand All @@ -32,7 +31,7 @@
*/
public class DefaultDataRouter extends AbstractDataRouter {

public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
public Set<String> routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
return toNodeIds(nodes, null);
}
Expand Down
Expand Up @@ -20,7 +20,6 @@

package org.jumpmind.symmetric.route;

import java.util.Collection;
import java.util.Set;

import org.jumpmind.symmetric.ext.IExtensionPoint;
Expand All @@ -45,7 +44,7 @@
*/
public interface IDataRouter extends IExtensionPoint {

public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad);
public Set<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad);

public void completeBatch(IRouterContext context, OutgoingBatch batch);

Expand Down
Expand Up @@ -22,7 +22,6 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -59,7 +58,7 @@ public class LookupTableDataRouter extends AbstractDataRouter implements IDataRo

private JdbcTemplate jdbcTemplate;

public Collection<String> routeToNodes(IRouterContext routingContext,
public Set<String> routeToNodes(IRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {

Set<String> nodeIds = null;
Expand Down
Expand Up @@ -20,7 +20,8 @@

package org.jumpmind.symmetric.route;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -62,17 +63,20 @@ public class SubSelectDataRouter extends AbstractDataRouter {

private IDbDialect dbDialect;

public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
public Set<String> routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
TriggerRouter trigger = dataMetaData.getTriggerRouter();
String subSelect = trigger.getRouter().getRouterExpression();
Collection<String> nodeIds = null;
Set<String> nodeIds = null;
if (!StringUtils.isBlank(subSelect)) {
SimpleJdbcTemplate simpleTemplate = new SimpleJdbcTemplate(jdbcTemplate);
Map<String, Object> sqlParams = getDataObjectMap(dataMetaData, dbDialect);
sqlParams.put("NODE_GROUP_ID", trigger.getRouter().getNodeGroupLink().getTargetNodeGroupId());
nodeIds = simpleTemplate.query(String.format("%s%s", sql, subSelect),
List<String> ids = simpleTemplate.query(String.format("%s%s", sql, subSelect),
new SingleColumnRowMapper<String>(), sqlParams);
if (ids != null) {
nodeIds = new HashSet<String>(ids);
}
} else {
nodeIds = toNodeIds(nodes, null);
}
Expand Down

0 comments on commit 6234754

Please sign in to comment.