Skip to content

Commit

Permalink
0001347: Initial load ignores subselect routers and routes to all nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 26, 2013
1 parent 1491be7 commit 295238e
Show file tree
Hide file tree
Showing 13 changed files with 28 additions and 27 deletions.
Expand Up @@ -58,7 +58,7 @@ public AuditTableDataRouter(ISymmetricEngine engine) {
}

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad) {
Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed) {
DataEventType eventType = dataMetaData.getData().getDataEventType();
if (eventType == DataEventType.INSERT || eventType == DataEventType.UPDATE
|| eventType == DataEventType.DELETE) {
Expand Down
Expand Up @@ -53,7 +53,7 @@ public BshDataRouter(ISymmetricEngine engine) {
}

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
boolean initialLoad, boolean initialLoadSelectUsed) {
try {
long ts = System.currentTimeMillis();
Interpreter interpreter = getInterpreter(context);
Expand Down
Expand Up @@ -93,7 +93,7 @@ public ColumnMatchDataRouter(IConfigurationService configurationService, ISymmet
}

public Set<String> routeToNodes(SimpleRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed) {
Set<String> nodeIds = null;
List<Expression> expressions = getExpressions(dataMetaData.getRouter(), routingContext);
Map<String, String> columnValues = getDataMap(dataMetaData, symmetricDialect);
Expand Down
Expand Up @@ -81,7 +81,7 @@ public ConfigurationChangedDataRouter(ISymmetricEngine engine) {
}

public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Set<Node> possibleTargetNodes, boolean initialLoad) {
Set<Node> possibleTargetNodes, boolean initialLoad, boolean initialLoadSelectUsed) {

// the list of nodeIds that we will return
Set<String> nodeIds = null;
Expand Down
Expand Up @@ -32,7 +32,7 @@
public class DefaultDataRouter extends AbstractDataRouter {

public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
boolean initialLoad, boolean initialLoadSelectUsed) {
return toNodeIds(nodes, null);
}

Expand Down
Expand Up @@ -44,7 +44,7 @@ public FileSyncDataRouter(ISymmetricEngine engine) {
}

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad) {
Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed) {
Set<String> nodeIds = new HashSet<String>();
IFileSyncService fileSyncService = engine.getFileSyncService();
IRouterService routerService = engine.getRouterService();
Expand Down Expand Up @@ -80,7 +80,7 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
((ChannelRouterContext) context).addUsedDataRouter(dataRouter);
}
dataMetaData.setRouter(router);
nodeIds.addAll(dataRouter.routeToNodes(context, dataMetaData, nodes, false));
nodeIds.addAll(dataRouter.routeToNodes(context, dataMetaData, nodes, false, false));
nodeIds.remove(sourceNodeId);
} else {
log.error(
Expand Down
Expand Up @@ -44,7 +44,7 @@
*/
public interface IDataRouter extends IExtensionPoint {

public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad);
public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed);

public void completeBatch(SimpleRouterContext context, OutgoingBatch batch);

Expand Down
Expand Up @@ -65,7 +65,7 @@ public LookupTableDataRouter() {
}

public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad) {
Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed) {

Set<String> nodeIds = null;
Router router = dataMetaData.getRouter();
Expand Down
Expand Up @@ -69,12 +69,12 @@ public SubSelectDataRouter(ISymmetricDialect symmetricDialect) {
}

public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Set<Node> nodes, boolean initialLoad) {
Set<Node> nodes, boolean initialLoad, boolean initialLoadSelectUsed) {
String sql = FormatUtils.replaceToken(SQL, "prefixName", symmetricDialect.getTablePrefix(),
true);
String subSelect = dataMetaData.getRouter().getRouterExpression();
Set<String> nodeIds = null;
if (!StringUtils.isBlank(subSelect) && !initialLoad) {
if (!StringUtils.isBlank(subSelect) && !initialLoadSelectUsed) {
try {
Map<String, Object> sqlParams = getDataObjectMap(dataMetaData, symmetricDialect, true);
sqlParams.put("NODE_GROUP_ID", dataMetaData.getRouter().getNodeGroupLink()
Expand All @@ -91,7 +91,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
dataMetaData.getRouter().getRouterId());
throw ex;
}
} else if (initialLoad) {
} else if (initialLoadSelectUsed) {
nodeIds = toNodeIds(nodes, null);
} else {
throw new InvalidSqlException("The subselect expression is missing for the %s router", dataMetaData.getRouter().getRouterId());
Expand Down
Expand Up @@ -43,7 +43,7 @@ public interface IRouterService extends IService {
public long getUnroutedDataCount();

public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData,
Node node, boolean initialLoad);
Node node, boolean initialLoad, boolean initialLoadSelectUsed);

public void addDataRouter(String name, IDataRouter dataRouter);

Expand Down
Expand Up @@ -1352,7 +1352,8 @@ public CsvData next() {
&& routingContext != null
&& !routerService.shouldDataBeRouted(routingContext,
new DataMetaData((Data) data, targetTable, triggerRouter.getRouter(),
routingContext.getChannel()), node, true));
routingContext.getChannel()), node, true, StringUtils
.isNotBlank(triggerRouter.getInitialLoadSelect())));

if (data != null && outgoingBatch != null && !outgoingBatch.isExtractJobFlag()) {
outgoingBatch.incrementDataEventCount();
Expand Down
Expand Up @@ -123,12 +123,12 @@ public RouterService(ISymmetricEngine engine) {
* For use in data load events
*/
public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData,
Node node, boolean initialLoad) {
Node node, boolean initialLoad, boolean initialLoadSelectUsed) {
IDataRouter router = getDataRouter(dataMetaData.getRouter());
Set<Node> oneNodeSet = new HashSet<Node>(1);
oneNodeSet.add(node);
Collection<String> nodeIds = router.routeToNodes(context, dataMetaData, oneNodeSet,
initialLoad);
initialLoad, initialLoadSelectUsed);
return nodeIds != null && nodeIds.contains(node.getNodeId());
}

Expand Down Expand Up @@ -624,7 +624,7 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
Collection<String> nodeIds = null;
if (!context.getChannel().isIgnoreEnabled()
&& triggerRouter.isRouted(data.getDataEventType())) {

String targetNodeIds = data.getNodeList();
if (StringUtils.isNotBlank(targetNodeIds)) {
List<String> targetNodeIdsList = Arrays.asList(targetNodeIds.split(","));
Expand All @@ -640,7 +640,7 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
context.addUsedDataRouter(dataRouter);
long ts = System.currentTimeMillis();
nodeIds = dataRouter.routeToNodes(context, dataMetaData,
findAvailableNodes(triggerRouter, context), false);
findAvailableNodes(triggerRouter, context), false, false);
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_DATA_ROUTER_MS);
}
Expand Down
Expand Up @@ -111,7 +111,7 @@ public void testRouteHeartbeatToParent() {
THREE_TIER_NETWORKED_ROOT);
Set<Node> nodes = new HashSet<Node>();
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("rgn1", nodeIds.iterator().next());
Expand All @@ -125,7 +125,7 @@ public void testRouteLaptop1FromRgn1() {
THREE_TIER_NETWORKED_ROOT);
Set<Node> nodes = new HashSet<Node>();
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("laptop1").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("laptop1", nodeIds.iterator().next());
Expand All @@ -141,7 +141,7 @@ public void testRouteRgn2FromCorp() {
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode());
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn2").getNode());

Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "rgn2"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "rgn2"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("rgn2", nodeIds.iterator().next());
Expand All @@ -156,7 +156,7 @@ public void testConfigurationExtract() {
Set<Node> nodes = new HashSet<Node>();
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("corp").getNode());

Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "corp"), nodes, true);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "corp"), nodes, true, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("corp", nodeIds.iterator().next());
Expand All @@ -172,7 +172,7 @@ public void testRouteRgn1FromCorp() {
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode());
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn2").getNode());

Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "rgn1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "rgn1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("rgn1", nodeIds.iterator().next());
Expand All @@ -186,7 +186,7 @@ public void testRouteLaptop1FromCorp() {
Set<Node> nodes = new HashSet<Node>();
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn1").getNode());
nodes.add(THREE_TIER_NETWORKED_ROOT.findNetworkedNode("rgn2").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "laptop1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertEquals("rgn1", nodeIds.iterator().next());
Expand All @@ -201,7 +201,7 @@ public void testRouteS1ToDWFromRegsvr() {
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s1").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s2").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("dw").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "s1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "s1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(2, nodeIds.size());
Assert.assertTrue(nodeIds.contains("s1"));
Expand All @@ -217,7 +217,7 @@ public void testRouteDWToS1andS2FromRegsvr() {
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s1").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s2").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("dw").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "dw"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "dw"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(3, nodeIds.size());
Assert.assertTrue(nodeIds.contains("s1"));
Expand All @@ -234,7 +234,7 @@ public void testRouteS1toRegsvrFromS1() {
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("s1").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("dw").getNode());
nodes.add(MULTIPLE_GROUPS_PLUS_REG_SVR_NETWORKED_ROOT.findNetworkedNode("regsvr").getNode());
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "s1"), nodes, false);
Collection<String> nodeIds = router.routeToNodes(new SimpleRouterContext(), buildDataMetaData("SYM_NODE", "s1"), nodes, false, false);
Assert.assertNotNull(nodeIds);
Assert.assertEquals(1, nodeIds.size());
Assert.assertTrue(nodeIds.contains("regsvr"));
Expand Down

0 comments on commit 295238e

Please sign in to comment.