Skip to content

Commit

Permalink
0003514: add outgoing batch first pass, honor retention time on sym_data
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 9, 2018
1 parent 4e6e75e commit 2ad90c1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
Expand Up @@ -53,7 +53,7 @@
public class PurgeService extends AbstractService implements IPurgeService {

enum MinMaxDeleteSql {
DATA, DATA_RANGE, DATA_EVENT, DATA_EVENT_RANGE, OUTGOING_BATCH, STRANDED_DATA, STRANDED_DATA_EVENT
DATA, DATA_RANGE, DATA_EVENT, DATA_EVENT_RANGE, OUTGOING_BATCH, OUTGOING_BATCH_RANGE, STRANDED_DATA, STRANDED_DATA_EVENT
};

private IClusterService clusterService;
Expand Down Expand Up @@ -171,6 +171,7 @@ private long purgeOutgoingBatch(final Calendar time) {
int maxNumOfDataEventsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
int dataEventsPurgedCount = 0;
int outgoingbatchPurgedCount = 0;

if (parameterService.is(ParameterConstants.PURGE_FIRST_PASS)) {
log.info("Getting first batch_id for outstanding batches");
Expand All @@ -183,13 +184,15 @@ private long purgeOutgoingBatch(final Calendar time) {
}
dataEventsPurgedCount = purgeByMinMax(rangeMinMax, minGapStartId, MinMaxDeleteSql.DATA_EVENT_RANGE,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
outgoingbatchPurgedCount = purgeByMinMax(rangeMinMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH_RANGE,
time.getTime(), maxNumOfBatchIdsToPurgeInTx);
}

dataEventsPurgedCount += purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.DATA_EVENT,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
statisticManager.incrementPurgedDataEventRows(dataEventsPurgedCount);

int outgoingbatchPurgedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH,
outgoingbatchPurgedCount += purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH,
time.getTime(), maxNumOfBatchIdsToPurgeInTx);
statisticManager.incrementPurgedBatchOutgoingRows(outgoingbatchPurgedCount);

Expand Down Expand Up @@ -352,8 +355,8 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
break;
case DATA_RANGE:
deleteSql = getSql("deleteDataByRangeSql");
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
args = new Object[] { minId, maxId, cutoffTime };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
break;
case DATA_EVENT:
deleteSql = getSql("deleteDataEventSql");
Expand All @@ -373,6 +376,11 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
maxId };
argTypes = new int[] {Types.VARCHAR, idSqlType, idSqlType, idSqlType, idSqlType};

break;
case OUTGOING_BATCH_RANGE:
deleteSql = getSql("deleteOutgoingBatchByRangeSql");
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
break;
case STRANDED_DATA:
deleteSql = getSql("deleteStrandedData");
Expand Down
Expand Up @@ -130,13 +130,15 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("deleteDataEventByRangeSql", "delete from $(data_event) where batch_id between ? and ?");

putSql("deleteOutgoingBatchByRangeSql", "delete from $(outgoing_batch) where batch_id between ? and ?");

putSql("countOutgoingBatchNotStatusSql",
"select count(*) count from $(outgoing_batch) where status != ?");

putSql("selectDataEventMinNotStatusSql", "select min(data_id) from $(data_event) " +
"where batch_id in (select batch_id from $(outgoing_batch) where status != ?)");

putSql("deleteDataByRangeSql", "delete from $(data) where data_id between ? and ?");
putSql("deleteDataByRangeSql", "delete from $(data) where data_id between ? and ? and create_time < ?");

}

Expand Down

0 comments on commit 2ad90c1

Please sign in to comment.