
Commit

Merge remote-tracking branch 'origin/3.11' into 3.12
erilong committed Jun 11, 2020
2 parents 6efd03c + 3b3e124 commit 1903968
Showing 3 changed files with 29 additions and 9 deletions.
4 changes: 1 addition & 3 deletions symmetric-assemble/src/asciidoc/manage/node-add.ad
@@ -1,6 +1,4 @@
-Multiple nodes can be hosted in a single SymmetricDS instance. SymmetricDS will start a node for each properties file it finds in the engines directory.
-Multiple nodes can be hosted in a single SymmetricDS instance.
-SymmetricDS will start a node for each properties file it finds in the engines directory.
+Multiple nodes can be hosted in a single SymmetricDS instance. SymmetricDS will start a node for each properties file it finds in the engines directory.

ifndef::pro[]
Additional nodes can be added to the same SymmetricDS instance that the master node is running in or they can be added to a different
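For context on the documentation edited above: SymmetricDS starts one node for each .properties file it finds in its engines directory. A minimal sketch of such an engine properties file follows; the property names are standard SymmetricDS engine settings, but every value here is illustrative and not part of this commit.

engine.name=store-001
group.id=store
external.id=001
db.driver=org.h2.Driver
db.url=jdbc:h2:file:./store-001;AUTO_SERVER=TRUE
db.user=symmetric
db.password=
registration.url=http://registration-host:31415/sync/corp-000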
@@ -29,6 +29,7 @@
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlReadCursor;
@@ -364,9 +365,8 @@ private long purgeDataRows(final Calendar time) {
        if (startDataId == 0) {
            startDataId = sqlTemplateDirty.queryForLong(getSql("minDataId"));
        }
-       long lastBatchId = contextService.getLong(ContextConstants.PURGE_LAST_BATCH_ID);
-       long endDataId = sqlTemplateDirty.queryForLong(getSql("maxDataId"), lastBatchId, new Timestamp(time.getTime().getTime()));
-       long[] minMax = { startDataId, endDataId };
+       long[] minMax = { startDataId, getMaxDataIdEligibleToPurge(time) };
        log.info("Found range for data of {} through {}", minMax[0], minMax[1]);

        long minGapStartId = sqlTemplateDirty.queryForLong(getSql("minDataGapStartId"));
        int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
@@ -406,7 +406,28 @@ private long purgeDataRows(final Calendar time) {

return dataDeletedCount;
}


    private long getMaxDataIdEligibleToPurge(Calendar time) {
        long lastBatchId = contextService.getLong(ContextConstants.PURGE_LAST_BATCH_ID);
        long maxDataId = 0;
        List<Long> batchIds = sqlTemplateDirty.query(getSql("maxBatchIdByChannel"), new LongMapper(),
                new Object[] { lastBatchId, new Timestamp(time.getTime().getTime()) },
                new int[] { symmetricDialect.getSqlTypeForIds(), Types.TIMESTAMP });

        if (batchIds != null && batchIds.size() > 0) {
            String sql = getSql("maxDataIdForBatches").replace("?", StringUtils.repeat("?", ",", batchIds.size()));
            int[] types = new int[batchIds.size()];
            for (int i = 0; i < batchIds.size(); i++) {
                types[i] = symmetricDialect.getSqlTypeForIds();
            }
            List<Long> ids = sqlTemplateDirty.query(sql, new LongMapper(), batchIds.toArray(), types);
            if (ids != null && ids.size() > 0) {
                maxDataId = ids.get(0);
            }
        }
        return maxDataId;
    }

    private long purgeStranded(final Calendar time) {
        log.info("Getting range for stranded data events");
        int maxNumOfDataEventsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
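The new getMaxDataIdEligibleToPurge method above builds its IN clause by replacing the single "?" in the maxDataIdForBatches statement with one placeholder per batch id, using commons-lang's StringUtils.repeat(str, separator, count). Below is a standalone sketch of that expansion, not taken from the commit, which assumes the $(data_event) token has already been resolved to the default sym_ table prefix.

import org.apache.commons.lang.StringUtils;

public class InClausePlaceholderExample {
    public static void main(String[] args) {
        // Template as it would come out of the SQL map, with $(data_event) already resolved.
        String template = "select max(data_id) from sym_data_event where batch_id in (?)";
        int batchCount = 3;
        // StringUtils.repeat("?", ",", 3) produces "?,?,?"
        String expanded = template.replace("?", StringUtils.repeat("?", ",", batchCount));
        System.out.println(expanded);
        // prints: select max(data_id) from sym_data_event where batch_id in (?,?,?)
    }
}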
@@ -56,8 +56,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("minDataId", "select min(data_id) from $(data)");

putSql("maxDataId", "select max(data_id) from $(data_event) where batch_id in (select max(batch_id) from $(outgoing_batch) " +
"where batch_id > ? and create_time < ? group by channel_id)");
putSql("maxBatchIdByChannel", "select max(batch_id) from $(outgoing_batch) where batch_id > ? and create_time < ? group by channel_id");

putSql("maxDataIdForBatches", "select max(data_id) from $(data_event) where batch_id in (?)");

putSql("selectNodesWithStrandedBatches", "select distinct node_id from $(outgoing_batch) " +
"where node_id not in (select node_id from $(node) where sync_enabled = ?) and status != ?");
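Taken together, the SQL map change replaces the old single maxDataId statement (a subselect over $(outgoing_batch) grouped by channel) with a two-step lookup: first find the newest purgeable batch per channel, then take the max data_id referenced by those batches. The following is a hypothetical plain-JDBC sketch of that flow, not SymmetricDS API, with table names assuming the default sym_ prefix.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class MaxDataIdLookupSketch {

    static long maxDataIdEligibleToPurge(Connection con, long lastPurgedBatchId, Timestamp cutoff) throws Exception {
        // Step 1: newest batch per channel created before the retention cutoff.
        List<Long> batchIds = new ArrayList<Long>();
        try (PreparedStatement ps = con.prepareStatement(
                "select max(batch_id) from sym_outgoing_batch where batch_id > ? and create_time < ? group by channel_id")) {
            ps.setLong(1, lastPurgedBatchId);
            ps.setTimestamp(2, cutoff);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    batchIds.add(rs.getLong(1));
                }
            }
        }
        if (batchIds.isEmpty()) {
            return 0;
        }
        // Step 2: expand the IN clause to one placeholder per batch id, then take the max data_id.
        StringBuilder in = new StringBuilder();
        for (int i = 0; i < batchIds.size(); i++) {
            in.append(i == 0 ? "?" : ",?");
        }
        try (PreparedStatement ps = con.prepareStatement(
                "select max(data_id) from sym_data_event where batch_id in (" + in + ")")) {
            for (int i = 0; i < batchIds.size(); i++) {
                ps.setLong(i + 1, batchIds.get(i));
            }
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0;
            }
        }
    }
}

Splitting the statement presumably keeps the per-channel batch lookup separate and lets the data_event scan run against a simple IN list of ids rather than a grouped subselect, though the commit itself does not state the motivation.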
