Skip to content

Commit

Permalink
0006013: Added :SOURCE_NODE_ID, :SOURCE_EXTERNAL_ID, and :SOURCE_NODE…
Browse files Browse the repository at this point in the history
…_GROUP_ID variables to column match routers
  • Loading branch information
evan-miller-jumpmind committed Oct 11, 2023
1 parent b5c5d3a commit 53d3709
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 9 deletions.
Expand Up @@ -64,8 +64,11 @@ endif::pro[]
====

* :NODE_ID
* :SOURCE_NODE_ID
* :EXTERNAL_ID
* :SOURCE_EXTERNAL_ID
* :NODE_GROUP_ID
* :SOURCE_NODE_GROUP_ID
* :REDIRECT_NODE
====

Expand Down
Expand Up @@ -25,8 +25,11 @@ private TokenConstants() {
}

public static final String NODE_ID = ":NODE_ID";
public static final String SOURCE_NODE_ID = ":SOURCE_NODE_ID";
public static final String EXTERNAL_ID = ":EXTERNAL_ID";
public static final String SOURCE_EXTERNAL_ID = ":SOURCE_EXTERNAL_ID";
public static final String NODE_GROUP_ID = ":NODE_GROUP_ID";
public static final String SOURCE_NODE_GROUP_ID = ":SOURCE_NODE_GROUP_ID";
public static final String REDIRECT_NODE = ":REDIRECT_NODE";
public static final String EXTERNAL_DATA = ":EXTERNAL_DATA";
}
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.TokenConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
Expand Down Expand Up @@ -69,17 +70,15 @@
*/
public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRouter, IBuiltInExtensionPoint {
private static final String NULL_VALUE = "NULL";
private IConfigurationService configurationService;
private ISymmetricDialect symmetricDialect;
private ISymmetricEngine engine;
final static String EXPRESSION_KEY = String.format("%s.Expression.", ColumnMatchDataRouter.class
.getName());

public ColumnMatchDataRouter() {
}

public ColumnMatchDataRouter(IConfigurationService configurationService, ISymmetricDialect symmetricDialect) {
this.configurationService = configurationService;
this.symmetricDialect = symmetricDialect;
public ColumnMatchDataRouter(ISymmetricEngine engine) {
this.engine = engine;
}

public Set<String> routeToNodes(SimpleRouterContext routingContext,
Expand All @@ -89,8 +88,9 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext,
nodeIds = toNodeIds(nodes, null);
} else {
List<Expression> expressions = getExpressions(dataMetaData.getRouter(), routingContext);
Map<String, String> columnValues = getDataMap(dataMetaData, symmetricDialect);
Map<String, String> columnValues = getDataMap(dataMetaData, engine.getSymmetricDialect());
if (columnValues != null) {
Node identity = engine.getNodeService().findIdentity();
for (Expression e : expressions) {
String column = e.tokens[0].trim();
String value = e.tokens[1];
Expand All @@ -100,16 +100,34 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext,
nodeIds = runExpression(e, columnValue, node.getNodeId(), nodes,
nodeIds, node);
}
} else if (value.equalsIgnoreCase(TokenConstants.SOURCE_NODE_ID)) {
String sourceNodeId = identity.getNodeId();
for (Node node : nodes) {
nodeIds = runExpression(e, columnValue, sourceNodeId, nodes,
nodeIds, node);
}
} else if (value.equalsIgnoreCase(TokenConstants.EXTERNAL_ID)) {
for (Node node : nodes) {
nodeIds = runExpression(e, columnValue, node.getExternalId(), nodes,
nodeIds, node);
}
} else if (value.equalsIgnoreCase(TokenConstants.SOURCE_EXTERNAL_ID)) {
String sourceExternalId = identity.getExternalId();
for (Node node : nodes) {
nodeIds = runExpression(e, columnValue, sourceExternalId, nodes,
nodeIds, node);
}
} else if (value.equalsIgnoreCase(TokenConstants.NODE_GROUP_ID)) {
for (Node node : nodes) {
nodeIds = runExpression(e, columnValue, node.getNodeGroupId(), nodes,
nodeIds, node);
}
} else if (value.equalsIgnoreCase(TokenConstants.SOURCE_NODE_GROUP_ID)) {
String sourceNodeGroupId = identity.getNodeGroupId();
for (Node node : nodes) {
nodeIds = runExpression(e, columnValue, sourceNodeGroupId, nodes,
nodeIds, node);
}
} else if (e.hasEquals && value.equalsIgnoreCase(TokenConstants.REDIRECT_NODE)) {
Map<String, String> redirectMap = getRedirectMap(routingContext);
String nodeId = redirectMap.get(columnValue);
Expand Down Expand Up @@ -247,7 +265,7 @@ protected Map<String, String> getRedirectMap(SimpleRouterContext ctx) {
Map<String, String> redirectMap = (Map<String, String>) ctx.getContextCache().get(
CTX_CACHE_KEY);
if (redirectMap == null) {
redirectMap = configurationService.getRegistrationRedirectMap();
redirectMap = engine.getConfigurationService().getRegistrationRedirectMap();
ctx.getContextCache().put(CTX_CACHE_KEY, redirectMap);
}
return redirectMap;
Expand Down
Expand Up @@ -138,8 +138,7 @@ public RouterService(ISymmetricEngine engine) {
extensionService.addExtensionPoint("lookuptable", new LookupTableDataRouter(symmetricDialect));
extensionService.addExtensionPoint("default", new DefaultDataRouter());
extensionService.addExtensionPoint("audit", new AuditTableDataRouter(engine));
extensionService.addExtensionPoint("column", new ColumnMatchDataRouter(engine.getConfigurationService(),
engine.getSymmetricDialect()));
extensionService.addExtensionPoint("column", new ColumnMatchDataRouter(engine));
extensionService.addExtensionPoint(FileSyncDataRouter.ROUTER_TYPE, new FileSyncDataRouter(engine));
extensionService.addExtensionPoint("dbf", new DBFRouter(engine));
extensionService.addExtensionPoint("tps", new TPSRouter(engine));
Expand Down

0 comments on commit 53d3709

Please sign in to comment.