diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 049b43a9a4..db505d8028 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -202,24 +202,38 @@ private long purgeOutgoingBatch(final Calendar time) { } private long purgeStrandedBatches() { - int updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatches"), - OutgoingBatch.Status.OK.name(), 1, OutgoingBatch.Status.OK.name()); - if (updateStrandedBatchesCount > 0) { - log.info( - "Set the status to {} for {} batches that no longer are associated with valid nodes", - OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount); - statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount); + int totalRowsPurged = 0; + log.info("Looking for old nodes in batches"); + List nodes = sqlTemplateDirty.query(getSql("selectNodesWithStrandedBatches"), + new StringMapper(), 1, OutgoingBatch.Status.OK.name()); + if (nodes.size() > 0) { + log.info("Found {} old nodes in batches", nodes.size()); + for (String nodeId : nodes) { + int rowsPurged = sqlTemplate.update(getSql("updateStrandedBatches"), + OutgoingBatch.Status.OK.name(), nodeId, OutgoingBatch.Status.OK.name()); + log.info("Set the status to {} for {} batches associated with node ID {}", + OutgoingBatch.Status.OK.name(), rowsPurged, nodeId); + totalRowsPurged += rowsPurged; + statisticManager.incrementPurgedBatchOutgoingRows(rowsPurged); + } } - updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"), - OutgoingBatch.Status.OK.name(), OutgoingBatch.Status.OK.name()); - if (updateStrandedBatchesCount > 0) { - log.info("Set the status to {} for {} batches that no longer are associated with valid channels", - OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount); - statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount); + log.info("Looking for old channels in batches"); + List channels = sqlTemplateDirty.query(getSql("selectChannelsWithStrandedBatches"), + new StringMapper(), OutgoingBatch.Status.OK.name()); + if (channels.size() > 0) { + log.info("Found {} old channels in batches", channels.size()); + for (String channelId : channels) { + int rowsPurged = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"), + OutgoingBatch.Status.OK.name(), channelId, OutgoingBatch.Status.OK.name()); + log.info("Set the status to {} for {} batches associated with channel ID {}", + OutgoingBatch.Status.OK.name(), rowsPurged, channelId); + totalRowsPurged += rowsPurged; + statisticManager.incrementPurgedBatchOutgoingRows(rowsPurged); + } } - - return updateStrandedBatchesCount; + + return totalRowsPurged; } private long purgeStrandedChannels() { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index 3d44b68bc9..55f504e8b2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -57,13 +57,15 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac putSql("selectDataRangeSql" , "select min(data_id) as min_id, max(data_id) as max_id from $(data) where data_id < (select max(data_id) from $(data)) " ); - putSql("updateStrandedBatches" , -"update $(outgoing_batch) set status=? where node_id not " + -" in (select node_id from $(node) where sync_enabled=?) and status != ? " ); + putSql("selectNodesWithStrandedBatches", "select distinct node_id from $(outgoing_batch) " + +"where node_id not in (select node_id from $(node) where sync_enabled = ?) and status != ?"); - putSql("updateStrandedBatchesByChannel", -"update $(outgoing_batch) set status=? where channel_id not " + -"in (select channel_id from $(channel)) and status != ?"); + putSql("updateStrandedBatches", "update $(outgoing_batch) set status=? where node_id=? and status != ?"); + + putSql("selectChannelsWithStrandedBatches", "select distinct channel_id from $(outgoing_batch) " + +"where channel_id not in (select channel_id from $(channel)) and status != ?"); + + putSql("updateStrandedBatchesByChannel", "update $(outgoing_batch) set status=? where channel_id=? and status != ?"); putSql("deleteStrandedData" , "delete from $(data) where " +