diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NetworkedNode.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NetworkedNode.java new file mode 100644 index 0000000000..aeea9259f4 --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NetworkedNode.java @@ -0,0 +1,147 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.model; + +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +public class NetworkedNode implements Comparable { + + private Node node; + + private NetworkedNode parent; + + private Set children; + + public NetworkedNode(Node node) { + this.node = node; + } + + public void addChild(NetworkedNode node) { + if (children == null) { + children = new TreeSet(); + } + node.parent = this; + children.add(node); + } + + public Node getNode() { + return node; + } + + public NetworkedNode getParent() { + return parent; + } + + public NetworkedNode findNetworkedNode(String nodeId) { + if (this.node.getNodeId().equals(nodeId)) { + return this; + } else { + if (children != null) { + for (NetworkedNode child : children) { + if (child.getNode().getNodeId().equals(nodeId)) { + return child; + } else { + NetworkedNode foundIt = child.findNetworkedNode(nodeId); + if (foundIt != null) { + return foundIt; + } + } + } + } + } + return null; + } + + public boolean isInParentHierarchy(String nodeId) { + if (parent != null) { + if (parent.getNode().getNodeId().equals(nodeId)) { + return true; + } else { + return parent.isInParentHierarchy(nodeId); + } + } else { + return false; + } + } + + public boolean hasChildrenThatBelongToGroups(Set groupIds) { + if (children != null) { + for (NetworkedNode child : children) { + if (groupIds.contains(child.getNode().getNodeGroupId())) { + return true; + } else { + if (child.hasChildrenThatBelongToGroups(groupIds)) { + return true; + } + } + } + } + return false; + } + + public boolean isInChildHierarchy(String nodeId) { + if (children != null) { + for (NetworkedNode child : children) { + if (child.getNode().getNodeId().equals(nodeId)) { + return true; + } else { + if (child.isInChildHierarchy(nodeId)) { + return true; + } + } + } + } + return false; + } + + public void addParents(Map nodes, Map leaves) { + String parentNodeId = node.getCreatedAtNodeId(); + NetworkedNode parentNetworkedNode = leaves.get(parentNodeId); + if (parentNetworkedNode == null) { + Node parentNode = nodes.get(parentNodeId); + if (parentNode != null) { + parentNetworkedNode = new NetworkedNode(parentNode); + parentNetworkedNode.addParents(nodes, leaves); + leaves.put(parentNodeId, parentNetworkedNode); + } + } + + if (parentNetworkedNode != null) { + parentNetworkedNode.addChild(this); + } + this.parent = parentNetworkedNode; + } + + public NetworkedNode getRoot() { + if (parent != null) { + return parent.getRoot(); + } else { + return this; + } + } + + public int compareTo(NetworkedNode o) { + return node.getNodeId().compareTo(o.getNode().getNodeId()); + } + +} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedRouter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedRouter.java new file mode 100644 index 0000000000..00d759536b --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedRouter.java @@ -0,0 +1,210 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.route; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.jumpmind.symmetric.common.TableConstants; +import org.jumpmind.symmetric.model.DataMetaData; +import org.jumpmind.symmetric.model.NetworkedNode; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLink; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.service.IConfigurationService; +import org.jumpmind.symmetric.service.INodeService; + +public class ConfigurationChangedRouter extends AbstractDataRouter implements IDataRouter { + + public final static String KEY = "symconfig"; + + protected String tablePrefix; + + private IConfigurationService configurationService; + + private INodeService nodeService; + + public Collection routeToNodes(IRouterContext routingContext, + DataMetaData dataMetaData, Set possibleTargetNodes, boolean initialLoad) { + + // the list of nodeIds that we will return + Set nodeIds = null; + + // the inbound data + Map columnValues = getDataMap(dataMetaData); + + // if this is sym_node or sym_node_security determine which nodes it + // goes to. + if (tableMatches(dataMetaData, TableConstants.SYM_NODE) + || tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) { + + String nodeIdInQuestion = columnValues.get("NODE_ID"); + Node me = findIdentity(); + NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext); + List nodeGroupLinks = getNodeGroupLinksFromContext(routingContext); + for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) { + if (isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me, + nodeGroupLinks)) { + if (nodeIds == null) { + nodeIds = new HashSet(); + } + nodeIds.add(nodeThatMayBeRoutedTo.getNodeId()); + } + } + } else { + nodeIds = toNodeIds(possibleTargetNodes, nodeIds); + } + + return nodeIds; + } + + protected Node findIdentity() { + return nodeService.findIdentity(); + } + + @SuppressWarnings("unchecked") + protected List getNodeGroupLinksFromContext(IRouterContext routingContext) { + List list = (List) routingContext.getContextCache().get( + NodeGroupLink.class.getName()); + if (list == null) { + list = configurationService.getNodeGroupLinks(); + routingContext.getContextCache().put(NodeGroupLink.class.getName(), list); + } + return list; + } + + protected NetworkedNode getRootNetworkNodeFromContext(IRouterContext routingContext) { + NetworkedNode root = (NetworkedNode) routingContext.getContextCache().get( + NetworkedNode.class.getName()); + if (root == null) { + root = nodeService.getRootNetworkedNode(); + routingContext.getContextCache().put(NetworkedNode.class.getName(), root); + } + return root; + } + + private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo, + NetworkedNode root, Node me, List allLinks) { + if (!nodeIdInQuestion.equals(nodeThatCouldBeRoutedTo.getNodeId())) { + NetworkedNode networkedNodeInQuestion = root.findNetworkedNode(nodeIdInQuestion); + NetworkedNode networkedNodeThatCouldBeRoutedTo = root + .findNetworkedNode(nodeThatCouldBeRoutedTo.getNodeId()); + if (networkedNodeInQuestion != null) { + if (networkedNodeInQuestion + .isInParentHierarchy(nodeThatCouldBeRoutedTo.getNodeId())) { + // always route changes to parent nodes + return true; + } + String createdAtNodeId = networkedNodeInQuestion.getNode().getCreatedAtNodeId(); + if (createdAtNodeId != null && !createdAtNodeId.equals(me.getNodeId())) { + if (createdAtNodeId.equals(nodeThatCouldBeRoutedTo.getNodeId())) { + return true; + } else { + // the node was created at some other node. lets attempt + // to get that update back to that node + return networkedNodeThatCouldBeRoutedTo.isInChildHierarchy(createdAtNodeId); + } + } + + // if we haven't found a place to route by now, then we need to + // send the row to all nodes that have links to the node's group + String groupId = networkedNodeInQuestion.getNode().getNodeGroupId(); + Set groupsThatWillBeInterested = new HashSet(); + for (NodeGroupLink nodeGroupLink : allLinks) { + if (nodeGroupLink.getTargetNodeGroupId().equals(groupId)) { + groupsThatWillBeInterested.add(nodeGroupLink.getSourceNodeGroupId()); + } else if (nodeGroupLink.getSourceNodeGroupId().equals(groupId)) { + groupsThatWillBeInterested.add(nodeGroupLink.getTargetNodeGroupId()); + } + } + + if (groupsThatWillBeInterested.contains(nodeThatCouldBeRoutedTo.getNodeGroupId())) { + return true; + } else { + return networkedNodeThatCouldBeRoutedTo + .hasChildrenThatBelongToGroups(groupsThatWillBeInterested); + } + } else { + return false; + } + } else { + return true; + } + } + + public void completeBatch(IRouterContext context, OutgoingBatch batch) { + // TODO resync triggers if sym_trigger, sym_trigger_router or sym_router + // has changed we could do a synch triggers call here when we know these are + // changing + } + + public void setTablePrefix(String tablePrefix) { + this.tablePrefix = tablePrefix; + } + + public void setConfigurationService(IConfigurationService configurationService) { + this.configurationService = configurationService; + } + + public void setNodeService(INodeService nodeService) { + this.nodeService = nodeService; + } + + private boolean tableMatches(DataMetaData dataMetaData, String tableName) { + boolean matches = false; + if (dataMetaData.getTable().getName() + .equalsIgnoreCase(TableConstants.getTableName(tablePrefix, tableName))) { + matches = true; + } + return matches; + } + + // TODO: This guy is going to replace the initial load selects for sym_node + // and sym_node_security found in + // triggerrouter-service.xml. The nodes variable has all eligible nodes that + // can be sync'd to. + // Go through them all and figure out if the sym_node or sym_node_security + // rows should be synced. If so, + // return the nodeid in the returned collection. - same criteira as the + // initial load sql should be implemented in code here + + // DONE - if the configuration table is something other than node or + // security, then return all node ids (configuration + // goes everywhere. + // + // STILL USED IN TRIGGER ROUTER SERVICE + // - this router is configured in symmetric-routers.xml. it will be used in + // TriggerRouterService.buildRegistrationTriggerRouter() + // we can get rid of rootConfigChannelInitialLoadSelect in + // triggerrouter-service.xml + + // TODO: side note: if the external id of a node exists in + // registration_redirect, then we should sync that node only + // to the registration_node_id. + + // TODO: another other side node: we should put some indicator into the + // context if sym_trigger, sym_trigger_router, or sym_router + // changes so we can run syncTriggers when the batch is completed. + +} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationRouter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationRouter.java deleted file mode 100644 index 9963e1c643..0000000000 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationRouter.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to JumpMind Inc under one or more contributor - * license agreements. See the NOTICE file distributed - * with this work for additional information regarding - * copyright ownership. JumpMind Inc licenses this file - * to you under the GNU Lesser General Public License (the - * "License"); you may not use this file except in compliance - * with the License. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see - * . - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jumpmind.symmetric.route; - -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import org.jumpmind.symmetric.common.TableConstants; -import org.jumpmind.symmetric.model.DataMetaData; -import org.jumpmind.symmetric.model.Node; -import org.jumpmind.symmetric.model.OutgoingBatch; -import org.jumpmind.symmetric.service.INodeService; - -public class ConfigurationRouter extends AbstractDataRouter implements IDataRouter { - - public final static String KEY="symconfig"; - protected String tablePrefix; - private INodeService nodeService; - - public Collection routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, - Set nodes, boolean initialLoad) { - - //the list of nodeIds that we will return - Set nodeIds = null; - - //the inbound data - Map columnValues = getDataMap(dataMetaData); - - //if this is sym_node or sym_node_security determine which nodes it goes to, - if (tableMatches(dataMetaData, TableConstants.SYM_NODE) || - tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) { - - //only route these rows to nodes that belong to node groups to which it is linked - for (Node node : nodes) { - String nodeId = columnValues.get("NODE_ID"); - if (isLinked(nodeId, node.getNodeGroupId())) { - nodeIds = addNodeId(node.getNodeId(), nodeIds, nodes); - } - } - } - else { - //otherwise goes to all nodes - for (Node node : nodes) { - nodeIds = addNodeId(node.getNodeId(), nodeIds, nodes); - } - } - - return nodeIds; - } - - public void completeBatch(IRouterContext context, OutgoingBatch batch) { - // TODO resync triggers if sym_trigger, sym_trigger_router or sym_router has changed - //we could do a synch triggers call here when we know these are changing - } - - public void contextCommitted(IRouterContext context) { - } - - public void setTablePrefix(String tablePrefix) { - this.tablePrefix = tablePrefix; - } - - public String getTablePrefix() { - return tablePrefix; - } - - private boolean tableMatches(DataMetaData dataMetaData, String tableName) { - boolean matches = false; - if (dataMetaData.getTable().getName().equalsIgnoreCase( - TableConstants.getTableName(tablePrefix, tableName))) { - matches = true; - } - return matches; - } - - private boolean isLinked(String nodeId, String nodeGroup) { - boolean isLinked = false; - - return isLinked; - } - - - // TODO: This guy is going to replace the initial load selects for sym_node and sym_node_security found in - // triggerrouter-service.xml. The nodes variable has all eligible nodes that can be sync'd to. - // Go through them all and figure out if the sym_node or sym_node_security rows should be synced. If so, - // return the nodeid in the returned collection. - same criteira as the initial load sql should be implemented in code here - - // DONE - if the configuration table is something other than node or security, then return all node ids (configuration - // goes everywhere. - // - // STILL USED IN TRIGGER ROUTER SERVICE - // - this router is configured in symmetric-routers.xml. it will be used in TriggerRouterService.buildRegistrationTriggerRouter() - // we can get rid of rootConfigChannelInitialLoadSelect in triggerrouter-service.xml - - // TODO: side note: if the external id of a node exists in registration_redirect, then we should sync that node only - // to the registration_node_id. - - // TODO: another other side node: we should put some indicator into the context if sym_trigger, sym_trigger_router, or sym_router - // changes so we can run syncTriggers when the batch is completed. - -} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java index 46df15f954..f4a004ab0c 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DefaultDataRouter.java @@ -28,7 +28,7 @@ import org.jumpmind.symmetric.model.OutgoingBatch; /** - * This data router will route data to all nodes that belong to the target node group + * This data router will route data to all of the nodes that are passed to it. */ public class DefaultDataRouter extends AbstractDataRouter { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java index e0844423fb..875ba96044 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java @@ -29,6 +29,7 @@ import org.jumpmind.symmetric.config.INodeIdGenerator; import org.jumpmind.symmetric.ext.IOfflineServerListener; import org.jumpmind.symmetric.io.IOfflineClientListener; +import org.jumpmind.symmetric.model.NetworkedNode; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.NodeHost; @@ -173,4 +174,6 @@ public void insertNode(String nodeId, String nodeGroupdId, String externalId, public void addOfflineServerListener(IOfflineServerListener listener); public boolean removeOfflineServerListener(IOfflineServerListener listener); + + public NetworkedNode getRootNetworkedNode(); } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index 8510a17728..ff2d31f17b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -19,37 +19,38 @@ * under the License. */ package org.jumpmind.symmetric.service.impl; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.jumpmind.symmetric.common.ParameterConstants; -import org.jumpmind.symmetric.config.INodeIdGenerator; -import org.jumpmind.symmetric.ext.IOfflineServerListener; -import org.jumpmind.symmetric.model.Node; -import org.jumpmind.symmetric.model.NodeGroupLinkAction; -import org.jumpmind.symmetric.model.NodeHost; -import org.jumpmind.symmetric.model.NodeSecurity; -import org.jumpmind.symmetric.model.NodeStatus; -import org.jumpmind.symmetric.security.INodePasswordFilter; -import org.jumpmind.symmetric.service.INodeService; -import org.jumpmind.symmetric.util.AppUtils; -import org.springframework.dao.CannotAcquireLockException; -import org.springframework.dao.DataAccessException; -import org.springframework.dao.DataIntegrityViolationException; -import org.springframework.dao.EmptyResultDataAccessException; -import org.springframework.jdbc.core.ResultSetExtractor; -import org.springframework.jdbc.core.RowMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.config.INodeIdGenerator; +import org.jumpmind.symmetric.ext.IOfflineServerListener; +import org.jumpmind.symmetric.model.NetworkedNode; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLinkAction; +import org.jumpmind.symmetric.model.NodeHost; +import org.jumpmind.symmetric.model.NodeSecurity; +import org.jumpmind.symmetric.model.NodeStatus; +import org.jumpmind.symmetric.security.INodePasswordFilter; +import org.jumpmind.symmetric.service.INodeService; +import org.jumpmind.symmetric.util.AppUtils; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.DataAccessException; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.ResultSetExtractor; +import org.springframework.jdbc.core.RowMapper; /** * @see INodeService @@ -342,6 +343,30 @@ public List findAllExternalIds() { public List findAllNodes() { return jdbcTemplate.query(getSql("selectNodePrefixSql"), new NodeRowMapper()); + } + + public Map findAllNodesAsMap() { + List nodes = findAllNodes(); + Map nodeMap = new HashMap(nodes.size()); + for (Node node : nodes) { + nodeMap.put(node.getNodeId(), node); + } + return nodeMap; + } + + public NetworkedNode getRootNetworkedNode() { + Map nodes = findAllNodesAsMap(); + Map leaves = new HashMap(nodes.size()); + NetworkedNode nodeLeaf = null; + for (Node node : nodes.values()) { + nodeLeaf = leaves.get(node.getNodeId()); + if (nodeLeaf == null) { + nodeLeaf = new NetworkedNode(node); + nodeLeaf.addParents(nodes, leaves); + leaves.put(node.getNodeId(), nodeLeaf); + } + } + return nodeLeaf.getRoot(); } public boolean updateNodeSecurity(NodeSecurity security) { diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-routers.xml b/symmetric/symmetric-core/src/main/resources/symmetric-routers.xml index c5508592ae..bcf541015e 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-routers.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-routers.xml @@ -38,7 +38,9 @@ - + + + diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterUnitTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterTest.java similarity index 97% rename from symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterUnitTest.java rename to symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterTest.java index 504d3456d3..3e0abfcb4d 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterUnitTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ColumnMatchDataRouterTest.java @@ -7,7 +7,7 @@ import org.jumpmind.symmetric.route.ColumnMatchDataRouter.Expression; import org.junit.Test; -public class ColumnMatchDataRouterUnitTest { +public class ColumnMatchDataRouterTest { @Test public void testExpressionUsingLineFeedsParsing() { diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouterTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouterTest.java new file mode 100644 index 0000000000..36e489453c --- /dev/null +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouterTest.java @@ -0,0 +1,178 @@ +package org.jumpmind.symmetric.route; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.jumpmind.symmetric.ddl.model.Table; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.DataEventType; +import org.jumpmind.symmetric.model.DataMetaData; +import org.jumpmind.symmetric.model.NetworkedNode; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLink; +import org.jumpmind.symmetric.model.TriggerHistory; +import org.junit.Test; + +public class ConfigurationChangedDataRouterTest { + + private static List THREE_TIER_LINKS; + + private static NetworkedNode THREE_TIER_NETWORKED_ROOT; + + private static List MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS; + + private static NetworkedNode MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT; + + static { + THREE_TIER_LINKS = new ArrayList(); + THREE_TIER_LINKS.add(new NodeGroupLink("corp", "region")); + THREE_TIER_LINKS.add(new NodeGroupLink("region", "corp")); + THREE_TIER_LINKS.add(new NodeGroupLink("region", "laptop")); + THREE_TIER_LINKS.add(new NodeGroupLink("laptop", "region")); + + THREE_TIER_NETWORKED_ROOT = new NetworkedNode(new Node("corp", "corp")); + + Node rgn1 = new Node("rgn1", "region"); + rgn1.setCreatedAtNodeId("corp"); + THREE_TIER_NETWORKED_ROOT.addChild(new NetworkedNode(rgn1)); + + Node rgn2 = new Node("rgn2", "region"); + rgn2.setCreatedAtNodeId("corp"); + THREE_TIER_NETWORKED_ROOT.addChild(new NetworkedNode(rgn2)); + + Node laptop1 = new Node("laptop1", "laptop"); + laptop1.setCreatedAtNodeId("rgn1"); + THREE_TIER_NETWORKED_ROOT.findNetworkedNode(laptop1.getCreatedAtNodeId()).addChild( + new NetworkedNode(laptop1)); + + Node laptop2 = new Node("laptop2", "laptop"); + laptop2.setCreatedAtNodeId("rgn2"); + THREE_TIER_NETWORKED_ROOT.findNetworkedNode(laptop2.getCreatedAtNodeId()).addChild( + new NetworkedNode(laptop2)); + + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS = new ArrayList(); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("regsvr", "s1")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("regsvr", "s2")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("regsvr", "dw")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("s1", "regsvr")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("s2", "regsvr")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("dw", "regsvr")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("s1", "dw")); + MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS.add(new NodeGroupLink("s2", "dw")); + + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT = new NetworkedNode(new Node("regsvr", "regsvr")); + + Node node = null; + + node = new Node("s1", "s1"); + node.setCreatedAtNodeId("regsvr"); + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.addChild(new NetworkedNode(node)); + + node = new Node("s2", "s2"); + node.setCreatedAtNodeId("regsvr"); + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.addChild(new NetworkedNode(node)); + + node = new Node("dw", "dw"); + node.setCreatedAtNodeId("regsvr"); + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.addChild(new NetworkedNode(node)); + + + } + + @Test + public void testRouteHeartbeatToParent() { + IDataRouter router = buildTestableRouter( + THREE_TIER_NETWORKED_ROOT.findNetworkedNode("laptop1").getNode(), THREE_TIER_LINKS, + THREE_TIER_NETWORKED_ROOT); + Set nodes = new HashSet(); + nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode()); + Collection nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false); + Assert.assertNotNull(nodeIds); + Assert.assertEquals(1, nodeIds.size()); + Assert.assertEquals("rgn1", nodeIds.iterator().next()); + } + + @Test + public void testRouteLaptop1FromCorp() { + IDataRouter router = buildTestableRouter( + THREE_TIER_NETWORKED_ROOT.findNetworkedNode("corp").getNode(), THREE_TIER_LINKS, + THREE_TIER_NETWORKED_ROOT); + Set nodes = new HashSet(); + nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode()); + nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn2").getNode()); + Collection nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false); + Assert.assertNotNull(nodeIds); + Assert.assertEquals(1, nodeIds.size()); + Assert.assertEquals("rgn1", nodeIds.iterator().next()); + } + + @Test + public void testRouteS1ToDWFromRegsvr() { + IDataRouter router = buildTestableRouter( + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("regsvr").getNode(), MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS, + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT); + Set nodes = new HashSet(); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s1").getNode()); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s2").getNode()); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("dw").getNode()); + Collection nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "s1"), nodes, false); + Assert.assertNotNull(nodeIds); + Assert.assertEquals(2, nodeIds.size()); + Assert.assertTrue(nodeIds.contains("s1")); + Assert.assertTrue(nodeIds.contains("dw")); + } + + @Test + public void testRouteDWToS1andS2FromRegsvr() { + IDataRouter router = buildTestableRouter( + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("regsvr").getNode(), MULTIPLE_GROUPS_PLUS_REG_SVR_LINKS, + MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT); + Set nodes = new HashSet(); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s1").getNode()); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s2").getNode()); + nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("dw").getNode()); + Collection nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "dw"), nodes, false); + Assert.assertNotNull(nodeIds); + Assert.assertEquals(3, nodeIds.size()); + Assert.assertTrue(nodeIds.contains("s1")); + Assert.assertTrue(nodeIds.contains("s2")); + Assert.assertTrue(nodeIds.contains("dw")); + } + + protected DataMetaData buildDataMetaData(String tableName, String nodeId) { + Data data = new Data(); + data.setTableName(tableName); + data.setEventType(DataEventType.UPDATE); + data.setTriggerHistory(new TriggerHistory(tableName, "NODE_ID", "NODE_ID")); + data.setPkData(nodeId); + data.setRowData(nodeId); + return new DataMetaData(data, new Table(tableName), null, null); + } + + protected IDataRouter buildTestableRouter(final Node nodeThatIsRouting, + final List links, final NetworkedNode root) { + ConfigurationChangedRouter router = new ConfigurationChangedRouter() { + @Override + protected Node findIdentity() { + return nodeThatIsRouting; + } + + @Override + protected List getNodeGroupLinksFromContext(IRouterContext routingContext) { + return links; + } + + @Override + protected NetworkedNode getRootNetworkNodeFromContext(IRouterContext routingContext) { + return root; + } + }; + router.setTablePrefix("sym"); + return router; + } +} diff --git a/symmetric/symmetric-ddl/src/main/java/org/jumpmind/symmetric/ddl/model/Table.java b/symmetric/symmetric-ddl/src/main/java/org/jumpmind/symmetric/ddl/model/Table.java index 5f8a6b96bd..36d4f1bde5 100644 --- a/symmetric/symmetric-ddl/src/main/java/org/jumpmind/symmetric/ddl/model/Table.java +++ b/symmetric/symmetric-ddl/src/main/java/org/jumpmind/symmetric/ddl/model/Table.java @@ -27,7 +27,6 @@ import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.Predicate; @@ -37,8 +36,6 @@ /** * Represents a table in the database model. - * - * @version $Revision: 494338 $ */ public class Table implements Serializable, Cloneable { @@ -61,6 +58,14 @@ public class Table implements Serializable, Cloneable private ArrayList _foreignKeys = new ArrayList(); /** The indices applied to this table. */ private ArrayList _indices = new ArrayList(); + + public Table() { + + } + + public Table(String tableName) { + this._name = tableName; + } /** * Returns the catalog of this table as read from the database.