Skip to content

Commit

Permalink
0001718: If a node is deregistered by deleting it's sym_node* records…
Browse files Browse the repository at this point in the history
… and it is registered it will route and sync the deletes
  • Loading branch information
chenson42 committed May 10, 2014
1 parent 567c4dc commit 0b7a412
Showing 1 changed file with 83 additions and 82 deletions.
Expand Up @@ -92,7 +92,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
TriggerRouter triggerRouter) {

// the list of nodeIds that we will return
Set<String> nodeIds = null;
Set<String> nodeIds = new HashSet<String>();

// the inbound data
Map<String, String> columnValues = getDataMap(dataMetaData,
Expand All @@ -103,84 +103,15 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
if (me != null) {
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);

// if this is sym_node or sym_node_security determine which nodes it
// goes to.
if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)) {

String nodeIdInQuestion = columnValues.get("NODE_ID");
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode,
me, nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}

if (!initialLoad && nodeIds != null) {
DataEventType eventType = dataMetaData.getData().getDataEventType();
/*
* Don't route node security to it's own node. That node
* will get node security via registration and it will be
* updated by initial load. Otherwise, updates can be
* unpredictable in the order they will be applied at the
* node because updates are on a different channel than
* reloads
*/
if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
if (nodeIds.contains(nodeIdInQuestion)) {
boolean remove = true;
if (eventType == DataEventType.UPDATE) {
if ("1".equals(columnValues.get("REV_INITIAL_LOAD_ENABLED"))) {
boolean reverseLoadQueued = engine.getParameterService().is(
ParameterConstants.INITIAL_LOAD_REVERSE_FIRST)
|| "0".equals(columnValues.get("INITIAL_LOAD_ENABLED"));
/*
* Only send the update if the client is
* going to be expected to queue up a
* reverse load. The trigger to do this is
* the arrival of sym_node_security with
* REV_INITIAL_LOAD_ENABLED set to 1.
*/
if (reverseLoadQueued) {
remove = false;
}
}
}
if (remove) {
nodeIds.remove(nodeIdInQuestion);
}
}

/*
* The parent node never needs node_security updates.
*/
nodeIds.remove(columnValues.get("CREATED_AT_NODE_ID"));
}

/*
* Don't route insert events for a node to itself. they will
* be loaded during registration. If we route them, then an
* old state can override the correct state
*
* Don't send deletes to a node. A node should be
* responsible for deleting itself.
*/
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT
|| dataMetaData.getData().getDataEventType() == DataEventType.DELETE) {
nodeIds.remove(nodeIdInQuestion);
}
}
/*
* If this is sym_node or sym_node_security determine which
* nodes it goes to.
*/
routeNodeTables(nodeIds, columnValues, rootNetworkedNode, me, routingContext,
dataMetaData, possibleTargetNodes, initialLoad);
} else if (tableMatches(dataMetaData, TableConstants.SYM_TABLE_RELOAD_REQUEST)) {
String sourceNodeId = columnValues.get("SOURCE_NODE_ID");
String reloadEnabled = columnValues.get("RELOAD_ENABLED");
Expand All @@ -206,9 +137,6 @@ && isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode,
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(sourceNodeId);
}
}
Expand All @@ -226,9 +154,6 @@ && isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode,
NodeGroupLink link = configurationService.getNodeGroupLinkFor(
me.getNodeGroupId(), nodeThatMayBeRoutedTo.getNodeGroupId(), false);
if (link != null && link.isSyncConfigEnabled()) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}
Expand Down Expand Up @@ -271,6 +196,82 @@ && isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode,
return nodeIds;
}

protected void routeNodeTables(Set<String> nodeIds, Map<String, String> columnValues,
NetworkedNode rootNetworkedNode, Node me, SimpleRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> possibleTargetNodes, boolean initialLoad) {
String nodeIdInQuestion = columnValues.get("NODE_ID");
if (!(me.getNodeId().equals(nodeIdInQuestion) && dataMetaData.getData().getDataEventType() == DataEventType.DELETE))
{
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(nodeThatMayBeRoutedTo
.getDeploymentType())
&& !nodeThatMayBeRoutedTo.requires13Compatiblity()
&& isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}

if (!initialLoad && nodeIds != null) {
DataEventType eventType = dataMetaData.getData().getDataEventType();
/*
* Don't route node security to it's own node. That node will
* get node security via registration and it will be updated by
* initial load. Otherwise, updates can be unpredictable in the
* order they will be applied at the node because updates are on
* a different channel than reloads
*/
if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
if (nodeIds.contains(nodeIdInQuestion)) {
boolean remove = true;
if (eventType == DataEventType.UPDATE) {
if ("1".equals(columnValues.get("REV_INITIAL_LOAD_ENABLED"))) {
boolean reverseLoadQueued = engine.getParameterService().is(
ParameterConstants.INITIAL_LOAD_REVERSE_FIRST)
|| "0".equals(columnValues.get("INITIAL_LOAD_ENABLED"));
/*
* Only send the update if the client is going
* to be expected to queue up a reverse load.
* The trigger to do this is the arrival of
* sym_node_security with
* REV_INITIAL_LOAD_ENABLED set to 1.
*/
if (reverseLoadQueued) {
remove = false;
}
}
}
if (remove) {
nodeIds.remove(nodeIdInQuestion);
}
}

/*
* The parent node never needs node_security updates.
*/
nodeIds.remove(columnValues.get("CREATED_AT_NODE_ID"));
}

/*
* Don't route insert events for a node to itself. they will be
* loaded during registration. If we route them, then an old
* state can override the correct state
*
* Don't send deletes to a node. A node should be responsible
* for deleting itself.
*/
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT
|| dataMetaData.getData().getDataEventType() == DataEventType.DELETE) {
nodeIds.remove(nodeIdInQuestion);
}
}
}
}

@SuppressWarnings("unchecked")
protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaData dataMetaData,
Map<String, String> columnValues) {
Expand Down

0 comments on commit 0b7a412

Please sign in to comment.