diff --git a/symmetric-assemble/src/asciidoc/manage/node-add.ad b/symmetric-assemble/src/asciidoc/manage/node-add.ad index b6e0af764c..ad20ad7e88 100644 --- a/symmetric-assemble/src/asciidoc/manage/node-add.ad +++ b/symmetric-assemble/src/asciidoc/manage/node-add.ad @@ -1,6 +1,4 @@ -Multiple nodes can be hosted in a single SymmetricDS instance. SymmetricDS will start a node for each properties file it finds in the engines directory. -Multiple nodes can be hosted in a single SymmetricDS instance. -SymmetricDS will start a node for each properties file it finds in the engines directory. +Multiple nodes can be hosted in a single SymmetricDS instance. SymmetricDS will start a node for each properties file it finds in the engines directory. ifndef::pro[] Additional nodes can be added to the same SymmetricDS instance that the master node is running in or they can be added to a different diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 567adead64..5b5327f0c5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -29,6 +29,7 @@ import java.util.Date; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.sql.ISqlReadCursor; @@ -364,9 +365,8 @@ private long purgeDataRows(final Calendar time) { if (startDataId == 0) { startDataId = sqlTemplateDirty.queryForLong(getSql("minDataId")); } - long lastBatchId = contextService.getLong(ContextConstants.PURGE_LAST_BATCH_ID); - long endDataId = sqlTemplateDirty.queryForLong(getSql("maxDataId"), lastBatchId, new Timestamp(time.getTime().getTime())); - long[] minMax = { startDataId, endDataId }; + long[] minMax = { startDataId, getMaxDataIdEligibleToPurge(time) }; + log.info("Found range for data of {} through {}", minMax[0], minMax[1]); long minGapStartId = sqlTemplateDirty.queryForLong(getSql("minDataGapStartId")); int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS); @@ -406,7 +406,28 @@ private long purgeDataRows(final Calendar time) { return dataDeletedCount; } - + + private long getMaxDataIdEligibleToPurge(Calendar time) { + long lastBatchId = contextService.getLong(ContextConstants.PURGE_LAST_BATCH_ID); + long maxDataId = 0; + List batchIds = sqlTemplateDirty.query(getSql("maxBatchIdByChannel"), new LongMapper(), + new Object[] { lastBatchId, new Timestamp(time.getTime().getTime()) }, + new int[] { symmetricDialect.getSqlTypeForIds(), Types.TIMESTAMP }); + + if (batchIds != null && batchIds.size() > 0) { + String sql = getSql("maxDataIdForBatches").replace("?", StringUtils.repeat("?", ",", batchIds.size())); + int[] types = new int[batchIds.size()]; + for (int i = 0; i < batchIds.size(); i++) { + types[i] = symmetricDialect.getSqlTypeForIds(); + } + List ids = sqlTemplateDirty.query(sql, new LongMapper(), batchIds.toArray(), types); + if (ids != null && ids.size() > 0) { + maxDataId = ids.get(0); + } + } + return maxDataId; + } + private long purgeStranded(final Calendar time) { log.info("Getting range for stranded data events"); int maxNumOfDataEventsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index d0391e50e1..86c97c9c5a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -56,8 +56,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac putSql("minDataId", "select min(data_id) from $(data)"); - putSql("maxDataId", "select max(data_id) from $(data_event) where batch_id in (select max(batch_id) from $(outgoing_batch) " + - "where batch_id > ? and create_time < ? group by channel_id)"); + putSql("maxBatchIdByChannel", "select max(batch_id) from $(outgoing_batch) where batch_id > ? and create_time < ? group by channel_id"); + + putSql("maxDataIdForBatches", "select max(data_id) from $(data_event) where batch_id in (?)"); putSql("selectNodesWithStrandedBatches", "select distinct node_id from $(outgoing_batch) " + "where node_id not in (select node_id from $(node) where sync_enabled = ?) and status != ?");