Skip to content

Commit

Permalink
Tested BshDataRouter. Added new, current and old data. Always capture…
Browse files Browse the repository at this point in the history
… old data.
  • Loading branch information
chenson42 committed Aug 12, 2009
1 parent da4dcc4 commit 6c09b8a
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 31 deletions.
Expand Up @@ -19,6 +19,7 @@
*/
package org.jumpmind.symmetric.route;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -27,13 +28,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;

public abstract class AbstractDataRouter implements IDataRouter {

private static final String OLD_ = "OLD_";

protected final Log logger = LogFactory.getLog(getClass());

private boolean autoRegister = true;

public boolean isAutoRegister() {
Expand All @@ -43,7 +47,7 @@ public boolean isAutoRegister() {
public void setAutoRegister(boolean autoRegister) {
this.autoRegister = autoRegister;
}

protected Map<String, String> getNewDataAsString(DataMetaData dataMetaData) {
String[] rowData = dataMetaData.getData().getParsedRowData();
String[] columns = dataMetaData.getTriggerHistory().getParsedColumnNames();
Expand All @@ -52,21 +56,66 @@ protected Map<String, String> getNewDataAsString(DataMetaData dataMetaData) {
String name = columns[i];
map.put(name, rowData[i]);
}
return map;
return map;
}

protected Map<String, Object> getNewData(DataMetaData dataMetaData, IDbDialect dbDialect) {
Map<String, Object> newData = new HashMap<String, Object>();
String[] rowData = dataMetaData.getData().getParsedRowData();
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames, rowData);
for (int i = 0; i < columnNames.length; i++) {
newData.put(columnNames[i], objects[i]);

protected Map<String, Object> getDataObjectMap(DataMetaData dataMetaData, IDbDialect dbDialect) {
Map<String, Object> data = null;
DataEventType dml = dataMetaData.getData().getEventType();
switch (dml) {
case UPDATE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewData(null, dataMetaData, dbDialect));
data.putAll(getOldData(OLD_, dataMetaData, dbDialect));
break;
case INSERT:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getNewData(null, dataMetaData, dbDialect));
data.putAll(getNullData(OLD_, dataMetaData));
break;
case DELETE:
data = new HashMap<String, Object>(dataMetaData.getTable().getColumnCount() * 2);
data.putAll(getOldData(null, dataMetaData, dbDialect));
data.putAll(getOldData(OLD_, dataMetaData, dbDialect));
break;
default:
break;
}
return newData;
return data;
}

protected Map<String, Object> getNewData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) {
return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedRowData());
}

protected Map<String, Object> getOldData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) {
return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedOldData());
}


protected Map<String, Object> getNullData(String prefix, DataMetaData dataMetaData) {
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Map<String, Object> data = new HashMap<String, Object>(columnNames.length);
for (String columnName : columnNames) {
data.put(prefix != null ? prefix + columnName : columnName, null);
}
return data;
}

protected Map<String, Object> getData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect,
String[] rowData) {
if (rowData != null) {
Map<String, Object> data = new HashMap<String, Object>(rowData.length);
String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames();
Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(), dataMetaData.getTable(),
columnNames, rowData);
for (int i = 0; i < columnNames.length; i++) {
data.put(prefix != null ? prefix + columnNames[i] : columnNames[i], objects[i]);
}
return data;
} else {
return Collections.emptyMap();
}
}

protected Set<String> toNodeIds(Set<Node> nodes) {
Set<String> nodeIds = new HashSet<String>(nodes.size());
Expand All @@ -75,6 +124,5 @@ protected Set<String> toNodeIds(Set<Node> nodes) {
}
return nodeIds;
}


}
Expand Up @@ -3,46 +3,67 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;

import bsh.EvalError;
import bsh.Interpreter;

