From 6c09b8a4e04fe66c376e7c05b257ace9fd9a4658 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Wed, 12 Aug 2009 18:03:20 +0000 Subject: [PATCH] Tested BshDataRouter. Added new, current and old data. Always capture old data. --- .../symmetric/route/AbstractDataRouter.java | 76 +++++++++++++---- .../symmetric/route/BshDataRouter.java | 53 ++++++++---- .../symmetric/route/SubSelectDataRouter.java | 2 +- .../src/main/resources/symmetric-routers.xml | 5 ++ .../service/impl/RouterServiceTest.java | 81 +++++++++++++++++++ 5 files changed, 186 insertions(+), 31 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java index 25dc06996a..d94e2352e3 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/AbstractDataRouter.java @@ -19,6 +19,7 @@ */ package org.jumpmind.symmetric.route; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -27,13 +28,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.model.DataEventType; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; public abstract class AbstractDataRouter implements IDataRouter { + private static final String OLD_ = "OLD_"; + protected final Log logger = LogFactory.getLog(getClass()); - + private boolean autoRegister = true; public boolean isAutoRegister() { @@ -43,7 +47,7 @@ public boolean isAutoRegister() { public void setAutoRegister(boolean autoRegister) { this.autoRegister = autoRegister; } - + protected Map getNewDataAsString(DataMetaData dataMetaData) { String[] rowData = dataMetaData.getData().getParsedRowData(); String[] columns = dataMetaData.getTriggerHistory().getParsedColumnNames(); @@ -52,21 +56,66 @@ protected Map getNewDataAsString(DataMetaData dataMetaData) { String name = columns[i]; map.put(name, rowData[i]); } - return map; + return map; } - - protected Map getNewData(DataMetaData dataMetaData, IDbDialect dbDialect) { - Map newData = new HashMap(); - String[] rowData = dataMetaData.getData().getParsedRowData(); - String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames(); - Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(), dataMetaData.getTable(), columnNames, rowData); - for (int i = 0; i < columnNames.length; i++) { - newData.put(columnNames[i], objects[i]); + + protected Map getDataObjectMap(DataMetaData dataMetaData, IDbDialect dbDialect) { + Map data = null; + DataEventType dml = dataMetaData.getData().getEventType(); + switch (dml) { + case UPDATE: + data = new HashMap(dataMetaData.getTable().getColumnCount() * 2); + data.putAll(getNewData(null, dataMetaData, dbDialect)); + data.putAll(getOldData(OLD_, dataMetaData, dbDialect)); + break; + case INSERT: + data = new HashMap(dataMetaData.getTable().getColumnCount() * 2); + data.putAll(getNewData(null, dataMetaData, dbDialect)); + data.putAll(getNullData(OLD_, dataMetaData)); + break; + case DELETE: + data = new HashMap(dataMetaData.getTable().getColumnCount() * 2); + data.putAll(getOldData(null, dataMetaData, dbDialect)); + data.putAll(getOldData(OLD_, dataMetaData, dbDialect)); + break; + default: + break; } - return newData; + return data; + } + + protected Map getNewData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) { + return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedRowData()); + } + + protected Map getOldData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect) { + return getData(prefix, dataMetaData, dbDialect, dataMetaData.getData().getParsedOldData()); } - + protected Map getNullData(String prefix, DataMetaData dataMetaData) { + String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames(); + Map data = new HashMap(columnNames.length); + for (String columnName : columnNames) { + data.put(prefix != null ? prefix + columnName : columnName, null); + } + return data; + } + + protected Map getData(String prefix, DataMetaData dataMetaData, IDbDialect dbDialect, + String[] rowData) { + if (rowData != null) { + Map data = new HashMap(rowData.length); + String[] columnNames = dataMetaData.getTriggerHistory().getParsedColumnNames(); + Object[] objects = dbDialect.getObjectValues(dbDialect.getBinaryEncoding(), dataMetaData.getTable(), + columnNames, rowData); + for (int i = 0; i < columnNames.length; i++) { + data.put(prefix != null ? prefix + columnNames[i] : columnNames[i], objects[i]); + } + return data; + } else { + return Collections.emptyMap(); + } + } protected Set toNodeIds(Set nodes) { Set nodeIds = new HashSet(nodes.size()); @@ -75,6 +124,5 @@ protected Set toNodeIds(Set nodes) { } return nodeIds; } - } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java index 4f66648d9f..ac41506e63 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/BshDataRouter.java @@ -3,8 +3,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; @@ -12,11 +14,12 @@ import bsh.Interpreter; /** - * In Progress ... - * TODO javadoc and unit test + * In Progress ... TODO javadoc and unit test */ public class BshDataRouter extends AbstractDataRouter { + protected IDbDialect dbDialect; + public void completeBatch(IRouterContext context) { } @@ -24,25 +27,43 @@ public Collection routeToNodes(IRouterContext context, DataMetaData data boolean initialLoad) { try { Interpreter interpreter = new Interpreter(); - interpreter.set("nodes", nodes); - // set old and new and cur column values + bind(interpreter, dataMetaData, nodes); Object value = interpreter.eval(dataMetaData.getTrigger().getRouterExpression()); - if (value instanceof Boolean && value.equals(Boolean.TRUE)) { - return toNodeIds(nodes); - } else if (value instanceof Collection) { - Collection values = (Collection) value; - Set nodeIds = new HashSet(values.size()); - for (Object v : values) { - if (v != null) { - nodeIds.add(v.toString()); - } - } - return nodeIds; - } + return eval(value, nodes); } catch (EvalError e) { logger.error(e, e); } return Collections.emptySet(); } + protected Collection eval(Object value, Set nodes) { + if (value instanceof Boolean && value.equals(Boolean.TRUE)) { + return toNodeIds(nodes); + } else if (value instanceof Collection) { + Collection values = (Collection) value; + Set nodeIds = new HashSet(values.size()); + for (Object v : values) { + if (v != null) { + nodeIds.add(v.toString()); + } + } + return nodeIds; + } else { + return Collections.emptySet(); + } + } + + protected void bind(Interpreter interpreter, DataMetaData dataMetaData, Set nodes) throws EvalError { + interpreter.set("nodes", nodes); + Map params = getDataObjectMap(dataMetaData, dbDialect); + if (params != null) { + for (String param : params.keySet()) { + interpreter.set(param, params.get(param)); + } + } + } + + public void setDbDialect(IDbDialect dbDialect) { + this.dbDialect = dbDialect; + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java index 4cf21a4efa..c0a3a3d217 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java @@ -49,7 +49,7 @@ public Collection routeToNodes(IRouterContext routingContext, DataMetaDa Collection nodeIds = null; if (!StringUtils.isBlank(subSelect)) { SimpleJdbcTemplate simpleTemplate = new SimpleJdbcTemplate(jdbcTemplate); - Map sqlParams = getNewData(dataMetaData, dbDialect); + Map sqlParams = getDataObjectMap(dataMetaData, dbDialect); sqlParams.put("NODE_GROUP_ID", trigger.getTargetGroupId()); nodeIds = simpleTemplate.query(String.format("%s%s", sql, subSelect), new ParameterizedSingleColumnRowMapper(), sqlParams); } else { diff --git a/symmetric/src/main/resources/symmetric-routers.xml b/symmetric/src/main/resources/symmetric-routers.xml index cfb3471b63..c8f1abb84e 100644 --- a/symmetric/src/main/resources/symmetric-routers.xml +++ b/symmetric/src/main/resources/symmetric-routers.xml @@ -15,6 +15,11 @@ + + + + + diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java index 29b4e67e10..23caca5845 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RouterServiceTest.java @@ -236,6 +236,85 @@ public void testLargeNumberOfEventsToManyNodes() { getRoutingService().routeData(); logger.info("Done routing data"); } + + @Test + public void testBshTransactionalRoutingOnUpdate() { + resetBatches(); + + Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1); + trigger1.setRouterName("bsh"); + trigger1.setRouterExpression("HashSet set = new HashSet(2); set.add(ROUTING_VARCHAR); set.add(OLD_ROUTING_VARCHAR); return set;"); + + getTriggerService().saveTrigger(trigger1); + getTriggerService().syncTriggers(); + + NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID); + testChannel.setMaxBatchToSend(1000); + testChannel.setMaxBatchSize(5); + testChannel.setBatchAlgorithm("transactional"); + getConfigurationService().saveChannel(testChannel); + + SimpleJdbcTemplate t = new SimpleJdbcTemplate(getJdbcTemplate()); + int count = t.update(String.format("update %s set ROUTING_VARCHAR=?", TEST_TABLE_1), + NODE_GROUP_NODE_3); + logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1); + getRoutingService().routeData(); + + List batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + filterForChannels(batches, testChannel); + Assert.assertEquals(1, batches.size()); + Assert.assertEquals(count, (int)batches.get(0).getDataEventCount()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + filterForChannels(batches, testChannel); + // Node 2 has sync disabled + Assert.assertEquals(0, batches.size()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + filterForChannels(batches, testChannel); + Assert.assertEquals(1, batches.size()); + Assert.assertEquals(count, (int)batches.get(0).getDataEventCount()); + + resetBatches(); + } + + @Test + public void testBshTransactionalRoutingInsert() { + resetBatches(); + + Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1); + trigger1.setRouterName("bsh"); + trigger1.setRouterExpression("HashSet set = new HashSet(2); set.add(ROUTING_VARCHAR); set.add(OLD_ROUTING_VARCHAR); return set;"); + + getTriggerService().saveTrigger(trigger1); + getTriggerService().syncTriggers(); + + NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID); + testChannel.setMaxBatchToSend(1000); + testChannel.setMaxBatchSize(5); + testChannel.setBatchAlgorithm("transactional"); + getConfigurationService().saveChannel(testChannel); + + insert(TEST_TABLE_1, 5, true); + getRoutingService().routeData(); + + List batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + filterForChannels(batches, testChannel); + Assert.assertEquals(1, batches.size()); + Assert.assertEquals(5, (int)batches.get(0).getDataEventCount()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + filterForChannels(batches, testChannel); + // Node 2 has sync disabled + Assert.assertEquals(0, batches.size()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + filterForChannels(batches, testChannel); + // Batch was targeted only at node 1 + Assert.assertEquals(0, batches.size()); + + resetBatches(); + } protected Trigger getTestRoutingTableTrigger(String tableName) { Trigger trigger = getTriggerService().getTriggerFor(tableName, TestConstants.TEST_ROOT_NODE_GROUP); @@ -314,5 +393,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { callback.doInTransaction(null); } } + + }