Skip to content

Commit

Permalink
0003454: Purge of stranded data_events on PostgreSQL is too slow
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 23, 2018
1 parent a08f071 commit f97048d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
Expand Up @@ -118,6 +118,7 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
rowsPurged += purgeStrandedBatches();
rowsPurged += purgeDataRows(retentionCutoff);
rowsPurged += purgeOutgoingBatch(retentionCutoff);
rowsPurged += purgeStranded(retentionCutoff);
rowsPurged += purgeExtractRequests();
}
} finally {
Expand Down Expand Up @@ -195,17 +196,28 @@ private long purgeDataRows(final Calendar time) {
maxNumOfDataIdsToPurgeInTx);
statisticManager.incrementPurgedDataRows(dataDeletedCount);

return dataDeletedCount;
}

private long purgeStranded(final Calendar time) {
log.info("Getting range for stranded data events");
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[0]);
int maxNumOfDataEventsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
long minGapStartId = sqlTemplate.queryForLong(getSql("minDataGapStartId"));
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[] { time.getTime(),
OutgoingBatch.Status.OK.name() });
int strandedEventDeletedCount = purgeByMinMax(minMaxEvent, minGapStartId, MinMaxDeleteSql.STRANDED_DATA_EVENT,
time.getTime(), maxNumOfDataIdsToPurgeInTx);
time.getTime(), maxNumOfDataEventsToPurgeInTx);
statisticManager.incrementPurgedDataEventRows(strandedEventDeletedCount);

log.info("Getting range for stranded data");
int maxNumOfDataIdsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]);
int strandedDeletedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.STRANDED_DATA,
time.getTime(), maxNumOfDataIdsToPurgeInTx);
statisticManager.incrementPurgedDataRows(strandedDeletedCount);
return dataDeletedCount + strandedDeletedCount;

return strandedEventDeletedCount + strandedDeletedCount;
}

private long[] queryForMinMax(String sql, Object... params) {
Expand Down Expand Up @@ -267,7 +279,7 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
int totalDeleteStmts = 0;
int idSqlType = symmetricDialect.getSqlTypeForIds();
Timestamp cutoffTime = new Timestamp(retentionTime.getTime());
log.info("About to purge {}", identifier.toString().toLowerCase());
log.info("About to purge {} using range {} through {}", identifier.toString().toLowerCase(), minMax[0], minMax[1]);

while (minId <= purgeUpToId) {
totalDeleteStmts++;
Expand Down Expand Up @@ -309,7 +321,7 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
break;
case STRANDED_DATA_EVENT:
deleteSql = getSql("deleteStrandedDataEvent");
args = new Object[] { minId, maxId, cutoffTime };
args = new Object[] { minId, maxId, cutoffTime, minId, maxId };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
break;
}
Expand Down
Expand Up @@ -115,15 +115,16 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("deleteExtractRequestByCreateTimeSql", "delete from $(extract_request) where create_time < ?");

putSql("selectStrandedDataEventRangeSql" ,
"select min(data_id) as min_id, max(data_id) as max_id from $(data_event) " +
"where batch_id not in (select batch_id from $(outgoing_batch))");
"select min(batch_id) as min_id, max(batch_id) as max_id from $(data_event) " +
"where create_time < ? " +
"and batch_id < (select min(batch_id) from $(outgoing_batch) where status != ?)");

putSql("deleteStrandedDataEvent",
"delete from $(data_event) " +
"where data_id between ? and ? " +
"where batch_id between ? and ? " +
"and create_time < ? " +
"and batch_id not in (select batch_id from $(outgoing_batch))");

"and batch_id not in (select batch_id from $(outgoing_batch) where batch_id between ? and ?)");
}

}

0 comments on commit f97048d

Please sign in to comment.