diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric/src/main/java/org/jumpmind/symmetric/common/Constants.java index 09fe489229..49100df3d7 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -140,7 +140,7 @@ private Constants() { public static final String OUTGOING_BATCH_SERVICE = "outgoingBatchService"; - public static final String OUTGOING_BATCH_HISTORY_SERVICE = "outgoingBatchHistoryService"; + public static final String TRANSACTION_TEMPLATE = "transactionTemplate"; public static final String PURGE_SERVICE = "purgeService"; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java index 40fcc8a1c4..e8c77bc3ff 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java @@ -119,12 +119,7 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger trig, T } public String createPurgeSql(Node node, IDbDialect dialect, Trigger trig, TriggerHistory hist) { - // TODO: during reload, purge table using initial_load_select clause - String sql = "delete from " + getDefaultTargetTableName(trig, hist); - // + " where " + trig.getInitialLoadSelect(); - // sql = replace("groupId", node.getNodeGroupId(), sql); - // sql = replace("externalId", node.getExternalId(), sql); - return sql; + return "delete from " + getDefaultTargetTableName(trig, hist); } public String createCsvDataSql(IDbDialect dialect, Trigger trig, Table metaData, String whereClause) { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java index b794310b04..78834211fa 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java @@ -36,6 +36,8 @@ * each data router is configured using the routing_expression according to its implementation. * * @since 2.0 + * @see SubSelectDataRouter + * @see ColumnMatchDataRouter */ public interface IDataRouter extends IExtensionPoint { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IRoutingContext.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IRoutingContext.java index 63570aacb2..9806dc3eb8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IRoutingContext.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IRoutingContext.java @@ -32,6 +32,9 @@ public interface IRoutingContext { + /** + * Get the same template that is being used for inserts into data_event for routing. + */ public JdbcTemplate getJdbcTemplate(); public NodeChannel getChannel(); @@ -57,9 +60,9 @@ public interface IRoutingContext { public void setNeedsCommitted(boolean b); public void resetForNextData(); - + public void setEncountedTransactionBoundary(boolean encountedTransactionBoundary); - - public boolean isEncountedTransactionBoundary(); + + public boolean isEncountedTransactionBoundary(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index e3f8ba7afc..616a8721e3 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -88,6 +88,6 @@ public interface IConfigurationService { public Map getHistoryRecords(); - public void insert(Trigger trigger); + public void saveTrigger(Trigger trigger); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 376ff6e614..1c374ce1b0 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -52,7 +52,7 @@ public class ConfigurationService extends AbstractService implements IConfigurat private static final long MAX_CHANNEL_CACHE_TIME = 60000; - private static List channelCache; + private static List channelCache; private static long channelCacheTime; @@ -63,10 +63,9 @@ public class ConfigurationService extends AbstractService implements IConfigurat private IDbDialect dbDialect; private String tablePrefix; - + /** - * Cache the history for performance. History never changes and does not - * grow big so this should be OK. + * Cache the history for performance. History never changes and does not grow big so this should be OK. */ private HashMap historyMap = new HashMap(); @@ -90,7 +89,8 @@ public List getRootConfigChannelTableNames() { public void saveChannel(Channel channel) { if (0 == jdbcTemplate.update(getSql("updateChannelSql"), new Object[] { channel.getProcessingOrder(), - channel.getMaxBatchSize(), channel.getMaxBatchToSend(), channel.isEnabled() ? 1 : 0, channel.getId(), channel.getBatchAlgorithm() })) { + channel.getMaxBatchSize(), channel.getMaxBatchToSend(), channel.isEnabled() ? 1 : 0, channel.getId(), + channel.getBatchAlgorithm() })) { jdbcTemplate.update(getSql("insertChannelSql"), new Object[] { channel.getId(), channel.getProcessingOrder(), channel.getMaxBatchSize(), channel.getMaxBatchToSend(), channel.isEnabled() ? 1 : 0, channel.getBatchAlgorithm() }); @@ -111,12 +111,12 @@ protected List getConfigurationTriggers(String sourceGroupId, String ta List triggers = new ArrayList(tables.size()); for (int j = 0; j < tables.size(); j++) { String tableName = tables.get(j); - boolean syncChanges = !TableConstants.getNodeTablesAsSet(tablePrefix).contains(tableName); + boolean syncChanges = !TableConstants.getNodeTablesAsSet(tablePrefix).contains(tableName); Trigger trigger = buildConfigTrigger(tableName, syncChanges, sourceGroupId, targetGroupId); trigger.setInitialLoadOrder(initialLoadOrder++); // TODO Set data router to replace the routing done by the node select - //String initialLoadSelect = rootConfigChannelInitialLoadSelect.get(tableName); - //trigger.setInitialLoadSelect(initialLoadSelect); + // String initialLoadSelect = rootConfigChannelInitialLoadSelect.get(tableName); + // trigger.setInitialLoadSelect(initialLoadSelect); triggers.add(trigger); } return triggers; @@ -138,11 +138,10 @@ protected Trigger buildConfigTrigger(String tableName, boolean syncChanges, Stri trigger.setChannelId(Constants.CHANNEL_CONFIG); // little trick to force the rebuild of sym triggers every time // there is a new version of symmetricds - trigger.setLastModifiedTime(new Date(Version.version().hashCode())); + trigger.setLastModifiedTime(new Date(Version.version().hashCode())); return trigger; } - public NodeChannel getChannel(String channelId) { List channels = getChannels(); for (NodeChannel nodeChannel : channels) { @@ -152,7 +151,7 @@ public NodeChannel getChannel(String channelId) { } return null; } - + @SuppressWarnings("unchecked") public List getChannels() { if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) { @@ -196,8 +195,7 @@ public DataEventAction getDataEventActionsByGroupId(String sourceGroupId, String } /** - * Create triggers on SymmetricDS tables so changes to configuration can be - * synchronized. + * Create triggers on SymmetricDS tables so changes to configuration can be synchronized. */ protected List getConfigurationTriggers(String sourceNodeGroupId) { List triggers = new ArrayList(); @@ -209,10 +207,11 @@ protected List getConfigurationTriggers(String sourceNodeGroupId) { } else if (nodeGroupLink.getDataEventAction().equals(DataEventAction.PUSH)) { triggers.add(buildConfigTrigger(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE), false, nodeGroupLink.getSourceGroupId(), nodeGroupLink.getTargetGroupId())); - logger.info("Creating trigger hist entry for " + TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE)); + logger.info("Creating trigger hist entry for " + + TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE)); } else { logger.warn("Unexpected node group link while creating configuration triggers: source_node_group_id=" - + sourceNodeGroupId + ", action=" + nodeGroupLink.getDataEventAction()); + + sourceNodeGroupId + ", action=" + nodeGroupLink.getDataEventAction()); } } return triggers; @@ -325,8 +324,8 @@ public void insert(TriggerHistory newHistRecord) { Types.VARCHAR, Types.VARCHAR, Types.BIGINT }); } - public void insert(Trigger trigger) { - jdbcTemplate.update(getSql("insertTriggerSql"), new Object[] { trigger.getSourceCatalogName(), + public void saveTrigger(Trigger trigger) { + if (0 == jdbcTemplate.update(getSql("updateTriggerSql"), new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), trigger.getSourceTableName(), trigger.getTargetCatalogName(), trigger.getTargetSchemaName(), trigger.getTargetTableName(), trigger.getSourceGroupId(), trigger.getTargetGroupId(), trigger.getChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, @@ -334,14 +333,32 @@ public void insert(Trigger trigger) { trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(), trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(), - trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(), - trigger.getTxIdExpression(), trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(), - new Date(), null, trigger.getUpdatedBy(), new Date() }, new int[] { Types.VARCHAR, Types.VARCHAR, + trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(), trigger.getTxIdExpression(), + trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(), + new Date(), null, trigger.getUpdatedBy(), new Date(), trigger.getTriggerId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, - Types.VARCHAR, Types.TIMESTAMP }); + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, + Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER })) { + jdbcTemplate.update(getSql("insertTriggerSql"), new Object[] { trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), trigger.getSourceTableName(), trigger.getTargetCatalogName(), + trigger.getTargetSchemaName(), trigger.getTargetTableName(), trigger.getSourceGroupId(), + trigger.getTargetGroupId(), trigger.getChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, + trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0, + trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.getNameForUpdateTrigger(), + trigger.getNameForInsertTrigger(), trigger.getNameForDeleteTrigger(), + trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(), + trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(), trigger.getTxIdExpression(), + trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(), + new Date(), null, trigger.getUpdatedBy(), new Date() }, new int[] { Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, + Types.VARCHAR, Types.TIMESTAMP }); + } + } public Map getHistoryRecords() { @@ -445,7 +462,7 @@ public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLExcepti trig.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger")); trig.setNameForInsertTrigger(rs.getString("name_for_insert_trigger")); trig.setNameForUpdateTrigger(rs.getString("name_for_update_trigger")); - String schema = rs.getString("source_schema_name"); + String schema = rs.getString("source_schema_name"); trig.setSourceSchemaName(schema); String catalog = rs.getString("source_catalog_name"); if (catalog == null && schema != null && dbDialect instanceof MySqlDbDialect) { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/AuthenticationFilter.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/AuthenticationFilter.java index 56e14da669..13455f8cdd 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/AuthenticationFilter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/AuthenticationFilter.java @@ -36,9 +36,7 @@ import org.jumpmind.symmetric.transport.handler.AuthenticationResourceHandler.AuthenticationStatus; /** - * This better be the first filter that executes ! TODO: if this thing fails, - * should it prevent further processing of the request? - * + * This better be the first filter that executes! */ public class AuthenticationFilter extends AbstractTransportFilter { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/InetAddressFilter.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/InetAddressFilter.java index def4620f5d..fcb4824c66 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/InetAddressFilter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/InetAddressFilter.java @@ -38,8 +38,7 @@ import org.jumpmind.symmetric.transport.InetAddressResourceHandler; /** - * This better be the first filter that executes ! TODO: if this thing fails, - * should it prevent further processing of the request? + * This better be the first filter that executes! */ public class InetAddressFilter extends AbstractTransportFilter { public static final String INET_ADDRESS_FILTERS = "inetAddressFilters"; @@ -76,8 +75,6 @@ public boolean isContainerCompatible() { public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain chain) throws IOException, ServletException { - // final IInetAddressAuthorizer authorizer = - // getTransportResourceHandler(); final HttpServletRequest httpRequest = (HttpServletRequest) req; final String sourceAddrString = httpRequest.getRemoteAddr(); try { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/PushServlet.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/PushServlet.java index 75c04e80b8..a20ab679a1 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/PushServlet.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/PushServlet.java @@ -59,7 +59,8 @@ protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws Se getTransportResourceHandler().push(inputStream, outputStream); - outputStream.flush(); // TODO: why is this necessary? + // Not sure if this is necessary, but it's been here and it hasn't hurt anything ... + outputStream.flush(); if (logger.isDebugEnabled()) { logger.debug(String.format("Done with Push request from %s", nodeId)); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/SymmetricServlet.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/SymmetricServlet.java index c94af962fb..45767b69eb 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/SymmetricServlet.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/SymmetricServlet.java @@ -88,7 +88,7 @@ public void init(ServletConfig config) throws ServletException { if (ctx.getParent() != null) { servletBeans.putAll(ctx.getParent().getBeansOfType( IServletExtension.class)); - } + } // TODO order using initOrder for (final Map.Entry servletEntry : servletBeans .entrySet()) { diff --git a/symmetric/src/main/resources/ddl-config.xml b/symmetric/src/main/resources/ddl-config.xml index a40569087b..5d96b7c36e 100644 --- a/symmetric/src/main/resources/ddl-config.xml +++ b/symmetric/src/main/resources/ddl-config.xml @@ -21,7 +21,7 @@ - diff --git a/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/configuration-service-sql.xml b/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/configuration-service-sql.xml index 0b3468fb82..aa010a2d3d 100644 --- a/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/configuration-service-sql.xml +++ b/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/configuration-service-sql.xml @@ -161,7 +161,20 @@ (source_catalog_name,source_schema_name,source_table_name,target_catalog_name,target_schema_name,target_table_name,source_node_group_id,target_node_group_id,channel_id,sync_on_update,sync_on_insert,sync_on_delete,sync_on_incoming_batch,name_for_update_trigger,name_for_insert_trigger,name_for_delete_trigger,sync_on_update_condition,sync_on_insert_condition,sync_on_delete_condition,router_expression,tx_id_expression,excluded_column_names,initial_load_select,initial_load_order,create_time,inactive_time,last_updated_by,last_updated_time) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) - + + + + update $[sym.sync.table.prefix]_trigger + set source_catalog_name=?,source_schema_name=?,source_table_name=?, + target_catalog_name=?,target_schema_name=?,target_table_name=?,source_node_group_id=?, + target_node_group_id=?,channel_id=?,sync_on_update=?,sync_on_insert=?,sync_on_delete=?, + sync_on_incoming_batch=?,name_for_update_trigger=?,name_for_insert_trigger=?, + name_for_delete_trigger=?,sync_on_update_condition=?,sync_on_insert_condition=?, + sync_on_delete_condition=?,router_expression=?,tx_id_expression=?,excluded_column_names=?, + initial_load_select=?,initial_load_order=?,create_time=?,inactive_time=?,last_updated_by=?,last_updated_time=? + where trigger_id=? + + select * from $[sym.sync.table.prefix]_trigger where source_table_name = ? and diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java index fb7dda2c47..38e8f9c0e5 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java @@ -1,15 +1,29 @@ package org.jumpmind.symmetric.service.impl; +import java.util.Iterator; +import java.util.List; + +import junit.framework.Assert; + +import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.test.AbstractDatabaseTest; import org.jumpmind.symmetric.test.TestConstants; import org.jumpmind.symmetric.test.ParameterizedSuite.ParameterExcluder; import org.junit.Test; +import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; public class RoutingServiceTest extends AbstractDatabaseTest { final static String TEST_TABLE_1 = "TEST_ROUTING_DATA_1"; final static String TEST_TABLE_2 = "TEST_ROUTING_DATA_2"; + + final static String NODE_GROUP_NODE_1 = "00001"; + final static String NODE_GROUP_NODE_2 = "00002"; + final static String NODE_GROUP_NODE_3 = "00003"; public RoutingServiceTest(String dbName) { super(dbName); @@ -35,8 +49,57 @@ protected Trigger getTestRoutingTableTrigger(String tableName) { @Test public void testMultiChannelRoutingToEveryone() { - Trigger trigger = getTestRoutingTableTrigger(TEST_TABLE_1); - getConfigurationService().insert(trigger); + Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1); + getConfigurationService().saveTrigger(trigger1); + Trigger trigger2 = getTestRoutingTableTrigger(TEST_TABLE_2); + getConfigurationService().saveTrigger(trigger2); + getBootstrapService().syncTriggers(); + NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID); + NodeChannel otherChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID_OTHER); + Assert.assertEquals(50, testChannel.getMaxBatchSize()); + Assert.assertEquals(1, otherChannel.getMaxBatchSize()); + // should be 1 batch for table 1 on the testchannel w/ max batch size of 50 + insert(TEST_TABLE_1, 5, false); + // this should generate 15 batches because the max batch size is 1 + insert(TEST_TABLE_2, 15, false); + insert(TEST_TABLE_1, 50, true); + getRoutingService().routeData(); + + final int EXPECTED_BATCHES = 16; + + List batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + filterForChannels(batches, testChannel, otherChannel); + Assert.assertEquals(EXPECTED_BATCHES, batches.size()); + Assert.assertEquals(1, countBatchesForChannel(batches, testChannel)); + Assert.assertEquals(15, countBatchesForChannel(batches, otherChannel)); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + filterForChannels(batches, testChannel, otherChannel); + // Node 2 has sync disabled + Assert.assertEquals(0, batches.size()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + filterForChannels(batches, testChannel, otherChannel); + Assert.assertEquals(EXPECTED_BATCHES, batches.size()); + + getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_1); + getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_2); + getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_3); + + + // should be 2 batches for table 1 on the testchannel w/ max batch size of 50 + insert(TEST_TABLE_1, 50, false); + // this should generate 1 batches because the max batch size is 1, but the batch is transactional + insert(TEST_TABLE_2, 15, true); + insert(TEST_TABLE_1, 50, false); + getRoutingService().routeData(); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + filterForChannels(batches, testChannel, otherChannel); + Assert.assertEquals(3, batches.size()); + Assert.assertEquals(2, countBatchesForChannel(batches, testChannel)); + Assert.assertEquals(1, countBatchesForChannel(batches, otherChannel)); + } @Test @@ -53,5 +116,47 @@ public void testSyncBackToNode() { @ParameterExcluder("postgres") public void validateTransactionFunctionailty() throws Exception { } + + protected void filterForChannels(List batches, NodeChannel... channels) { + for (Iterator iterator = batches.iterator(); iterator.hasNext();) { + OutgoingBatch outgoingBatch = iterator.next(); + boolean foundChannel = false; + for (NodeChannel nodeChannel : channels) { + if (outgoingBatch.getChannelId().equals(nodeChannel.getId())) { + foundChannel =true; + } + } + + if (!foundChannel) { + iterator.remove(); + } + } + } + + protected int countBatchesForChannel(List batches, NodeChannel channel) { + int count = 0; + for (Iterator iterator = batches.iterator(); iterator.hasNext();) { + OutgoingBatch outgoingBatch = iterator.next(); + count += outgoingBatch.getChannelId().equals(channel.getId()) ? 1 : 0; + } + return count; + } + + protected void insert(final String tableName, final int count, boolean transactional) { + TransactionCallbackWithoutResult callback = new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + SimpleJdbcTemplate t = new SimpleJdbcTemplate(getJdbcTemplate()); + for (int i = 0; i < count; i++) { + t.update(String.format("insert into %s (ROUTING_INT) values(?)", tableName), 1); + } + } + }; + + if (transactional) { + getTransactionTemplate().execute(callback); + } else { + callback.doInTransaction(null); + } + } } diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodePasswordFilter.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodePasswordFilter.java index 46d902ce93..0f4e3be2ed 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodePasswordFilter.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodePasswordFilter.java @@ -35,7 +35,6 @@ public String onNodeSecuritySave(String password) { } public boolean isAutoRegister() { - // TODO Auto-generated method stub return false; } diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodeService.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodeService.java index 66e6a2a817..f2039fe2a0 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodeService.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockNodeService.java @@ -144,7 +144,5 @@ public NodeStatus getNodeStatus() { } public void setNodePasswordFilter(INodePasswordFilter nodePasswordFilter) { - // TODO Auto-generated method stub - } } \ No newline at end of file diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java index 7d33ad3dd1..e9137020fd 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/test/AbstractDatabaseTest.java @@ -28,12 +28,14 @@ import org.jumpmind.symmetric.db.IDbDialect; import org.jumpmind.symmetric.service.IBootstrapService; import org.jumpmind.symmetric.service.IConfigurationService; +import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IRoutingService; import org.jumpmind.symmetric.util.AppUtils; import org.junit.AfterClass; import org.junit.Assert; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.support.TransactionTemplate; public class AbstractDatabaseTest { @@ -79,10 +81,18 @@ protected IBootstrapService getBootstrapService() { protected IRoutingService getRoutingService() { return AppUtils.find(Constants.ROUTING_SERVICE, getSymmetricEngine()); } - + + protected IOutgoingBatchService getOutgoingBatchService() { + return AppUtils.find(Constants.OUTGOING_BATCH_SERVICE, getSymmetricEngine()); + } + protected DataSource getDataSource() { return AppUtils.find(Constants.DATA_SOURCE, getSymmetricEngine()); } + + protected TransactionTemplate getTransactionTemplate() { + return AppUtils.find(Constants.TRANSACTION_TEMPLATE, getSymmetricEngine()); + } protected JdbcTemplate getJdbcTemplate() { return new JdbcTemplate((DataSource) AppUtils.find(Constants.DATA_SOURCE, getSymmetricEngine())); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/test/CrossCatalogSyncTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/test/CrossCatalogSyncTest.java index efaf8a259e..fd4f9fc7ca 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/test/CrossCatalogSyncTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/test/CrossCatalogSyncTest.java @@ -82,7 +82,7 @@ protected void testCrossCatalogSyncOnMySQL(boolean schema, boolean catalog) { trigger.setSyncOnInsert(true); trigger.setSyncOnUpdate(true); trigger.setSyncOnDelete(true); - configService.insert(trigger); + configService.saveTrigger(trigger); getSymmetricEngine().syncTriggers(); jdbcTemplate.update("insert into other.other_table values('00000','first row')"); Assert.assertEquals("The data event from the other database's other_table was not captured.", jdbcTemplate @@ -113,7 +113,7 @@ public void testCrossCatalogSyncOnMsSql() { trigger.setSyncOnInsert(true); trigger.setSyncOnUpdate(true); trigger.setSyncOnDelete(true); - configService.insert(trigger); + configService.saveTrigger(trigger); getSymmetricEngine().syncTriggers(); jdbcTemplate.update("insert into other.dbo.other_table values('00000','first row')"); Assert.assertEquals("The data event from the other database's other_table was not captured.", 1, jdbcTemplate @@ -143,7 +143,7 @@ public void testCrossSchemaSyncOnMsSql() { trigger.setSyncOnInsert(true); trigger.setSyncOnUpdate(true); trigger.setSyncOnDelete(true); - configService.insert(trigger); + configService.saveTrigger(trigger); getSymmetricEngine().syncTriggers(); jdbcTemplate.update("insert into other.other_table2 values('00000','first row')"); Assert.assertEquals("The data event from the other database's other_table was not captured.", 1, jdbcTemplate diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/test/FunkyDataTypesTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/test/FunkyDataTypesTest.java index 4a416b92f2..7269488a53 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/test/FunkyDataTypesTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/test/FunkyDataTypesTest.java @@ -61,7 +61,7 @@ public void testOraclePrecisionTimestamp() { trigger.setSyncOnInsert(true); trigger.setSyncOnUpdate(true); trigger.setSyncOnDelete(true); - configService.insert(trigger); + configService.saveTrigger(trigger); getSymmetricEngine().syncTriggers(); jdbcTemplate.update("insert into " + TABLE_NAME + " values('00000',timestamp'2008-01-01 00:00:00.000',timestamp'2008-01-01 00:00:00.000')"); diff --git a/symmetric/src/test/resources/test-database-setup.sql b/symmetric/src/test/resources/test-database-setup.sql index bbf167517c..69e820c1ac 100644 --- a/symmetric/src/test/resources/test-database-setup.sql +++ b/symmetric/src/test/resources/test-database-setup.sql @@ -6,8 +6,9 @@ insert into sym_node_group values ('test-node-group','a test config'); insert into sym_node_group values ('test-node-group2','another test config'); insert into sym_node_group values ('unit-test-only','a group used for unit testing'); insert into sym_node_group_link values ('test-root-group','test-root-group', 'P'); -insert into sym_node_group_link values ('test-root-group','test-root-group2', 'P'); -insert into sym_node_group_link values ('test-node-group','test-root-group', 'W'); +insert into sym_node_group_link values ('test-root-group','test-node-group2', 'P'); +insert into sym_node_group_link values ('test-root-group','test-node-group', 'W'); +insert into sym_node_group_link values ('test-node-group','test-root-group', 'P'); insert into sym_node_group_link values ('symmetric','test-root-group', 'P'); insert into sym_node values ('00000', 'test-root-group', '00000', 1, 'internal://root', '1', '1.4.0-SNAPSHOT','H2', '1.1', current_timestamp, null, '00000'); insert into sym_node values ('1', 'test-node-group', '1', 1, 'internal://root', '1', '1.4.0-SNAPSHOT','H2', '5.0', current_timestamp, null, '00000'); diff --git a/symmetric/src/test/resources/test-tables-ddl.xml b/symmetric/src/test/resources/test-tables-ddl.xml index e3f706ffb7..9c7f7d6235 100644 --- a/symmetric/src/test/resources/test-tables-ddl.xml +++ b/symmetric/src/test/resources/test-tables-ddl.xml @@ -154,19 +154,19 @@ - + - +
- + - +