Skip to content

Commit

Permalink
0001346: reloadTable JMX method and command should always route to th…
Browse files Browse the repository at this point in the history
…e node_list
  • Loading branch information
chenson42 committed Jul 25, 2013
1 parent 0954093 commit 1491be7
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 25 deletions.
Expand Up @@ -395,6 +395,10 @@ protected File getSnapshotDirectory() {
snapshotsDir.mkdirs();
return snapshotsDir;
}

public ApplicationContext getSpringContext() {
return springContext;
}

public File snapshot() {

Expand Down
Expand Up @@ -169,6 +169,18 @@ protected void close(ISqlTransaction transaction) {
transaction.close();
}
}

protected Set<String> toNodeIds(Set<Node> nodes) {
return toNodeIds(nodes, null);
}

protected Set<String> toNodeIds(Set<Node> nodes, Set<String> nodeIds) {
nodeIds = nodeIds == null ? new HashSet<String>(nodes.size()) : nodeIds;
for (Node node : nodes) {
nodeIds.add(node.getNodeId());
}
return nodeIds;
}

protected String getRootMessage(Exception ex) {
Throwable cause = ExceptionUtils.getRootCause(ex);
Expand Down
Expand Up @@ -261,7 +261,7 @@ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
}

String channelId = triggerRouter.getTrigger().getChannelId();
if (isLoad && !Constants.CHANNEL_FILESYNC.equals(triggerRouter.getTrigger().getChannelId()) &&
if (!Constants.CHANNEL_FILESYNC.equals(triggerRouter.getTrigger().getChannelId()) &&
parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL)) {
channelId = Constants.CHANNEL_RELOAD;
}
Expand Down
Expand Up @@ -624,32 +624,32 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext
Collection<String> nodeIds = null;
if (!context.getChannel().isIgnoreEnabled()
&& triggerRouter.isRouted(data.getDataEventType())) {
IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
context.addUsedDataRouter(dataRouter);
long ts = System.currentTimeMillis();
nodeIds = dataRouter.routeToNodes(context, dataMetaData,
findAvailableNodes(triggerRouter, context), false);
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_DATA_ROUTER_MS);

