Skip to content

Commit

Permalink
0004110: Improve performance of purge stranded data
Browse files (browse the repository at this point in the history)
  • Loading branch information
erilong committed Oct 11, 2019
1 parent 30323fb commit bdcd514
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
Expand Up @@ -294,20 +294,20 @@ private long purgeDataRows(final Calendar time) {
return dataDeletedCount;
}

private long purgeStranded(final Calendar time) {
private long purgeStranded(final Calendar time) {
log.info("Getting range for stranded data events");
int maxNumOfDataEventsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
int maxNumOfDataEventsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
long minGapStartId = sqlTemplateDirty.queryForLong(getSql("minDataGapStartId"));
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[] { time.getTime() });
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[0]);
int strandedEventDeletedCount = purgeByMinMax(minMaxEvent, minGapStartId, MinMaxDeleteSql.STRANDED_DATA_EVENT,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
statisticManager.incrementPurgedDataEventRows(strandedEventDeletedCount);

log.info("Getting range for stranded data");
int maxNumOfDataIdsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]);
int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long minDataId = sqlTemplateDirty.queryForLong(getSql("selectDataMinSql"));
long minDataEventId = sqlTemplateDirty.queryForLong(getSql("selectDataEventMinSql"));
long[] minMax = new long[] { minDataId, Math.min(minDataEventId, minGapStartId)-1 };
int strandedDeletedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.STRANDED_DATA,
time.getTime(), maxNumOfDataIdsToPurgeInTx);
statisticManager.incrementPurgedDataRows(strandedDeletedCount);
Expand Down Expand Up @@ -395,6 +395,7 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
idSqlType, idSqlType, idSqlType, idSqlType, Types.VARCHAR};
break;
case DATA_RANGE:
case STRANDED_DATA:
deleteSql = getSql("deleteDataByRangeSql");
args = new Object[] { minId, maxId, cutoffTime };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
Expand Down Expand Up @@ -423,15 +424,10 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
break;
case STRANDED_DATA:
deleteSql = getSql("deleteStrandedData");
args = new Object[] { minId, maxId, minGapStartId, cutoffTime, minId, maxId };
argTypes = new int[] { idSqlType, idSqlType, idSqlType, Types.TIMESTAMP, idSqlType, idSqlType};
break;
case STRANDED_DATA_EVENT:
deleteSql = getSql("deleteStrandedDataEvent");
args = new Object[] { minId, maxId, cutoffTime, minId, maxId };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP, idSqlType, idSqlType };
args = new Object[] { minId, maxId, cutoffTime };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
break;
}

Expand All @@ -440,11 +436,15 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
log.debug("Deleted {} rows", count);
totalCount += count;

if (totalCount > 0
&& (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5)) {
if (count == 0 && (identifier == MinMaxDeleteSql.STRANDED_DATA || identifier == MinMaxDeleteSql.STRANDED_DATA_EVENT)) {
break;
}

if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5) {
log.info("Purged {} of {} rows so far using {} statements", new Object[] {
totalCount, identifier.toString().toLowerCase(), totalDeleteStmts });
ts = System.currentTimeMillis();
clusterService.refreshLock(ClusterConstants.PURGE_OUTGOING);
}
minId = maxId + 1;
}
Expand Down
Expand Up @@ -67,13 +67,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("updateStrandedBatchesByChannel", "update $(outgoing_batch) set status=? where channel_id=? and status != ?");

putSql("deleteStrandedData" ,
"delete from $(data) where " +
" data_id between ? and ? and " +
" data_id < ? and " +
" create_time < ? and " +
" data_id not in (select e.data_id from $(data_event) e where " +
" e.data_id between ? and ?) " );
putSql("selectDataEventMinSql", "select min(data_id) from $(data_event)");

putSql("selectDataMinSql", "select min(data_id) from $(data)");

putSql("deleteDataSql" ,
"delete from $(data) where " +
Expand Down Expand Up @@ -122,14 +118,12 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("selectStrandedDataEventRangeSql" ,
"select min(batch_id) as min_id, max(batch_id)+1 as max_id from $(data_event) " +
"where create_time < ? " +
"and batch_id < (select min(batch_id) from $(outgoing_batch))");
"where batch_id < (select min(batch_id) from $(outgoing_batch))");

putSql("deleteStrandedDataEvent",
"delete from $(data_event) " +
"where batch_id between ? and ? " +
"and create_time < ? " +
"and batch_id not in (select batch_id from $(outgoing_batch) where batch_id between ? and ?)");
"and create_time < ? ");

putSql("minOutgoingBatchNotStatusSql",
"select min(batch_id) from $(outgoing_batch) where status != ?");
Expand Down

0 comments on commit bdcd514

Please sign in to comment.