Skip to content

Commit

Permalink
null pointer check for node identity during routing of configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 3, 2012
1 parent 20e0f7e commit 61f0c8e
Showing 1 changed file with 82 additions and 74 deletions.
Expand Up @@ -80,96 +80,104 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
Map<String, String> columnValues = getDataMap(dataMetaData);

Node me = findIdentity();
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);

// if this is sym_node or sym_node_security determine which nodes it
// goes to.
if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)) {

String nodeIdInQuestion = columnValues.get("NODE_ID");
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();

if (me != null) {
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);

// if this is sym_node or sym_node_security determine which nodes it
// goes to.
if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)) {

String nodeIdInQuestion = columnValues.get("NODE_ID");
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}

if (!initialLoad && nodeIds != null) {
/*
* don't route node security to it's own node. that node will
* get node security via registration and it will be updated by
* initial load
*/
if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
if (!initialLoad && nodeIds != null) {
/*
* don't route node security to it's own node. that node
* will get node security via registration and it will be
* updated by initial load
*/
if (tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {

nodeIds.remove(nodeIdInQuestion);
}
/*
* don't route insert events for a node to itself. they will be
* loaded during
*/
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT) {
nodeIds.remove(nodeIdInQuestion);
nodeIds.remove(nodeIdInQuestion);
}
/*
* don't route insert events for a node to itself. they will
* be loaded during
*/
if (dataMetaData.getData().getDataEventType() == DataEventType.INSERT) {
nodeIds.remove(nodeIdInQuestion);
}
}
}
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode, me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode,
me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
nodeIds.add(nodeThatMayBeRoutedTo.getNodeId());
}
}

if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())
&& (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER)
|| tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_ROUTER) || tableMatches(
dataMetaData, TableConstants.SYM_NODE_GROUP_LINK))) {
routingContext.getContextCache().put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_CHANNEL)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_CHANNELS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_CONFLICT)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_CONFLICTS_NEEDED, Boolean.TRUE);
}
if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId())
&& (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER)
|| tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)
|| tableMatches(dataMetaData, TableConstants.SYM_ROUTER) || tableMatches(
dataMetaData, TableConstants.SYM_NODE_GROUP_LINK))) {
routingContext.getContextCache().put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_PARAMETERS_NEEDED, Boolean.TRUE);
if (tableMatches(dataMetaData, TableConstants.SYM_CHANNEL)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_CHANNELS_NEEDED,
Boolean.TRUE);
}

if (dataMetaData.getData().getRowData() != null
&& dataMetaData.getData().getRowData().contains("job.")) {
routingContext.getContextCache().put(CTX_KEY_RESTART_JOBMANAGER_NEEDED,
if (tableMatches(dataMetaData, TableConstants.SYM_CONFLICT)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_CONFLICTS_NEEDED,
Boolean.TRUE);
}

if (dataMetaData.getData().getRowData() != null
&& (dataMetaData.getData().getRowData()
.contains(ParameterConstants.PULL_THREAD_COUNT_PER_SERVER)
|| dataMetaData.getData().getRowData()
.contains(ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER))) {
routingContext.getContextCache().put(CTX_KEY_RESTART_NODE_COMMUNICATOR_NEEDED,
if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_PARAMETERS_NEEDED,
Boolean.TRUE);

if (dataMetaData.getData().getRowData() != null
&& dataMetaData.getData().getRowData().contains("job.")) {
routingContext.getContextCache().put(CTX_KEY_RESTART_JOBMANAGER_NEEDED,
Boolean.TRUE);
}

if (dataMetaData.getData().getRowData() != null
&& (dataMetaData.getData().getRowData()
.contains(ParameterConstants.PULL_THREAD_COUNT_PER_SERVER) || dataMetaData
.getData().getRowData()
.contains(ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER))) {
routingContext.getContextCache().put(
CTX_KEY_RESTART_NODE_COMMUNICATOR_NEEDED, Boolean.TRUE);
}
}
}

if (tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_COLUMN)
|| tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_TABLE)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED, Boolean.TRUE);
if (tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_COLUMN)
|| tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_TABLE)) {
routingContext.getContextCache().put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED,
Boolean.TRUE);
}
}
}

Expand Down

0 comments on commit 61f0c8e

Please sign in to comment.