Skip to content

Commit

Permalink
Allow for multiple column matcher definitions, deliminated by line fe…
Browse files Browse the repository at this point in the history
…eds.
  • Loading branch information
chenson42 committed Jan 11, 2010
1 parent 25f7e8c commit 658ceae
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 86 deletions.
Expand Up @@ -48,7 +48,7 @@ public boolean isAutoRegister() {
public void setAutoRegister(boolean autoRegister) {
this.autoRegister = autoRegister;
}

protected Map<String, String> getDataMap(DataMetaData dataMetaData) {
Map<String, String> data = null;
DataEventType dml = dataMetaData.getData().getEventType();
Expand All @@ -62,7 +62,7 @@ protected Map<String, String> getDataMap(DataMetaData dataMetaData) {
case INSERT:
data = new HashMap<String, String>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsString(null, dataMetaData));
Map<String,String> map = getNullData(OLD_, dataMetaData);
Map<String, String> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
data.put("EXTERNAL_DATA", dataMetaData.getData().getExternalData());
break;
Expand All @@ -82,13 +82,14 @@ protected Map<String, String> getNewDataAsString(String prefix, DataMetaData dat
String[] rowData = dataMetaData.getData().toParsedRowData();
return getDataAsString(prefix, dataMetaData, rowData);
}

protected Map<String, String> getOldDataAsString(String prefix, DataMetaData dataMetaData) {
String[] rowData = dataMetaData.getData().toParsedOldData();
return getDataAsString(prefix, dataMetaData, rowData);
}

protected Map<String, String> getDataAsString(String prefix, DataMetaData dataMetaData, String[] rowData) {

protected Map<String, String> getDataAsString(String prefix, DataMetaData dataMetaData,
String[] rowData) {
String[] columns = dataMetaData.getTriggerHistory().getParsedColumnNames();
Map<String, String> map = new HashMap<String, String>(columns.length);
for (int i = 0; i < columns.length; i++) {
Expand All @@ -110,7 +111,7 @@ protected Map<String, Object> getDataObjectMap(DataMetaData dataMetaData, IDbDia
case INSERT:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewDataAsObject(null, dataMetaData, dbDialect));
Map<String,Object> map = getNullData(OLD_, dataMetaData);
Map<String, Object> map = getNullData(OLD_, dataMetaData);
data.putAll(map);
break;
case DELETE:
Expand All @@ -124,12 +125,16 @@ protected Map<String, Object> getDataObjectMap(DataMetaData dataMetaData, IDbDia
return data;
}

protected Map<String, Object> getNewDataAsObject(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) {
return getDataAsObject(prefix, dataMetaData, dbDialect, dataMetaData.getData().toParsedRowData());
protected Map<String, Object> getNewDataAsObject(String prefix, DataMetaData dataMetaData,
IDbDialect dbDialect) {
return getDataAsObject(prefix, dataMetaData, dbDialect, dataMetaData.getData()
.toParsedRowData());
}

protected Map<String, Object> getOldDataAsObject(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) {
return getDataAsObject(prefix, dataMetaData, dbDialect, dataMetaData.getData().toParsedOldData());
protected Map<String, Object> getOldDataAsObject(String prefix, DataMetaData dataMetaData,
IDbDialect dbDialect) {
return getDataAsObject(prefix, dataMetaData, dbDialect, dataMetaData.getData()
.toParsedOldData());
}

protected <T> Map<String, T> getNullData(String prefix, DataMetaData dataMetaData) {
Expand All @@ -141,24 +146,31 @@ protected <T> Map<String, T> getNullData(String prefix, DataMetaData dataMetaDat
return data;
}

protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect,
String[] rowData) {
protected Map<String, Object> getDataAsObject(String prefix, DataMetaData dataMetaData,
IDbDialect dbDialect, String[] rowData) {
if (rowData != null) {
Map<String, Object> data = new HashMap<String, Object>(rowData.length);
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(), dataMetaData.getTable(),
columnNames, rowData);
Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(),
dataMetaData.getTable(), columnNames, rowData);
for (int i = 0; i < columnNames.length; i++) {
data.put(prefix != null ? (prefix + columnNames[i]).toUpperCase() : columnNames[i].toUpperCase(), objects[i]);
data.put(prefix != null ? (prefix + columnNames[i]).toUpperCase() : columnNames[i]
.toUpperCase(), objects[i]);
}
return data;
} else {
return Collections.emptyMap();
}
}

protected Set<String> toNodeIds(Set<Node> nodes) {
Set<String> nodeIds = new HashSet<String>(nodes.size());
protected Set<String> addNodeId(String nodeId, Set<String> nodeIds) {
nodeIds = nodeIds == null ? new HashSet<String>(1) : nodeIds;
nodeIds.add(nodeId);
return nodeIds;
}

protected Set<String> toNodeIds(Set<Node> nodes, Set<String> nodeIds) {
nodeIds = nodeIds == null ? new HashSet<String>(nodes.size()) : nodeIds;
for (Node node : nodes) {
nodeIds.add(node.getNodeId());
}
Expand Down
Expand Up @@ -25,6 +25,8 @@
public class BshDataRouter extends AbstractDataRouter {

protected IDbDialect dbDialect;

final static String INTERPRETER_KEY = String.format("%s.Interpreter", BshDataRouter.class.getName());

public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
Expand All @@ -47,11 +49,10 @@ public Collection<String> routeToNodes(IRouterContext context, DataMetaData data
}

protected Interpreter getInterpreter(IRouterContext context) {
final String KEY = String.format("%s.Interpreter", getClass().getName());
Interpreter interpreter = (Interpreter) context.getContextCache().get(KEY);
Interpreter interpreter = (Interpreter) context.getContextCache().get(INTERPRETER_KEY);
if (interpreter == null) {
interpreter = new Interpreter();
context.getContextCache().put(KEY, interpreter);
context.getContextCache().put(INTERPRETER_KEY, interpreter);
}
return interpreter;
}
Expand All @@ -69,7 +70,7 @@ protected Collection<String> eval(Object value, Set<Node> nodes, Set<String> tar
}
return nodeIds;
} else if (value instanceof Boolean && value.equals(Boolean.TRUE)) {
return toNodeIds(nodes);
return toNodeIds(nodes, null);
} else {
return Collections.emptySet();
}
Expand Down
Expand Up @@ -19,8 +19,9 @@
*/
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -69,85 +70,109 @@
*
*/
public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRouter {

private IRegistrationService registrationService;

final static String EXPRESSION_KEY = String.format("%s.Expression", ColumnMatchDataRouter.class
.getName());

public Collection<String> routeToNodes(IRouterContext routingContext,
DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
Collection<String> nodeIds = null;
String expression = dataMetaData.getTrigger().getRouter().getRouterExpression();
if (!StringUtils.isBlank(expression)) {
Map<String, String> columnValues = getDataMap(dataMetaData);
String[] tokens = null;
boolean equals = !expression.contains("!=");
if (!equals) {
tokens = expression.split("!=");
} else {
tokens = expression.split("=");
}

if (tokens.length == 2) {
String column = tokens[0];
String value = tokens[1];
if (value.equalsIgnoreCase(":NODE_ID")) {
nodeIds = new HashSet<String>();
for (Node node : nodes) {
if (equals && node.getNodeId().equals(columnValues.get(column))) {
nodeIds.add(node.getNodeId());
}
}
} else if (value.equalsIgnoreCase(":EXTERNAL_ID")) {
nodeIds = new HashSet<String>();
for (Node node : nodes) {
if (equals && node.getExternalId().equals(columnValues.get(column))) {
nodeIds.add(node.getNodeId());
}
Set<String> nodeIds = null;
List<Expression> expressions = getExpression(dataMetaData.getTrigger().getRouter()
.getRouterExpression(), routingContext);
Map<String, String> columnValues = getDataMap(dataMetaData);
for (Expression e : expressions) {
String column = e.tokens[0];
String value = e.tokens[1];
if (value.equalsIgnoreCase(":NODE_ID")) {
for (Node node : nodes) {
if (e.equals && node.getNodeId().equals(columnValues.get(column))) {
nodeIds = addNodeId(node.getNodeId(), nodeIds);
}
} else if (value.equalsIgnoreCase(":NODE_GROUP_ID")) {
nodeIds = new HashSet<String>();
for (Node node : nodes) {
if (equals && node.getNodeGroupId().equals(columnValues.get(column))) {
nodeIds.add(node.getNodeId());
}
}
} else if (equals && value.equalsIgnoreCase(":REDIRECT_NODE")) {
nodeIds = new HashSet<String>();
Map<String, String> redirectMap = getRedirectMap(routingContext);
String nodeId = redirectMap.get(columnValues.get(column));
if (nodeId != null) {
nodeIds.add(nodeId);
}
} else if (value.startsWith(":")) {
String firstValue = columnValues.get(column);
String secondValue = columnValues.get(value.substring(1));
if (equals
&& ((firstValue == null && secondValue == null) || (firstValue != null
&& secondValue != null && firstValue.equals(secondValue)))) {
nodeIds = toNodeIds(nodes);
} else if (!equals
&& ((firstValue != null && secondValue == null)
|| (firstValue == null && secondValue != null) || (firstValue != null
&& secondValue != null && !firstValue.equals(secondValue)))) {
nodeIds = toNodeIds(nodes);
}
} else if (value.equalsIgnoreCase(":EXTERNAL_ID")) {
for (Node node : nodes) {
if (e.equals && node.getExternalId().equals(columnValues.get(column))) {
nodeIds = addNodeId(node.getNodeId(), nodeIds);
}
} else {
if (equals && value.equals(columnValues.get(column))) {
nodeIds = toNodeIds(nodes);
} else if (!equals && !value.equals(columnValues.get(column))) {
nodeIds = toNodeIds(nodes);
}
} else if (value.equalsIgnoreCase(":NODE_GROUP_ID")) {
for (Node node : nodes) {
if (e.equals && node.getNodeGroupId().equals(columnValues.get(column))) {
nodeIds = addNodeId(node.getNodeId(), nodeIds);
}
}
} else if (e.equals && value.equalsIgnoreCase(":REDIRECT_NODE")) {
Map<String, String> redirectMap = getRedirectMap(routingContext);
String nodeId = redirectMap.get(columnValues.get(column));
if (nodeId != null) {
nodeIds = addNodeId(nodeId, nodeIds);
}
} else if (value.startsWith(":")) {
String firstValue = columnValues.get(column);
String secondValue = columnValues.get(value.substring(1));
if (e.equals
&& ((firstValue == null && secondValue == null) || (firstValue != null
&& secondValue != null && firstValue.equals(secondValue)))) {
nodeIds = toNodeIds(nodes, nodeIds);
} else if (!e.equals
&& ((firstValue != null && secondValue == null)
|| (firstValue == null && secondValue != null) || (firstValue != null
&& secondValue != null && !firstValue.equals(secondValue)))) {
nodeIds = toNodeIds(nodes, nodeIds);
}
} else {
log.warn("RouterIllegalColumnMatchExpression", expression);
if (e.equals && value.equals(columnValues.get(column))) {
nodeIds = toNodeIds(nodes, nodeIds);
} else if (!e.equals && !value.equals(columnValues.get(column))) {
nodeIds = toNodeIds(nodes, nodeIds);
}
}

} else {
nodeIds = toNodeIds(nodes);
}

return nodeIds;

}

/**
* Cache parsed expressions in the context to minimize the amount of parsing we
* have to do when we have lots of throughput.
*/
@SuppressWarnings("unchecked")
protected List<Expression> getExpression(String routerExpression, IRouterContext context) {
List<Expression> expressions = (List<Expression>) context.getContextCache().get(
EXPRESSION_KEY);
if (expressions == null) {
expressions = new ArrayList<Expression>();
if (!StringUtils.isBlank(routerExpression)) {
context.getContextCache().put(EXPRESSION_KEY, expressions);
String[] expTokens = routerExpression.split("\r\n|\r|\n");
if (expTokens != null) {
for (String t : expTokens) {
if (!StringUtils.isBlank(t)) {
String[] tokens = null;
boolean equals = !routerExpression.contains("!=");
if (!equals) {
tokens = routerExpression.split("!=");
} else {
tokens = routerExpression.split("=");
}
if (tokens.length == 2) {
expressions.add(new Expression(equals, tokens));
} else {
log.warn("RouterIllegalColumnMatchExpression", routerExpression);
}

}
}
}
}
}
return expressions;
}

@SuppressWarnings("unchecked")
protected Map<String, String> getRedirectMap(IRouterContext ctx) {
final String CTX_CACHE_KEY = ColumnMatchDataRouter.class.getSimpleName() + "RouterMap";
Expand All @@ -163,4 +188,14 @@ protected Map<String, String> getRedirectMap(IRouterContext ctx) {
public void setRegistrationService(IRegistrationService registrationService) {
this.registrationService = registrationService;
}

class Expression {
boolean equals;
String[] tokens;

public Expression(boolean equals, String[] tokens) {
this.equals = equals;
this.tokens = tokens;
}
}
}
Expand Up @@ -30,7 +30,7 @@ public class DefaultDataRouter extends AbstractDataRouter {

public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
return toNodeIds(nodes);
return toNodeIds(nodes, null);
}

public void completeBatch(IRouterContext context, OutgoingBatch batch) {
Expand Down
Expand Up @@ -71,7 +71,7 @@ public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaDa
nodeIds = simpleTemplate.query(String.format("%s%s", sql, subSelect),
new SingleColumnRowMapper<String>(), sqlParams);
} else {
nodeIds = toNodeIds(nodes);
nodeIds = toNodeIds(nodes, null);
}
return nodeIds;
}
Expand Down

0 comments on commit 658ceae

Please sign in to comment.