diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/NonTransactionalBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/NonTransactionalBatchAlgorithm.java new file mode 100644 index 0000000000..3475f64172 --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/NonTransactionalBatchAlgorithm.java @@ -0,0 +1,37 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ +package org.jumpmind.symmetric.route; + +import org.jumpmind.symmetric.model.DataMetaData; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.OutgoingBatchHistory; + +public class NonTransactionalBatchAlgorithm implements IBatchAlgorithm { + + public boolean isBatchComplete(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData, + IRoutingContext routingContext) { + return history.getDataEventCount() >= dataMetaData.getChannel().getMaxBatchSize(); + } + + public boolean isAutoRegister() { + return true; + } + +} diff --git a/symmetric/src/main/resources/symmetric-routers.xml b/symmetric/src/main/resources/symmetric-routers.xml index 31668ea62c..cfb3471b63 100644 --- a/symmetric/src/main/resources/symmetric-routers.xml +++ b/symmetric/src/main/resources/symmetric-routers.xml @@ -10,7 +10,7 @@ + value="select c.node_id from $[sym.sync.table.prefix]_node c where c.node_group_id=:NODE_GROUP_ID and c.sync_enabled=1 and " /> @@ -30,9 +30,12 @@ - + + + + \ No newline at end of file diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java index 25a8c78b38..f768b7e06f 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/RoutingServiceTest.java @@ -111,14 +111,13 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { getBootstrapService().syncTriggers(); NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID); testChannel.setMaxBatchToSend(100); - testChannel.setBatchAlgorithm("transaction"); + testChannel.setBatchAlgorithm("transactional"); getConfigurationService().saveChannel(testChannel); // should be 51 batches for table 1 insert(TEST_TABLE_1, 500, true); insert(TEST_TABLE_1, 50, false); getRoutingService().routeData(); - getRoutingService().routeData(); final int EXPECTED_BATCHES = 51; @@ -139,6 +138,45 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { resetBatches(); } + + @Test + public void testSubSelectNonTransactionalRoutingToNode1() { + resetBatches(); + + Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1); + trigger1.setRouterName("subselect"); + trigger1.setRouterExpression("c.node_id=:ROUTING_VARCHAR"); + getConfigurationService().saveTrigger(trigger1); + getBootstrapService().syncTriggers(); + NodeChannel testChannel = getConfigurationService().getChannel(TestConstants.TEST_CHANNEL_ID); + testChannel.setMaxBatchToSend(1000); + testChannel.setMaxBatchSize(5); + testChannel.setBatchAlgorithm("nontransactional"); + getConfigurationService().saveChannel(testChannel); + + // should be 51 batches for table 1 + insert(TEST_TABLE_1, 500, true); + getRoutingService().routeData(); + + final int EXPECTED_BATCHES = 100; + + List batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + filterForChannels(batches, testChannel); + Assert.assertEquals(EXPECTED_BATCHES, batches.size()); + Assert.assertEquals(EXPECTED_BATCHES, countBatchesForChannel(batches, testChannel)); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + filterForChannels(batches, testChannel); + // Node 2 has sync disabled + Assert.assertEquals(0, batches.size()); + + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + filterForChannels(batches, testChannel); + // Batch was targeted only at node 1 + Assert.assertEquals(0, batches.size()); + + resetBatches(); + } @Test public void syncIncomingBatchTest() throws Exception {