diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java index 69ca3d5c59..d0974ee982 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java @@ -328,13 +328,13 @@ private void runPurge(CommandLine line, List args) { IPurgeService purgeService = getSymmetricEngine().getPurgeService(); boolean all = args.contains("all") || args.size() == 0; if (args.contains("outgoing") || all) { - purgeService.purgeOutgoing(); + purgeService.purgeOutgoing(true); } if (args.contains("incoming") || all) { - purgeService.purgeIncoming(); + purgeService.purgeIncoming(true); } if (args.contains("data-gaps") || all) { - purgeService.purgeDataGaps(); + purgeService.purgeDataGaps(true); } } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index f9b6370177..879e44c21f 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -159,7 +159,7 @@ public boolean invoke(boolean force) { .getRegistrationService() .isRegisteredWithServer())) { hasNotRegisteredMessageBeenLogged = false; - doJob(); + doJob(force); } else { if (!hasNotRegisteredMessageBeenLogged) { log.warn( @@ -202,7 +202,7 @@ public void run() { invoke(false); } - abstract void doJob() throws Exception; + abstract void doJob(boolean force) throws Exception; @ManagedOperation(description = "Pause this job") public void pause() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/DataGapPurgeJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/DataGapPurgeJob.java index 73e6981b33..d5c87cbea9 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/DataGapPurgeJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/DataGapPurgeJob.java @@ -36,8 +36,8 @@ public DataGapPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche } @Override - public void doJob() throws Exception { - engine.getPurgeService().purgeDataGaps(); + public void doJob(boolean force) throws Exception { + engine.getPurgeService().purgeDataGaps(force); } public String getClusterLockName() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java index e206e1d774..8dd8c84602 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java @@ -36,7 +36,7 @@ public HeartbeatJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSchedul } @Override - public void doJob() throws Exception { + public void doJob(boolean force) throws Exception { if (engine.getClusterService().lock(getClusterLockName())) { try { engine.getDataService().heartbeat(false); diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java index 66376d933e..563f268c30 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/IncomingPurgeJob.java @@ -36,8 +36,8 @@ public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch } @Override - public void doJob() throws Exception { - engine.getPurgeService().purgeIncoming(); + public void doJob(boolean force) throws Exception { + engine.getPurgeService().purgeIncoming(force); } public String getClusterLockName() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java index f4e1e147c3..c020b5eae1 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/OutgoingPurgeJob.java @@ -35,8 +35,8 @@ public OutgoingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSch engine, taskScheduler); } @Override - public void doJob() throws Exception { - engine.getPurgeService().purgeOutgoing(); + public void doJob(boolean force) throws Exception { + engine.getPurgeService().purgeOutgoing(force); } public String getClusterLockName() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java index c49482d445..cfdd42b564 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PullJob.java @@ -34,8 +34,8 @@ public PullJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { } @Override - public void doJob() throws Exception { - engine.getPullService().pullData(); + public void doJob(boolean force) throws Exception { + engine.getPullService().pullData(force); } public String getClusterLockName() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java index 7b195bfcbd..9dffd5a18e 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/PushJob.java @@ -36,9 +36,9 @@ public PushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) { } @Override - public void doJob() throws Exception { + public void doJob(boolean force) throws Exception { if (engine != null) { - engine.getPushService().pushData().getDataProcessedCount(); + engine.getPushService().pushData(force).getDataProcessedCount(); } } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java index c7b9287a95..8dab703317 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/RouterJob.java @@ -36,8 +36,8 @@ public RouterJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) } @Override - void doJob() throws Exception { - engine.getRouterService().routeData(); + void doJob(boolean force) throws Exception { + engine.getRouterService().routeData(force); } public String getClusterLockName() { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java index e63e3ce119..83a03e3ebe 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java @@ -24,7 +24,7 @@ public boolean isClusterable() { } @Override - void doJob() throws Exception { + void doJob(boolean force) throws Exception { if (stagingManager != null) { stagingManager.clean(); } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java index 7d9731b456..b90402b4bd 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java @@ -36,7 +36,7 @@ public StatisticFlushJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSc } @Override - public void doJob() throws Exception { + public void doJob(boolean force) throws Exception { engine.getStatisticManager().flush(); } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java index ee530d21a9..1bb88a39e7 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java @@ -36,7 +36,7 @@ public SyncTriggersJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSche } @Override - public void doJob() throws Exception { + public void doJob(boolean force) throws Exception { engine.getTriggerRouterService().syncTriggers(); } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java index 93eda4e429..7bd198e747 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java @@ -37,7 +37,7 @@ public WatchdogJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskSchedule } @Override - public void doJob() throws Exception { + public void doJob(boolean force) throws Exception { if (engine.getClusterService().lock(ClusterConstants.WATCHDOG)) { synchronized (this) { try { diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java index 8539e4da50..f3f32dec69 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java @@ -100,7 +100,7 @@ public void stop() { @ManagedOperation(description = "Run the purge process") public void purge() { - engine.getPurgeService().purgeOutgoing(); + engine.getPurgeService().purgeOutgoing(true); } @ManagedOperation(description = "Force the channel settings to be read from the database") diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index e7d267c7cf..2672ea5615 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -519,7 +519,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri public RemoteNodeStatuses push() { MDC.put("engineName", getEngineName()); - return pushService.pushData(); + return pushService.pushData(true); } public void syncTriggers() { @@ -533,19 +533,19 @@ public NodeStatus getNodeStatus() { public RemoteNodeStatuses pull() { MDC.put("engineName", getEngineName()); - return pullService.pullData(); + return pullService.pullData(true); } public void route() { MDC.put("engineName", getEngineName()); - routerService.routeData(); + routerService.routeData(true); } public void purge() { MDC.put("engineName", getEngineName()); - purgeService.purgeOutgoing(); - purgeService.purgeIncoming(); - purgeService.purgeDataGaps(); + purgeService.purgeOutgoing(true); + purgeService.purgeIncoming(true); + purgeService.purgeDataGaps(true); } public boolean isConfigured() { @@ -621,7 +621,7 @@ public boolean isConfigured() { public void heartbeat(boolean force) { MDC.put("engineName", getEngineName()); - dataService.heartbeat(force); + dataService.heartbeat(true); } public void openRegistration(String nodeGroupId, String externalId) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java index 655f43fbf8..0a046c9576 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java @@ -104,7 +104,7 @@ public interface ISymmetricEngine { /** * Will perform a push the same way the {@link PushJob} would have. * - * @see IPushService#pushData() + * @see IPushService#pushData(boolean) * @return {@link RemoteNodeStatuses} */ public RemoteNodeStatuses push(); @@ -126,7 +126,7 @@ public interface ISymmetricEngine { /** * Will perform a pull the same way the {@link PullJob} would have. * - * @see IPullService#pullData() + * @see IPullService#pullData(boolean) * @return {@link RemoteNodeStatuses} */ public RemoteNodeStatuses pull(); @@ -140,7 +140,7 @@ public interface ISymmetricEngine { * This can be called to do a purge. It may be called only if the * {@link OutgoingPurgeJob} has not been enabled. * - * @see IPurgeService#purgeOutgoing() + * @see IPurgeService#purgeOutgoing(boolean) */ public void purge(); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java index 2b055978e9..3bababb4c8 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java @@ -57,8 +57,7 @@ public class ChannelRouterContext extends SimpleRouterContext { private ISqlTransaction sqlTransaction; private boolean needsCommitted = false; private long createdTimeInMs = System.currentTimeMillis(); - private long lastDataIdProcessed; - private Map transactionIdDataIds = new HashMap(); + private Data lastDataProcessed; private List dataEventsToSend = new ArrayList(); private boolean produceCommonBatches = false; @@ -109,7 +108,7 @@ public void rollback() { try { sqlTransaction.rollback(); } catch (SqlException e) { - log.warn(e.getMessage(),e); + log.warn(e.getMessage(), e); } finally { clearState(); } @@ -119,7 +118,7 @@ public void cleanup() { try { this.sqlTransaction.commit(); } catch (Exception ex) { - log.warn(ex.getMessage(),ex); + log.warn(ex.getMessage(), ex); } finally { this.sqlTransaction.close(); } @@ -149,35 +148,24 @@ public long getCreatedTimeInMs() { return createdTimeInMs; } - public void setLastDataIdForTransactionId(Data data) { - if (data.getTransactionId() != null) { - this.transactionIdDataIds.put(data.getTransactionId(), data.getDataId()); - } - } - - public void recordTransactionBoundaryEncountered(Data data) { - Long dataId = transactionIdDataIds.get(data.getTransactionId()); - setEncountedTransactionBoundary(dataId == null ? true : dataId == data.getDataId()); + public void setLastDataProcessed(Data lastDataProcessed) { + this.lastDataProcessed = lastDataProcessed; } - - public void setLastDataIdProcessed(long lastDataIdProcessed) { - this.lastDataIdProcessed = lastDataIdProcessed; + + public Data getLastDataProcessed() { + return lastDataProcessed; } - public long getLastDataIdProcessed() { - return lastDataIdProcessed; - } - public ISqlTransaction getSqlTransaction() { return sqlTransaction; } - + public void setProduceCommonBatches(boolean defaultRoutersOnly) { this.produceCommonBatches = defaultRoutersOnly; } - + public boolean isProduceCommonBatches() { return produceCommonBatches; } - + } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index 0405d0b0d0..cf79ad8f7d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -279,7 +279,6 @@ protected boolean fillPeekAheadQueue(List peekAheadQueue, int peekAheadCou Data data = cursor.next(); if (data != null) { if (process(data)) { - context.setLastDataIdForTransactionId(data); peekAheadQueue.add(data); dataCount++; context.incrementStat(System.currentTimeMillis() - ts, diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java index 38b612f01a..ae4e60b54b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java @@ -31,6 +31,6 @@ */ public interface IPullService extends IOfflineDetectorService { - public RemoteNodeStatuses pullData(); + public RemoteNodeStatuses pullData(boolean force); } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java index bb8d61d0a9..1ab54cccd0 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java @@ -30,17 +30,17 @@ */ public interface IPurgeService { - public long purgeOutgoing(); + public long purgeOutgoing(boolean force); - public long purgeIncoming(); + public long purgeIncoming(boolean force); - public long purgeDataGaps(); + public long purgeDataGaps(boolean force); - public long purgeDataGaps(Calendar retentionCutoff); + public long purgeDataGaps(Calendar retentionCutoff, boolean force); - public long purgeOutgoing(Calendar retentionCutoff); + public long purgeOutgoing(Calendar retentionCutoff, boolean force); - public long purgeIncoming(Calendar retentionCutoff); + public long purgeIncoming(Calendar retentionCutoff, boolean force); public void purgeAllIncomingEventsForNode(String nodeId); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java index a272ed1db7..a187146249 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java @@ -37,10 +37,11 @@ public interface IPushService extends IOfflineDetectorService { /** * Attempt to push data, if any has been captured, to nodes that the * captured data is targeted for. + * @param force TODO * * @return RemoteNodeStatuses the status of the push attempt(s) */ - public RemoteNodeStatuses pushData(); + public RemoteNodeStatuses pushData(boolean force); public Map getStartTimesOfNodesBeingPushedTo(); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java index 0972e9b62d..673fcbdbc3 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java @@ -38,7 +38,7 @@ */ public interface IRouterService extends IService { - public long routeData(); + public long routeData(boolean force); public long getUnroutedDataCount(); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index cf2ae50408..2a377bc8a9 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -285,7 +285,7 @@ public List extract(Node targetNode, IOutgoingTransport targetTra // make sure that data is routed before extracting if the route // job is not configured to start automatically if (!parameterService.is(ParameterConstants.START_ROUTE_JOB)) { - routerService.routeData(); + routerService.routeData(true); } OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode, false); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java index d85cc6701a..b851cdb0c0 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java @@ -229,6 +229,7 @@ public void run() { boolean failed = false; try { executor.execute(nodeCommunication, status); + failed = status.failed(); } catch (Throwable ex) { failed = true; log.error(String.format("Failed to execute %s for node %s", diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java index 0aaac9e907..84514ffe61 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java @@ -74,11 +74,11 @@ public PullService(IParameterService parameterService, ISymmetricDialect symmetr this.dataLoaderService = dataLoaderService; } - synchronized public RemoteNodeStatuses pullData() { + synchronized public RemoteNodeStatuses pullData(boolean force) { final RemoteNodeStatuses statuses = new RemoteNodeStatuses(); Node identity = nodeService.findIdentity(false); if (identity == null || identity.isSyncEnabled()) { - if (clusterService.lock(ClusterConstants.PULL)) { + if (force || clusterService.lock(ClusterConstants.PULL)) { try { // register if we haven't already been registered registrationService.registerWithServer(); @@ -97,7 +97,9 @@ synchronized public RemoteNodeStatuses pullData() { } } } finally { - clusterService.unlock(ClusterConstants.PULL); + if (!force) { + clusterService.unlock(ClusterConstants.PULL); + } } } else { log.info("Did not run the pull process because the cluster service has it locked"); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 1894488c0d..2f1e3a5256 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -63,44 +63,46 @@ public PurgeService(IParameterService parameterService, ISymmetricDialect symmet createSqlReplacementTokens())); } - public long purgeOutgoing() { + public long purgeOutgoing(boolean force) { long rowsPurged = 0; Calendar retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES)); - rowsPurged += purgeOutgoing(retentionCutoff); + rowsPurged += purgeOutgoing(retentionCutoff, force); return rowsPurged; } - public long purgeIncoming() { + public long purgeIncoming(boolean force) { long rowsPurged = 0; Calendar retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES)); - rowsPurged += purgeIncoming(retentionCutoff); + rowsPurged += purgeIncoming(retentionCutoff, force); return rowsPurged; } - public long purgeDataGaps() { + public long purgeDataGaps(boolean force) { long rowsPurged = 0; Calendar retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService .getInt(ParameterConstants.ROUTING_DATA_READER_TYPE_GAP_RETENTION_MINUTES)); - rowsPurged += purgeDataGaps(retentionCutoff); + rowsPurged += purgeDataGaps(retentionCutoff, force); return rowsPurged; } - public long purgeDataGaps(Calendar retentionCutoff) { + public long purgeDataGaps(Calendar retentionCutoff, boolean force) { long rowsPurged = -1l; try { - if (clusterService.lock(ClusterConstants.PURGE_DATA_GAPS)) { + if (force || clusterService.lock(ClusterConstants.PURGE_DATA_GAPS)) { try { log.info("The data gap purge process is about to run"); rowsPurged = sqlTemplate.update(getSql("deleteFromDataGapsSql"), new Object[] { retentionCutoff.getTime(), DataGap.Status.GP.name() }); log.info("Purged {} data gap rows", rowsPurged); } finally { - clusterService.unlock(ClusterConstants.PURGE_DATA_GAPS); + if (!force) { + clusterService.unlock(ClusterConstants.PURGE_DATA_GAPS); + } log.info("The data gap purge process has completed"); } @@ -113,10 +115,10 @@ public long purgeDataGaps(Calendar retentionCutoff) { return rowsPurged; } - public long purgeOutgoing(Calendar retentionCutoff) { + public long purgeOutgoing(Calendar retentionCutoff, boolean force) { long rowsPurged = 0; try { - if (clusterService.lock(ClusterConstants.PURGE_OUTGOING)) { + if (force || clusterService.lock(ClusterConstants.PURGE_OUTGOING)) { try { log.info("The outgoing purge process is about to run for data older than {}", SimpleDateFormat.getDateTimeInstance() @@ -125,7 +127,9 @@ public long purgeOutgoing(Calendar retentionCutoff) { rowsPurged += purgeDataRows(retentionCutoff); rowsPurged += purgeOutgoingBatch(retentionCutoff); } finally { - clusterService.unlock(ClusterConstants.PURGE_OUTGOING); + if (!force) { + clusterService.unlock(ClusterConstants.PURGE_OUTGOING); + } log.info("The outgoing purge process has completed"); } } else { @@ -244,15 +248,17 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retent return totalCount; } - public long purgeIncoming(Calendar retentionCutoff) { + public long purgeIncoming(Calendar retentionCutoff, boolean force) { long purgedRowCount = 0; try { - if (clusterService.lock(ClusterConstants.PURGE_INCOMING)) { + if (force || clusterService.lock(ClusterConstants.PURGE_INCOMING)) { try { log.info("The incoming purge process is about to run"); purgedRowCount = purgeIncomingBatch(retentionCutoff); } finally { - clusterService.unlock(ClusterConstants.PURGE_INCOMING); + if (!force) { + clusterService.unlock(ClusterConstants.PURGE_INCOMING); + } log.info("The incoming purge process has completed"); } } else { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index 8a8b25c91a..906ea96159 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -93,12 +93,12 @@ public Map getStartTimesOfNodesBeingPushedTo() { return new HashMap(startTimesOfNodesBeingPushedTo); } - synchronized public RemoteNodeStatuses pushData() { + synchronized public RemoteNodeStatuses pushData(boolean force) { RemoteNodeStatuses statuses = new RemoteNodeStatuses(); Node identity = nodeService.findIdentity(false); if (identity != null && identity.isSyncEnabled()) { - if (clusterService.lock(ClusterConstants.PUSH)) { + if (force || clusterService.lock(ClusterConstants.PUSH)) { try { NodeSecurity identitySecurity = nodeService.findNodeSecurity(identity .getNodeId()); @@ -119,7 +119,9 @@ synchronized public RemoteNodeStatuses pushData() { identity.getNodeId()); } } finally { - clusterService.unlock(ClusterConstants.PUSH); + if (!force) { + clusterService.unlock(ClusterConstants.PUSH); + } } } else { log.info("Did not run the push process because the cluster service has it locked"); @@ -150,7 +152,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status new Object[] { node, status.getDataProcessed(), status.getBatchesProcessed() }); } else if (status.failed()) { - log.warn("There was an error while pushing data to the server"); + log.warn("There was an error while pushing data to the server"); } log.debug("Push completed for {}", node); pushCount++; diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 4757289a3e..9dc354fd0d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -133,9 +133,9 @@ public synchronized void stop() { /** * This method will route data to specific nodes. */ - synchronized public long routeData() { + synchronized public long routeData(boolean force) { long dataCount = -1l; - if (engine.getClusterService().lock(ClusterConstants.ROUTE)) { + if (force || engine.getClusterService().lock(ClusterConstants.ROUTE)) { try { engine.getOutgoingBatchService().updateAbandonedRoutingBatches(); insertInitialLoadEvents(); @@ -150,7 +150,9 @@ synchronized public long routeData() { log.info("Routed {} data events in {} ms", dataCount, ts); } } finally { - engine.getClusterService().unlock(ClusterConstants.ROUTE); + if (!force) { + engine.getClusterService().unlock(ClusterConstants.ROUTE); + } } } return dataCount; @@ -260,7 +262,6 @@ protected int routeDataForChannel(final NodeChannel nodeChannel, final Node sour long ts = System.currentTimeMillis(); int dataCount = -1; try { - context = new ChannelRouterContext(sourceNode.getNodeId(), nodeChannel, symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction()); context.setProduceCommonBatches(produceCommonBatches); @@ -271,7 +272,7 @@ protected int routeDataForChannel(final NodeChannel nodeChannel, final Node sour if (context != null) { context.rollback(); } - return 0; + return 0; } catch (Exception ex) { log.error( String.format("Failed to route and batch data on '%s' channel", @@ -290,12 +291,13 @@ protected int routeDataForChannel(final NodeChannel nodeChannel, final Node sour completeBatchesAndCommit(context); context.incrementStat(System.currentTimeMillis() - insertTs, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); - if (context.getLastDataIdProcessed() > 0) { + Data lastDataProcessed = context.getLastDataProcessed(); + if (lastDataProcessed != null && lastDataProcessed.getDataId() > 0) { String channelId = nodeChannel.getChannelId(); long queryTs = System.currentTimeMillis(); long dataLeftToRoute = sqlTemplate.queryForInt( getSql("selectUnroutedCountForChannelSql"), channelId, - context.getLastDataIdProcessed()); + lastDataProcessed.getDataId()); queryTs = System.currentTimeMillis() - queryTs; if (queryTs > Constants.LONG_OPERATION_THRESHOLD) { log.warn("Unrouted query for channel {} took {} ms", channelId, queryTs); @@ -405,6 +407,7 @@ public Thread newThread(Runnable r) { protected int selectDataAndRoute(ChannelRouterContext context) throws InterruptedException { IDataToRouteReader reader = startReading(context); Data data = null; + Data nextData = null; int totalDataCount = 0; int totalDataEventCount = 0; int statsDataCount = 0; @@ -412,39 +415,53 @@ protected int selectDataAndRoute(ChannelRouterContext context) throws Interrupte final int maxNumberOfEventsBeforeFlush = parameterService .getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE); try { + nextData = reader.take(); do { - data = reader.take(); - if (data != null) { - context.setLastDataIdProcessed(data.getDataId()); - statsDataCount++; - totalDataCount++; - int dataEventsInserted = routeData(data, context); - statsDataEventCount += dataEventsInserted; - totalDataEventCount += dataEventsInserted; - long insertTs = System.currentTimeMillis(); - try { - if (maxNumberOfEventsBeforeFlush <= context.getDataEventList().size() - || context.isNeedsCommitted()) { - engine.getDataService().insertDataEvents(context.getSqlTransaction(), - context.getDataEventList()); - context.clearDataEventsList(); - } - if (context.isNeedsCommitted()) { - completeBatchesAndCommit(context); + if (nextData != null) { + data = nextData; + nextData = reader.take(); + if (data != null) { + boolean atTransactionBoundary = false; + if (nextData != null) { + String nextTxId = nextData.getTransactionId(); + atTransactionBoundary = nextTxId == null + || !nextTxId.equals(data.getTransactionId()); } - } finally { - context.incrementStat(System.currentTimeMillis() - insertTs, - ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); - - if (statsDataCount > StatisticConstants.FLUSH_SIZE_ROUTER_DATA) { - engine.getStatisticManager().incrementDataRouted( - context.getChannel().getChannelId(), statsDataCount); - statsDataCount = 0; - engine.getStatisticManager().incrementDataEventInserted( - context.getChannel().getChannelId(), statsDataEventCount); - statsDataEventCount = 0; + context.setEncountedTransactionBoundary(atTransactionBoundary); + statsDataCount++; + totalDataCount++; + int dataEventsInserted = routeData(data, context); + statsDataEventCount += dataEventsInserted; + totalDataEventCount += dataEventsInserted; + long insertTs = System.currentTimeMillis(); + try { + if (maxNumberOfEventsBeforeFlush <= context.getDataEventList().size() + || context.isNeedsCommitted()) { + engine.getDataService().insertDataEvents( + context.getSqlTransaction(), context.getDataEventList()); + context.clearDataEventsList(); + } + if (context.isNeedsCommitted()) { + completeBatchesAndCommit(context); + } + } finally { + context.incrementStat(System.currentTimeMillis() - insertTs, + ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); + + if (statsDataCount > StatisticConstants.FLUSH_SIZE_ROUTER_DATA) { + engine.getStatisticManager().incrementDataRouted( + context.getChannel().getChannelId(), statsDataCount); + statsDataCount = 0; + engine.getStatisticManager().incrementDataEventInserted( + context.getChannel().getChannelId(), statsDataEventCount); + statsDataEventCount = 0; + } } + + context.setLastDataProcessed(data); } + } else { + data = null; } } while (data != null); @@ -466,7 +483,6 @@ protected int selectDataAndRoute(ChannelRouterContext context) throws Interrupte protected int routeData(Data data, ChannelRouterContext context) { int numberOfDataEventsInserted = 0; - context.recordTransactionBoundaryEncountered(data); List triggerRouters = getTriggerRoutersForData(data); if (triggerRouters != null && triggerRouters.size() > 0) { for (TriggerRouter triggerRouter : triggerRouters) { @@ -485,7 +501,7 @@ protected int routeData(Data data, ChannelRouterContext context) { ChannelRouterContext.STAT_DATA_ROUTER_MS); if (nodeIds != null) { - // never need to + // should never route to self nodeIds.remove(engine.getNodeService().findIdentityNodeId()); if (!triggerRouter.isPingBackEnabled() && data.getSourceNodeId() != null) { nodeIds.remove(data.getSourceNodeId()); diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties index 6a2339a1a6..8691a5c101 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties @@ -594,11 +594,6 @@ routing.flush.jdbc.batch.size=50000 # Tags: routing routing.stale.dataid.gap.time.ms=7200000 -# This is the type of algorithm that will be used by SymmetricDS to select captured data for routing. -# The two possible values are ref and gap. -# Tags: routing -routing.data.reader.type=gap - # Tags: routing routing.data.reader.type.gap.retention.period.minutes=1440 diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java index 00f411cda0..6dcd340c7a 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java @@ -54,14 +54,13 @@ public void testMultiChannelRoutingToEveryone() { TestConstants.TEST_CHANNEL_ID_OTHER, false); Assert.assertEquals(50, testChannel.getMaxBatchSize()); Assert.assertEquals(1, otherChannel.getMaxBatchSize()); - // should be 1 batch for table 1 on the testchannel w/ max batch size of - // 50 + // should be 1 batch for table 1 on the testchannel w/ max batch size of 50 insert(TEST_TABLE_1, 5, false); // this should generate 15 batches because the max batch size is 1 insert(TEST_TABLE_2, 15, false); insert(TEST_TABLE_1, 50, true); - getRouterService().routeData(); + getRouterService().routeData(true); final int EXPECTED_BATCHES = getDbDialect().supportsTransactionId() ? 16 : 17; @@ -91,7 +90,7 @@ public void testMultiChannelRoutingToEveryone() { // the batch is transactional insert(TEST_TABLE_2, 15, true); insert(TEST_TABLE_1, 50, false); - getRouterService().routeData(); + getRouterService().routeData(true); batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel, otherChannel); @@ -127,7 +126,7 @@ public void testLookupTableRouting() { getTriggerRouterService().saveTriggerRouter(triggerRouter); getTriggerRouterService().syncTriggers(); - getRouterService().routeData(); + getRouterService().routeData(true); resetBatches(); @@ -135,7 +134,7 @@ public void testLookupTableRouting() { int unroutedCount = countUnroutedBatches(); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -151,7 +150,7 @@ public void testLookupTableRouting() { insert(TEST_TABLE_1, 5, true, null, "B"); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -167,7 +166,7 @@ public void testLookupTableRouting() { insert(TEST_TABLE_1, 10, true, null, "C"); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -183,7 +182,7 @@ public void testLookupTableRouting() { insert(TEST_TABLE_1, 5, true, null, "D"); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -205,7 +204,7 @@ public void testLookupTableRouting() { insert(TEST_TABLE_1, 1, true, null, "F"); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 0, @@ -240,7 +239,7 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { // should be 51 batches for table 1 insert(TEST_TABLE_1, 500, true); insert(TEST_TABLE_1, 50, false); - getRouterService().routeData(); + getRouterService().routeData(true); final int EXPECTED_BATCHES = getDbDialect().supportsTransactionId() ? 51 : 550; @@ -273,7 +272,7 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { countBatchesForChannel( getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( getDbDialect().supportsTransactionId() ? 1 : 705, countBatchesForChannel( @@ -303,7 +302,7 @@ public void testSubSelectNonTransactionalRoutingToNode1() { // should be 100 batches for table 1, even though we committed the // changes as part of a transaction insert(TEST_TABLE_1, 500, true); - getRouterService().routeData(); + getRouterService().routeData(true); final int EXPECTED_BATCHES = 100; @@ -347,7 +346,7 @@ public void testSyncIncomingBatch() throws Exception { insert(TEST_TABLE_1, 10, true, NODE_GROUP_NODE_1.getNodeId()); - getRouterService().routeData(); + getRouterService().routeData(true); OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); @@ -399,7 +398,7 @@ public void testLargeNumberOfEventsToManyNodes() { logger.info(String.format("Done inserting %s rows", ROWS_TO_INSERT)); logger.info("About to route data"); - getRouterService().routeData(); + getRouterService().routeData(true); logger.info("Done routing data"); } @@ -439,7 +438,7 @@ public void testBshTransactionalRoutingOnUpdate() { logger.info("Just recorded a change to " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis() - ts) + "ms"); ts = System.currentTimeMillis(); - getRouterService().routeData(); + getRouterService().routeData(true); logger.info("Just routed " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis() - ts) + "ms"); @@ -485,7 +484,7 @@ public void testBshRoutingDeletesToNode3() { getConfigurationService().saveChannel(testChannel, true); int count = getSqlTemplate().update(String.format("delete from %s", TEST_TABLE_1)); - getRouterService().routeData(); + getRouterService().routeData(true); OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); @@ -532,7 +531,7 @@ public void testColumnMatchSubtableRoutingToNode1() { getTriggerRouterService().syncTriggers(); insert(TEST_TABLE_1, 1, true); - getRouterService().routeData(); + getRouterService().routeData(true); resetBatches(); int pk = getSqlTemplate().queryForInt( @@ -546,7 +545,7 @@ public void testColumnMatchSubtableRoutingToNode1() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -590,7 +589,7 @@ public void testColumnMatchSubtableRoutingToNode1() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, countBatchesForChannel( @@ -633,7 +632,7 @@ public void testColumnMatchOnNull() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 0, @@ -651,7 +650,7 @@ public void testColumnMatchOnNull() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -683,7 +682,7 @@ public void testColumnMatchOnNotNull() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -701,7 +700,7 @@ public void testColumnMatchOnNotNull() { getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 0, @@ -736,7 +735,7 @@ public void testSyncOnColumnChange() { testChannel)); insert(TEST_TABLE_1, 1, true); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 0, @@ -751,7 +750,7 @@ public void testSyncOnColumnChange() { + NODE_GROUP_NODE_1.getNodeId() + "'"); getSqlTemplate().update("update " + TEST_TABLE_1 + " set ROUTING_INT=1 where PK=?", pk); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -763,7 +762,7 @@ public void testSyncOnColumnChange() { getSqlTemplate().update("update " + TEST_TABLE_1 + " set ROUTING_INT=1 where PK=?", new Object[] { pk }); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 0, @@ -775,7 +774,7 @@ public void testSyncOnColumnChange() { getSqlTemplate().update("update " + TEST_TABLE_1 + " set ROUTING_INT=10 where PK=?", new Object[] { pk }); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals( 1, @@ -808,7 +807,7 @@ public void testSyncIncomingBatchWhenUnrouted() throws Exception { int unroutedCount = countUnroutedBatches(); - getRouterService().routeData(); + getRouterService().routeData(true); OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); @@ -832,7 +831,7 @@ public void testDefaultRouteToTargetNodeGroupOnly() throws Exception { insert(TEST_TABLE_1, 1, true); - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false).getBatches() @@ -865,16 +864,16 @@ public void testGapRouting() throws Exception { Assert.assertEquals(1, getDataService().findDataGaps().size()); // route again to make sure we still only have one gap - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals(1, getDataService().findDataGaps().size()); insertGaps(2, 1, 2); - getRouterService().routeData(); + getRouterService().routeData(true); // route again to calculate gaps - getRouterService().routeData(); + getRouterService().routeData(true); Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) @@ -887,8 +886,8 @@ public void testGapRouting() throws Exception { Assert.assertEquals(0, gap.getEndId() - gap.getStartId()); // route again to make sure the gaps don't disappear - getRouterService().routeData(); - getRouterService().routeData(); + getRouterService().routeData(true); + getRouterService().routeData(true); gaps = getDataService().findDataGaps(); Assert.assertEquals(2, gaps.size()); @@ -1074,7 +1073,7 @@ public void testDontSelectOldDataDuringRouting() throws Exception { // clean setup deleteAll(TEST_TABLE_1); insert(TEST_TABLE_1, 100, true); - getRouterService().routeData(); + getRouterService().routeData(true); resetBatches(); // delete diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java index 84cea9b0a1..416849d2ae 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java @@ -222,9 +222,9 @@ protected void assertNumberOfLinesThatStartWith(int expected, String startsWith, protected void routeAndCreateGaps() { // one to route unrouted data - getRouterService().routeData(); + getRouterService().routeData(true); // one to create gaps - getRouterService().routeData(); + getRouterService().routeData(true); } protected void resetGaps() {