diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java index 212d7d6cf5..8b4c799d7e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java @@ -265,4 +265,8 @@ public void addDataIds(List dataIds) { public void setIsAllDataRead(boolean isAllDataRead) { } + + public void setFullGapAnalysis(boolean isFullGapAnalysis) { + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java index 3ee9f57718..3227668a4c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java @@ -76,9 +76,11 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe protected boolean isAllDataRead = true; protected long maxDataToSelect; + + protected boolean isFullGapAnalysis = true; - protected boolean isBusyExpire; - + protected long lastBusyExpireRunTime; + protected Set gapsAll; protected Set gapsAdded; @@ -94,31 +96,32 @@ public DataGapFastDetector(IDataService dataService, IParameterService parameter this.symmetricDialect = symmetricDialect; this.statisticManager = statisticManager; this.nodeService = nodeService; - reset(); } public void beforeRouting() { + reset(); ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey( nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT)); processInfo.setStatus(Status.QUERYING); maxDataToSelect = parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE); gaps = dataService.findDataGaps(); - processInfo.setStatus(Status.OK); fixOverlappingGaps(gaps, processInfo); - - if (contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)) { + + if (isFullGapAnalysis()) { log.info("Full gap analysis is running"); long ts = System.currentTimeMillis(); queryDataIdMap(); log.info("Querying data in gaps from database took {} ms", System.currentTimeMillis() - ts); afterRouting(); - log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts); - gaps = dataService.findDataGaps(); reset(); + gaps = dataService.findDataGaps(); + log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts); } + processInfo.setStatus(Status.OK); } protected void reset() { + isAllDataRead = true; dataIds = new ArrayList(); gapsAll = new HashSet(); gapsAdded = new HashSet(); @@ -153,8 +156,9 @@ public void afterRouting() { } Date currentDate = new Date(currentTime); + boolean isBusyExpire = false; if (!isAllDataRead) { - long lastBusyExpireRunTime = contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME); + long lastBusyExpireRunTime = getLastBusyExpireRunTime(); long busyExpireMillis = parameterService.getLong(ParameterConstants.ROUTING_STALE_GAP_BUSY_EXPIRE_TIME); isBusyExpire = lastBusyExpireRunTime == 0 || System.currentTimeMillis() - lastBusyExpireRunTime >= busyExpireMillis; } @@ -278,10 +282,10 @@ public void afterRouting() { dataService.insertDataGap(newGap); } } - - contextService.save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "false"); + + setFullGapAnalysis(false); if (!isAllDataRead && expireChecked > 0) { - contextService.save(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME, String.valueOf(System.currentTimeMillis())); + setLastBusyExpireRunTime(System.currentTimeMillis()); } long updateTimeInMs = System.currentTimeMillis() - ts; @@ -435,4 +439,32 @@ public void setIsAllDataRead(boolean isAllDataRead) { this.isAllDataRead &= isAllDataRead; } + public boolean isFullGapAnalysis() { + if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { + isFullGapAnalysis = contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS); + } + return isFullGapAnalysis; + } + + public void setFullGapAnalysis(boolean isFullGapAnalysis) { + if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { + contextService.save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, Boolean.toString(isFullGapAnalysis)); + } + this.isFullGapAnalysis = isFullGapAnalysis; + } + + protected long getLastBusyExpireRunTime() { + if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { + lastBusyExpireRunTime = contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME); + } + return lastBusyExpireRunTime; + } + + protected void setLastBusyExpireRunTime(long lastBusyExpireRunTime) { + if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) { + contextService.save(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME, String.valueOf(lastBusyExpireRunTime)); + } + this.lastBusyExpireRunTime = lastBusyExpireRunTime; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 05548a9d02..8066905a25 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -44,7 +44,6 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SyntaxParsingException; import org.jumpmind.symmetric.common.Constants; -import org.jumpmind.symmetric.common.ContextConstants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.model.Channel; @@ -114,6 +113,8 @@ public class RouterService extends AbstractService implements IRouterService { protected IExtensionService extensionService; + protected DataGapDetector gapDetector; + protected boolean syncTriggersBeforeInitialLoadAttempted = false; protected boolean firstTimeCheckForAbandonedBatches = true; @@ -141,6 +142,14 @@ public RouterService(ISymmetricEngine engine) { setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); + + if (parameterService.is(ParameterConstants.ROUTING_USE_FAST_GAP_DETECTOR)) { + gapDetector = new DataGapFastDetector(engine.getDataService(), parameterService, engine.getContextService(), + symmetricDialect, this, engine.getStatisticManager(), engine.getNodeService()); + } else { + gapDetector = new DataGapDetector(engine.getDataService(), parameterService, symmetricDialect, + this, engine.getStatisticManager(), engine.getNodeService()); + } } /** @@ -185,18 +194,8 @@ synchronized public long routeData(boolean force) { insertInitialLoadEvents(); long ts = System.currentTimeMillis(); - DataGapDetector gapDetector = null; - if (parameterService.is(ParameterConstants.ROUTING_USE_FAST_GAP_DETECTOR)) { - gapDetector = new DataGapFastDetector( - engine.getDataService(), parameterService, engine.getContextService(), symmetricDialect, - this, engine.getStatisticManager(), engine.getNodeService()); - } else { - gapDetector = new DataGapDetector( - engine.getDataService(), parameterService, symmetricDialect, - this, engine.getStatisticManager(), engine.getNodeService()); - } gapDetector.beforeRouting(); - dataCount = routeDataForEachChannel(gapDetector); + dataCount = routeDataForEachChannel(); ts = System.currentTimeMillis() - ts; if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) { log.info("Routed {} data events in {} ms", dataCount, ts); @@ -289,7 +288,7 @@ protected void insertInitialLoadEvents() { } } if (isInitialLoadQueued) { - engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true"); + gapDetector.setFullGapAnalysis(true); } } } @@ -345,7 +344,7 @@ protected void sendReverseInitialLoad() { * thread pool here and waiting for all channels to be processed. The other * reason is to reduce the number of connections we are required to have. */ - protected int routeDataForEachChannel(DataGapDetector gapDetector) { + protected int routeDataForEachChannel() { int dataCount = 0; Node sourceNode = engine.getNodeService().findIdentity(); ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo( @@ -355,15 +354,12 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) { final List channels = engine.getConfigurationService().getNodeChannels(false); Set readyChannels = null; if (parameterService.is(ParameterConstants.ROUTING_QUERY_CHANNELS_FIRST)) { - readyChannels = getReadyChannels(gapDetector); + readyChannels = getReadyChannels(); } for (NodeChannel nodeChannel : channels) { if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) { processInfo.setCurrentChannelId(nodeChannel.getChannelId()); - dataCount += routeDataForChannel(processInfo, - nodeChannel, - sourceNode - , gapDetector); + dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode); } else { gapDetector.setIsAllDataRead(false); if (log.isDebugEnabled()) { @@ -382,7 +378,7 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) { return dataCount; } - protected Set getReadyChannels(DataGapDetector gapDetector) { + protected Set getReadyChannels() { List dataGaps = gapDetector.getDataGaps(); int dataIdSqlType = engine.getSymmetricDialect().getSqlTypeForIds(); int numberOfGapsToQualify = parameterService.getInt(ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100); @@ -553,8 +549,7 @@ protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId return onlyDefaultRoutersAssigned; } - protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode, - DataGapDetector gapDetector) { + protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode) { ChannelRouterContext context = null; long ts = System.currentTimeMillis(); int dataCount = -1; @@ -651,9 +646,7 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) { List batches = new ArrayList(context.getBatchesByNodes() .values()); - if (!engine.getContextService().is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)) { - engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true"); - } + gapDetector.setFullGapAnalysis(true); context.commit(); if (engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) { diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 05de0257fd..e3d886cba7 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -972,7 +972,7 @@ routing.query.channels.first=true # Use a faster method of gap detection that uses the output of the work from router service # instead of querying for it. # -# DatabaseOverridable: true +# DatabaseOverridable: false # Tags: routing # Type: boolean routing.use.fast.gap.detector=true diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java index debc84d496..4efe00ddb7 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java @@ -67,6 +67,7 @@ public class DataGapDetectorTest { IRouterService routerService; IStatisticManager statisticManager; INodeService nodeService; + DataGapFastDetector detector; @Before public void setUp() throws Exception { @@ -108,6 +109,8 @@ public void setUp() throws Exception { nodeService = mock(NodeService.class); when(nodeService.findIdentity()).thenReturn(new Node(NODE_ID, NODE_GROUP_ID)); + detector = newGapDetector(); + detector.setFullGapAnalysis(false); } protected DataGapFastDetector newGapDetector() { @@ -116,7 +119,7 @@ protected DataGapFastDetector newGapDetector() { protected void runGapDetector(List dataGaps, List dataIds, boolean isAllDataRead) { when(dataService.findDataGaps()).thenReturn(dataGaps); - DataGapFastDetector detector = newGapDetector(); + detector.beforeRouting(); detector.addDataIds(dataIds); detector.setIsAllDataRead(isAllDataRead); @@ -131,7 +134,6 @@ public List answer(InvocationOnMock invocation) { } }); - DataGapFastDetector detector = newGapDetector(); detector.beforeRouting(); detector.addDataIds(dataIds); detector.setIsAllDataRead(isAllDataRead); @@ -158,6 +160,7 @@ public void testNewGap() throws Exception { @Test public void testNewGapFull() throws Exception { + detector.setFullGapAnalysis(true); when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true); List dataIds = new ArrayList(); @@ -203,6 +206,7 @@ public void testTwoNewGaps() throws Exception { @Test public void testTwoNewGapsFull() throws Exception { + detector.setFullGapAnalysis(true); when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true); List dataIds = new ArrayList(); @@ -257,7 +261,8 @@ public void testGapInGap() throws Exception { } @Test - public void testGapInGapFull() throws Exception { + public void testGapInGapFull() throws Exception { + detector.setFullGapAnalysis(true); when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true); @SuppressWarnings("unchecked") @@ -408,6 +413,7 @@ public void testGapExpireOracleBusyChannel() throws Exception { @Test public void testGapsBeforeAndAfterFull() throws Exception { + detector.setFullGapAnalysis(true); when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true); List dataIds = new ArrayList(); dataIds.add(843L); @@ -505,6 +511,7 @@ public void testGapsOverlapThenData() throws Exception { @Test public void testGapsOverlapThenDataFull() throws Exception { + detector.setFullGapAnalysis(true); when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true); List dataIds = new ArrayList(); dataIds.add(30953883L);