Skip to content

Commit

Permalink
comment
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 18, 2009
1 parent 813532f commit 5bf1a01
Showing 1 changed file with 37 additions and 37 deletions.
Expand Up @@ -20,7 +20,7 @@ public class RoutingServiceTest extends AbstractDatabaseTest {

final static String TEST_TABLE_1 = "TEST_ROUTING_DATA_1";
final static String TEST_TABLE_2 = "TEST_ROUTING_DATA_2";

final static String NODE_GROUP_NODE_1 = "00001";
final static String NODE_GROUP_NODE_2 = "00002";
final static String NODE_GROUP_NODE_3 = "00003";
Expand Down Expand Up @@ -50,7 +50,7 @@ protected Trigger getTestRoutingTableTrigger(String tableName) {
@Test
public void testMultiChannelRoutingToEveryone() {
resetBatches();

Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1);
getConfigurationService().saveTrigger(trigger1);
Trigger trigger2 = getTestRoutingTableTrigger(TEST_TABLE_2);
Expand All @@ -66,26 +66,26 @@ public void testMultiChannelRoutingToEveryone() {
insert(TEST_TABLE_2, 15, false);
insert(TEST_TABLE_1, 50, true);
getRoutingService().routeData();

final int EXPECTED_BATCHES = 16;

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel, otherChannel);
Assert.assertEquals(EXPECTED_BATCHES, batches.size());
Assert.assertEquals(1, countBatchesForChannel(batches, testChannel));
Assert.assertEquals(15, countBatchesForChannel(batches, otherChannel));

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2);
filterForChannels(batches, testChannel, otherChannel);
// Node 2 has sync disabled
Assert.assertEquals(0, batches.size());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3);
filterForChannels(batches, testChannel, otherChannel);
Assert.assertEquals(EXPECTED_BATCHES, batches.size());
resetBatches();

resetBatches();

// should be 2 batches for table 1 on the testchannel w/ max batch size of 50
insert(TEST_TABLE_1, 50, false);
// this should generate 1 batches because the max batch size is 1, but the batch is transactional
Expand All @@ -99,11 +99,11 @@ public void testMultiChannelRoutingToEveryone() {
Assert.assertEquals(2, countBatchesForChannel(batches, testChannel));
Assert.assertEquals(1, countBatchesForChannel(batches, otherChannel));
}

@Test
public void testColumnMatchTransactionalOnlyRoutingToNode1() {
resetBatches();

Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1);
trigger1.setRouterName("column");
trigger1.setRouterExpression("ROUTING_VARCHAR=:NODE_ID");
Expand All @@ -113,36 +113,36 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() {
testChannel.setMaxBatchToSend(100);
testChannel.setBatchAlgorithm("transactional");
getConfigurationService().saveChannel(testChannel);

// should be 51 batches for table 1
insert(TEST_TABLE_1, 500, true);
insert(TEST_TABLE_1, 50, false);
getRoutingService().routeData();

final int EXPECTED_BATCHES = 51;

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel);
Assert.assertEquals(EXPECTED_BATCHES, batches.size());
Assert.assertEquals(EXPECTED_BATCHES, countBatchesForChannel(batches, testChannel));

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2);
filterForChannels(batches, testChannel);
// Node 2 has sync disabled
Assert.assertEquals(0, batches.size());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3);
filterForChannels(batches, testChannel);
// Batch was targeted only at node 1
Assert.assertEquals(0, batches.size());
resetBatches();
}

resetBatches();
}

@Test
public void testSubSelectNonTransactionalRoutingToNode1() {
resetBatches();

Trigger trigger1 = getTestRoutingTableTrigger(TEST_TABLE_1);
trigger1.setRouterName("subselect");
trigger1.setRouterExpression("c.node_id=:ROUTING_VARCHAR");
Expand All @@ -153,30 +153,30 @@ public void testSubSelectNonTransactionalRoutingToNode1() {
testChannel.setMaxBatchSize(5);
testChannel.setBatchAlgorithm("nontransactional");
getConfigurationService().saveChannel(testChannel);
// should be 51 batches for table 1

// should be 100 batches for table 1, even though we committed the changes as part of a transaction
insert(TEST_TABLE_1, 500, true);
getRoutingService().routeData();

final int EXPECTED_BATCHES = 100;

List<OutgoingBatch> batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1);
filterForChannels(batches, testChannel);
Assert.assertEquals(EXPECTED_BATCHES, batches.size());
Assert.assertEquals(EXPECTED_BATCHES, countBatchesForChannel(batches, testChannel));

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2);
filterForChannels(batches, testChannel);
// Node 2 has sync disabled
Assert.assertEquals(0, batches.size());

batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3);
filterForChannels(batches, testChannel);
// Batch was targeted only at node 1
Assert.assertEquals(0, batches.size());
resetBatches();
}

resetBatches();
}

@Test
public void syncIncomingBatchTest() throws Exception {
Expand All @@ -192,36 +192,36 @@ public void testSyncBackToNode() {
@ParameterExcluder("postgres")
public void validateTransactionFunctionailty() throws Exception {
}

protected void filterForChannels(List<OutgoingBatch> batches, NodeChannel... channels) {
for (Iterator<OutgoingBatch> iterator = batches.iterator(); iterator.hasNext();) {
OutgoingBatch outgoingBatch = iterator.next();
boolean foundChannel = false;
for (NodeChannel nodeChannel : channels) {
if (outgoingBatch.getChannelId().equals(nodeChannel.getId())) {
foundChannel =true;
foundChannel = true;
}
}

if (!foundChannel) {
iterator.remove();
}
}
}

protected void resetBatches() {
getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_1);
getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_2);
getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_3);
getOutgoingBatchService().markAllAsSentForNode(NODE_GROUP_NODE_3);
}

protected int countBatchesForChannel(List<OutgoingBatch> batches, NodeChannel channel) {
int count = 0;
for (Iterator<OutgoingBatch> iterator = batches.iterator(); iterator.hasNext();) {
OutgoingBatch outgoingBatch = iterator.next();
count += outgoingBatch.getChannelId().equals(channel.getId()) ? 1 : 0;
}
return count;
return count;
}

protected void insert(final String tableName, final int count, boolean transactional) {
Expand Down

0 comments on commit 5bf1a01

Please sign in to comment.