Skip to content

Commit

Permalink
0002275: Common batch mode not detected if trigger router is defined …
Browse files Browse the repository at this point in the history
…that does not use the default router and is on the same link
  • Loading branch information
chenson42 committed Apr 16, 2015
1 parent 651d901 commit 6f8aa01
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 14 deletions.
Expand Up @@ -72,6 +72,11 @@ public class Router implements Serializable {

public Router() {
routerId = Integer.toString(maxRouterId++);
}

public Router(String id, String sourceNodeGroupId, String targetNodeGroupId, String routerType) {
this(id, new NodeGroupLink(sourceNodeGroupId, targetNodeGroupId));
this.routerType = routerType;
}

public Router(String id, NodeGroupLink link) {
Expand Down
Expand Up @@ -120,6 +120,11 @@ public Trigger(String tableName, String channelId) {
this.sourceTableName = tableName;
this.channelId = channelId;
}

public Trigger(String tableName, String channelId, boolean syncOnIncomingBatch) {
this(tableName, channelId);
this.syncOnIncomingBatch = syncOnIncomingBatch;
}

final public String qualifiedSourceTableName() {
return qualifiedSourceTablePrefix() + sourceTableName;
Expand Down
Expand Up @@ -331,7 +331,8 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
dataCount += routeDataForChannel(processInfo,
nodeChannel,
sourceNode,
producesCommonBatches(nodeChannel.getChannel()), gapDetector);
producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
engine.getTriggerRouterService().getTriggerRouters(false)), gapDetector);
} else {
if (log.isDebugEnabled()) {
log.debug(
Expand All @@ -348,18 +349,39 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
return dataCount;
}

protected boolean producesCommonBatches(Channel channel) {
protected boolean producesCommonBatches(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) {
String channelId = channel.getChannelId();
Boolean producesCommonBatches = !Constants.CHANNEL_CONFIG.equals(channelId)
&& !channel.isFileSyncFlag()
&& !channel.isReloadFlag()
&& !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false;
String nodeGroupId = parameterService.getNodeGroupId();
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService()
.getTriggerRouters(false);
if (triggerRouters != null) {
if (producesCommonBatches && triggerRouters != null) {
List<TriggerRouter> testableTriggerRouters = new ArrayList<TriggerRouter>();
for (TriggerRouter triggerRouter : triggerRouters) {
IDataRouter dataRouter = getDataRouter(triggerRouter.getRouter());
if (triggerRouter.getTrigger().getChannelId().equals(channel.getChannelId())) {
testableTriggerRouters.add(triggerRouter);
} else {
/*
* Add any trigger router that is in another channel, but is
* for a table that is in the current channel
*/
String anotherChannelTableName = triggerRouter.getTrigger()
.getFullyQualifiedSourceTableName();
for (TriggerRouter triggerRouter2 : triggerRouters) {
String currentTableName = triggerRouter2
.getTrigger()
.getFullyQualifiedSourceTableName();
String currentChannelId = triggerRouter2.getTrigger().getChannelId();
if (anotherChannelTableName
.equals(currentTableName) && currentChannelId.equals(channelId)) {
testableTriggerRouters.add(triggerRouter);
}
}
}
}

for (TriggerRouter triggerRouter : testableTriggerRouters) {
boolean isDefaultRouter = triggerRouter.getRouter().getRouterType().equals("default");
/*
* If the data router is not a default data router or there will
* be incoming data on the channel where sync_on_incoming_batch
Expand All @@ -370,18 +392,20 @@ protected boolean producesCommonBatches(Channel channel) {
*/
if (triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId()
.equals(nodeGroupId)) {
if (!(dataRouter instanceof DefaultDataRouter)) {
if (!isDefaultRouter) {
producesCommonBatches = false;
break;
} else {
if (triggerRouter.getTrigger().isSyncOnIncomingBatch()) {
String tableName = triggerRouter.getTrigger()
String outgoingTableName = triggerRouter.getTrigger()
.getFullyQualifiedSourceTableName();
for (TriggerRouter triggerRouter2 : triggerRouters) {
if (triggerRouter2.getTrigger().getFullyQualifiedSourceTableName()
.equals(tableName)
&& triggerRouter2.getRouter().getNodeGroupLink()
.getTargetNodeGroupId().equals(nodeGroupId)) {
for (TriggerRouter triggerRouter2 : testableTriggerRouters) {
String incomingTableName = triggerRouter2.getTrigger().getFullyQualifiedSourceTableName();
String targetNodeGroupId = triggerRouter2.getRouter().getNodeGroupLink()
.getTargetNodeGroupId();
if (incomingTableName
.equals(outgoingTableName)
&& targetNodeGroupId.equals(nodeGroupId)) {
producesCommonBatches = false;
break;
}
Expand Down
@@ -0,0 +1,111 @@
package org.jumpmind.symmetric.service.impl;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.jumpmind.db.platform.DatabaseInfo;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IParameterService;
import org.junit.Before;
import org.junit.Test;

public class RouterServiceTest {

final static Channel CHANNEL_2_TEST = new Channel("test", 1);

final static String SOURCE_NODE_GROUP = "source";

final static String TARGET_NODE_GROUP = "target";

RouterService routerService;

@Before
public void setup() {
ISymmetricEngine engine = mock(ISymmetricEngine.class);
IParameterService parameterService = mock(IParameterService.class);
ISymmetricDialect symmetricDialect = mock(ISymmetricDialect.class);
IDatabasePlatform databasePlatform = mock(IDatabasePlatform.class);
IExtensionService extensionService = mock(IExtensionService.class);
when(databasePlatform.getDatabaseInfo()).thenReturn(new DatabaseInfo());
when(symmetricDialect.getPlatform()).thenReturn(databasePlatform);
when(engine.getDatabasePlatform()).thenReturn(databasePlatform);
when(engine.getParameterService()).thenReturn(parameterService);
when(engine.getSymmetricDialect()).thenReturn(symmetricDialect);
when(engine.getExtensionService()).thenReturn(extensionService);
routerService = new RouterService(engine);
}

@Test
public void testProducesCommonBatchesOneTableOneChannelDefaultRouter() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
triggerRouters.add(new TriggerRouter(new Trigger("a", CHANNEL_2_TEST.getChannelId()), new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
assertTrue(routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testNotProducesCommonBatchesOneTableOneChannelNonDefaultRouter() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
triggerRouters.add(new TriggerRouter(new Trigger("a", CHANNEL_2_TEST.getChannelId()), new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "column")));
assertTrue(!routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testProducesCommonBatchesMultipleTablesTwoChannelsMultipleRouters() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
triggerRouters.add(new TriggerRouter(new Trigger("a", CHANNEL_2_TEST.getChannelId()), new Router("test1", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(new Trigger("b", "anotherchannel"), new Router("test2", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "column")));
assertTrue(routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testProducesCommonBatchesMultipleTablesTwoChannelsMultipleRoutersBidirectional() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
triggerRouters.add(new TriggerRouter(new Trigger("a", CHANNEL_2_TEST.getChannelId()), new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(new Trigger("a", CHANNEL_2_TEST.getChannelId()), new Router("test", TARGET_NODE_GROUP, SOURCE_NODE_GROUP, "default")));
assertTrue(routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testNotProducesCommonBatchesMultipleTablesTwoChannelsMultipleRoutersSyncOnIncoming() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
Trigger tableTrigger = new Trigger("a", CHANNEL_2_TEST.getChannelId(), true);
triggerRouters.add(new TriggerRouter(tableTrigger, new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(tableTrigger, new Router("test", TARGET_NODE_GROUP, SOURCE_NODE_GROUP, "default")));
assertTrue(!routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testNotProducesCommonBatchesSameTablesTwoChannelsMultipleRoutersSameTableIncomingOnAnotherChannel() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
Trigger tableTrigger1 = new Trigger("a", CHANNEL_2_TEST.getChannelId(), true);
Trigger tableTrigger2= new Trigger("a", "anotherchannel");
triggerRouters.add(new TriggerRouter(tableTrigger1, new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(tableTrigger2, new Router("test", TARGET_NODE_GROUP, SOURCE_NODE_GROUP, "default")));
assertTrue(!routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}

@Test
public void testProducesCommonBatchesSameTablesTwoChannelsMultipleRoutersDifferentTableIncomingOnAnotherChannel() {
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
Trigger tableTrigger1 = new Trigger("a", CHANNEL_2_TEST.getChannelId(), true);
Trigger tableTrigger2= new Trigger("b", "anotherchannel");
Trigger tableTrigger3= new Trigger("c", CHANNEL_2_TEST.getChannelId());
triggerRouters.add(new TriggerRouter(tableTrigger1, new Router("test", SOURCE_NODE_GROUP, TARGET_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(tableTrigger2, new Router("test", TARGET_NODE_GROUP, SOURCE_NODE_GROUP, "default")));
triggerRouters.add(new TriggerRouter(tableTrigger3, new Router("test", TARGET_NODE_GROUP, SOURCE_NODE_GROUP, "default")));
assertTrue(routerService.producesCommonBatches(CHANNEL_2_TEST, SOURCE_NODE_GROUP, triggerRouters));
}


}

0 comments on commit 6f8aa01

Please sign in to comment.