if (nodeIds != null) {
String targetNodeIds = data.getNodeList();
if (StringUtils.isNotBlank(targetNodeIds)) {
List<String> targetNodeIdsList = Arrays
.asList(targetNodeIds.split(","));
nodeIds = CollectionUtils.intersection(targetNodeIdsList, nodeIds);

if (nodeIds.size() == 0) {
log.warn(
"None of the target nodes specified in the data.node_list field ({}) were qualified nodes. {} will not be routed",
targetNodeIds, data.getDataId());
}
}

String targetNodeIds = data.getNodeList();
if (StringUtils.isNotBlank(targetNodeIds)) {
List<String> targetNodeIdsList = Arrays.asList(targetNodeIds.split(","));
nodeIds = CollectionUtils.intersection(targetNodeIdsList, toNodeIds(findAvailableNodes(triggerRouter, context)));

if (nodeIds.size() == 0) {
log.warn(
"None of the target nodes specified in the data.node_list field ({}) were qualified nodes. {} will not be routed",
targetNodeIds, data.getDataId());
}
} else {
IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
context.addUsedDataRouter(dataRouter);
long ts = System.currentTimeMillis();
nodeIds = dataRouter.routeToNodes(context, dataMetaData,
findAvailableNodes(triggerRouter, context), false);
context.incrementStat(System.currentTimeMillis() - ts,
ChannelRouterContext.STAT_DATA_ROUTER_MS);
}

if (nodeIds != null) {
if (!triggerRouter.isPingBackEnabled() && data.getSourceNodeId() != null) {
nodeIds.remove(data.getSourceNodeId());
}

// should never route to self
nodeIds.remove(engine.getNodeService().findIdentityNodeId());

Expand Down
Expand Up @@ -27,8 +27,13 @@

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.web.rest.RestService;

public class NonDmlEventsTest extends AbstractTest {

Expand All @@ -49,13 +54,17 @@ protected Table[] getTables(String name) {

Table b = new Table("B");
b.addColumn(new Column("ID", true, Types.BIGINT, -1, -1));

Table nodeSpecific = new Table("NODE_SPECIFIC");
nodeSpecific.addColumn(new Column("NODE_ID", true, Types.BIGINT, -1, -1));

return new Table[] { testTable, a, b };
return new Table[] { testTable, a, b, nodeSpecific };
}

@Override
protected void test(ISymmetricEngine rootServer, ISymmetricEngine clientServer)
throws Exception {

loadConfigAndRegisterNode("client", "root");

// pull to clear out any heartbeat events
Expand Down Expand Up @@ -152,6 +161,32 @@ protected void test(ISymmetricEngine rootServer, ISymmetricEngine clientServer)
10,
clientServer.getDatabasePlatform().getSqlTemplate()
.queryForInt(String.format("select count(*) from %s", clientTable.getName())));


testRoutingOfReloadEvents(rootServer, clientServer);

}

protected void testRoutingOfReloadEvents(ISymmetricEngine rootServer, ISymmetricEngine clientServer)
throws Exception {
rootServer.getParameterService().saveParameter(ParameterConstants.REST_API_ENABLED, true, "unit_test");
rootServer.getRegistrationService().openRegistration(clientServer.getParameterService().getNodeGroupId(), "2");
rootServer.getRegistrationService().openRegistration(clientServer.getParameterService().getNodeGroupId(), "3");
RestService restService = getRegServer().getRestService();
// register a few more nodes to make sure that when we insert reload events they are only routed to the node we want
restService.postRegisterNode("2", clientServer.getParameterService().getNodeGroupId(), DatabaseNamesConstants.H2, "1.2", "host2");
restService.postRegisterNode("3", clientServer.getParameterService().getNodeGroupId(), DatabaseNamesConstants.H2, "1.2", "host2");

Assert.assertEquals(0, rootServer.getOutgoingBatchService().countOutgoingBatchesUnsent(Constants.CHANNEL_RELOAD));
Table serverTable = rootServer.getDatabasePlatform().readTableFromDatabase(null, null, "NODE_SPECIFIC");
Assert.assertNotNull(serverTable);
Assert.assertTrue(rootServer.getDataService().reloadTable(clientServer.getNodeService().findIdentityNodeId(), null, null, serverTable.getName()).startsWith("Successfully created"));
rootServer.route();
Assert.assertEquals(1, rootServer.getOutgoingBatchService().countOutgoingBatchesUnsent(Constants.CHANNEL_RELOAD));

OutgoingBatches batches = rootServer.getOutgoingBatchService().getOutgoingBatches(clientServer.getNodeService().findIdentityNodeId(), true);

Assert.assertEquals(1, batches.getBatchesForChannel(Constants.CHANNEL_RELOAD).size());

}
}
Expand Up @@ -122,16 +122,19 @@ insert,"filesync","3","100","100","10000","0","1","1","1","1","0","default","def
catalog,
schema,
table,SYM_TRIGGER
insert,"NODE_SPECIFIC",,,"NODE_SPECIFIC","default","0","0","0","0",,,,"1=1","1=1","1=1",,,,,"1","1","1","0","2013-06-21 10:48:23.669","admin","2013-06-21 10:48:23.669"
insert,"DEAD_CAMELCASE",,,"CamelCase","default","0","0","0","0",,,,"1=1","1=1","1=1",,,,,"0","0","1","0","2013-06-21 10:48:23.669","admin","2013-06-21 10:48:23.669"
insert,"WILDCARD",,,"*,!CamelCase","default","0","0","0","0",,,,"1=1","1=1","1=1",,,,,"0","0","1","0","2013-06-22 21:10:33.127","admin","2013-06-22 21:11:23.787"
catalog,
schema,
table,SYM_ROUTER
insert,"CLIENT_2_ROOT",,,,"client","root","default",,"1","1","1","2013-06-21 10:37:23.472","admin","2013-06-21 10:37:23.472"
insert,"ROOT_2_CLIENT",,,,"root","client","default",,"1","1","1","2013-06-21 10:37:43.636","admin","2013-06-21 10:37:43.636"
insert,"ROOT_2_NODE",,,,"root","client","column","NODE_ID=:NODE_ID","1","1","1","2013-06-21 10:37:43.636","admin","2013-06-21 10:37:43.636"
catalog,
schema,
table,SYM_TRIGGER_ROUTER
insert,"DEAD_CAMELCASE","ROOT_2_CLIENT","1","50","","","0","2013-06-21 10:48:30.989","admin","2013-06-21 10:48:30.989"
insert,"WILDCARD","ROOT_2_CLIENT","1","50","","","0","2013-06-22 21:10:46.738","admin","2013-06-22 21:10:46.738"
insert,"NODE_SPECIFIC","ROOT_2_NODE","1","50","","","0","2013-06-22 21:10:46.738","admin","2013-06-22 21:10:46.738"
commit,-9999

0 comments on commit 1491be7

Please sign in to comment.