/**
* In Progress ...
* TODO javadoc and unit test
* In Progress ... TODO javadoc and unit test
*/
public class BshDataRouter extends AbstractDataRouter {

protected IDbDialect dbDialect;

public void completeBatch(IRouterContext context) {
}

public Collection<String> routeToNodes(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes,
boolean initialLoad) {
try {
Interpreter interpreter = new Interpreter();
interpreter.set("nodes", nodes);
// set old and new and cur column values
bind(interpreter, dataMetaData, nodes);
Object value = interpreter.eval(dataMetaData.getTrigger().getRouterExpression());
if (value instanceof Boolean && value.equals(Boolean.TRUE)) {
return toNodeIds(nodes);
} else if (value instanceof Collection<?>) {
Collection<?> values = (Collection<?>) value;
Set<String> nodeIds = new HashSet<String>(values.size());
for (Object v : values) {
if (v != null) {
nodeIds.add(v.toString());
}
}
return nodeIds;
}
return eval(value, nodes);
} catch (EvalError e) {
logger.error(e, e);
}
return Collections.emptySet();
}

protected Collection<String> eval(Object value, Set<Node> nodes) {
if (value instanceof Boolean && value.equals(Boolean.TRUE)) {
return toNodeIds(nodes);
} else if (value instanceof Collection<?>) {
Collection<?> values = (Collection<?>) value;
Set<String> nodeIds = new HashSet<String>(values.size());
for (Object v : values) {
if (v != null) {
nodeIds.add(v.toString());
}
}
return nodeIds;
} else {
return Collections.emptySet();
}
}

protected void bind(Interpreter interpreter, DataMetaData dataMetaData, Set<Node> nodes) throws EvalError {
interpreter.set("nodes", nodes);
Map<String, Object> params = getDataObjectMap(dataMetaData, dbDialect);
if (params != null) {
for (String param : params.keySet()) {
interpreter.set(param, params.get(param));
}
}
}

public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}
}
Expand Up @@ -49,7 +49,7 @@ public Collection<String> routeToNodes(IRouterContext routingContext, DataMetaDa
Collection<String> nodeIds = null;
if (!StringUtils.isBlank(subSelect)) {
SimpleJdbcTemplate simpleTemplate = new SimpleJdbcTemplate(jdbcTemplate);
Map<String, Object> sqlParams = getNewData(dataMetaData, dbDialect);
Map<String, Object> sqlParams = getDataObjectMap(dataMetaData, dbDialect);
sqlParams.put("NODE_GROUP_ID", trigger.getTargetGroupId());
nodeIds = simpleTemplate.query(String.format("%s%s", sql, subSelect), new ParameterizedSingleColumnRowMapper<String>(), sqlParams);
} else {
Expand Down
5 changes: 5 additions & 0 deletions symmetric/src/main/resources/symmetric-routers.xml
Expand Up @@ -15,6 +15,11 @@
<property name="dbDialect" ref="dbDialect" />
</bean>
</entry>
<entry key="bsh">
<bean class="org.jumpmind.symmetric.route.BshDataRouter">
<property name="dbDialect" ref="dbDialect" />
</bean>
</entry>
<entry key="column">
<bean class="org.jumpmind.symmetric.route.ColumnMatchDataRouter">
<property name="registrationService" ref="registrationService" />
Expand Down
Expand Up @@ -236,6 +236,85 @@ public void testLargeNumberOfEventsToManyNodes() {
getRoutingService().routeData();
logger.info("Done routing data");
}

@Test
public void testBshTransactionalRoutingOnUpdate() {
resetBatches();

Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1);
trigger1.setRouterName("bsh");
trigger1.setRouterExpression("HashSet set = new HashSet(2); set.add(ROUTING_VARCHAR); set.add(OLD_ROUTING_VARCHAR); return set;");

getTriggerService().saveTrigger(trigger1);
getTriggerService().syncTriggers();

NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID);
testChannel.setMaxBatchToSend(1000);
testChannel.setMaxBatchSize(5);
testChannel.setBatchAlgorithm("transactional");
getConfigurationService().saveChannel(testChannel);

SimpleJdbcTemplate t = new SimpleJdbcTemplate(getJdbcTemplate());
int count = t.update(String.format("update %s set ROUTING_VARCHAR=?", TEST_TABLE_1),
NODE_GROUP_NODE_3);
logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1);
getRoutingService().routeData();

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel);
Assert.assertEquals(1, batches.size());
Assert.assertEquals(count, (int)batches.get(0).getDataEventCount());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2);
filterForChannels(batches, testChannel);
// Node 2 has sync disabled
Assert.assertEquals(0, batches.size());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3);
filterForChannels(batches, testChannel);
Assert.assertEquals(1, batches.size());
Assert.assertEquals(count, (int)batches.get(0).getDataEventCount());

resetBatches();
}

@Test
public void testBshTransactionalRoutingInsert() {
resetBatches();

Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1);
trigger1.setRouterName("bsh");
trigger1.setRouterExpression("HashSet set = new HashSet(2); set.add(ROUTING_VARCHAR); set.add(OLD_ROUTING_VARCHAR); return set;");

getTriggerService().saveTrigger(trigger1);
getTriggerService().syncTriggers();

NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID);
testChannel.setMaxBatchToSend(1000);
testChannel.setMaxBatchSize(5);
testChannel.setBatchAlgorithm("transactional");
getConfigurationService().saveChannel(testChannel);

insert(TEST_TABLE_1, 5, true);
getRoutingService().routeData();

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel);
Assert.assertEquals(1, batches.size());
Assert.assertEquals(5, (int)batches.get(0).getDataEventCount());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2);
filterForChannels(batches, testChannel);
// Node 2 has sync disabled
Assert.assertEquals(0, batches.size());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3);
filterForChannels(batches, testChannel);
// Batch was targeted only at node 1
Assert.assertEquals(0, batches.size());

resetBatches();
}

protected Trigger getTestRoutingTableTrigger(String tableName) {
Trigger trigger = getTriggerService().getTriggerFor(tableName, TestConstants.TEST_ROOT_NODE_GROUP);
Expand Down Expand Up @@ -314,5 +393,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
callback.doInTransaction(null);
}
}



}

0 comments on commit 6c09b8a

Please sign in to comment.