Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003692: Purge stranded data and batches after channel is deleted
  • Loading branch information
erilong committed Aug 28, 2018
1 parent 4985860 commit fac0fa3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Expand Up @@ -33,6 +33,7 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IPurgeListener;
Expand Down Expand Up @@ -130,7 +131,8 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
rowsPurged += purgeDataRows(retentionCutoff);
rowsPurged += purgeOutgoingBatch(retentionCutoff);
rowsPurged += purgeStranded(retentionCutoff);
rowsPurged += purgeExtractRequests();
rowsPurged += purgeExtractRequests();
rowsPurged += purgeStrandedChannels();
}
} finally {
if (!force) {
Expand Down Expand Up @@ -208,9 +210,34 @@ private long purgeStrandedBatches() {
OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount);
statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount);
}

updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"),
OutgoingBatch.Status.OK.name(), OutgoingBatch.Status.OK.name());
if (updateStrandedBatchesCount > 0) {
log.info("Set the status to {} for {} batches that no longer are associated with valid channels",
OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount);
statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount);
}

return updateStrandedBatchesCount;
}

private long purgeStrandedChannels() {
int rowsPurged = 0;
log.info("Looking for old channels in data");
List<String> channels = sqlTemplateDirty.query(getSql("selectOldChannelsForData"), new StringMapper());
if (channels.size() > 0) {
log.info("Found {} old channels", channels.size());
for (String channelId : channels) {
log.info("Purging data for channel {}", channelId);
rowsPurged += sqlTemplate.update(getSql("deleteDataByChannel"), channelId);
}
statisticManager.incrementPurgedDataRows(rowsPurged);
log.info("Done purging {} rows", rowsPurged);
}
return rowsPurged;
}

private long purgeDataRows(final Calendar time) {
log.info("Getting range for data");
long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]);
Expand Down
Expand Up @@ -61,6 +61,10 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
"update $(outgoing_batch) set status=? where node_id not " +
" in (select node_id from $(node) where sync_enabled=?) and status != ? " );

putSql("updateStrandedBatchesByChannel",
"update $(outgoing_batch) set status=? where channel_id not " +
"in (select channel_id from $(channel)) and status != ?");

putSql("deleteStrandedData" ,
"delete from $(data) where " +
" data_id between ? and ? and " +
Expand Down Expand Up @@ -139,7 +143,11 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
"where batch_id in (select batch_id from $(outgoing_batch) where status != ?)");

putSql("deleteDataByRangeSql", "delete from $(data) where data_id between ? and ? and create_time < ?");

putSql("selectOldChannelsForData", "select distinct channel_id from $(data) where channel_id not in (select channel_id from $(channel))");

putSql("deleteDataByChannel", "delete from $(data) where channel_id = ?");

}

}

0 comments on commit fac0fa3

Please sign in to comment.