Skip to content

Commit

Permalink
0003514: Improve purge performance of sym_data and sym_data_event
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Apr 9, 2018
1 parent bccdeea commit 6dc7ee2
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 10 deletions.
Expand Up @@ -297,7 +297,9 @@ private ParameterConstants() {
public final static String PURGE_MAX_NUMBER_OF_DATA_IDS = "job.purge.max.num.data.to.delete.in.tx";
public final static String PURGE_MAX_NUMBER_OF_BATCH_IDS = "job.purge.max.num.batches.to.delete.in.tx";
public final static String PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS = "job.purge.max.num.data.event.batches.to.delete.in.tx";

public final static String PURGE_FIRST_PASS = "job.purge.first.pass";
public final static String PURGE_FIRST_PASS_OUTSTANDING_BATCHES_THRESHOLD = "job.purge.first.pass.outstanding.batches.threshold";

public final static String JMX_LINE_FEED = "jmx.line.feed";

public final static String IP_FILTERS = "ip.filters";
Expand Down
Expand Up @@ -53,7 +53,7 @@
public class PurgeService extends AbstractService implements IPurgeService {

enum MinMaxDeleteSql {
DATA, DATA_EVENT, OUTGOING_BATCH, STRANDED_DATA, STRANDED_DATA_EVENT
DATA, DATA_RANGE, DATA_EVENT, DATA_EVENT_RANGE, OUTGOING_BATCH, STRANDED_DATA, STRANDED_DATA_EVENT
};

private IClusterService clusterService;
Expand Down Expand Up @@ -163,15 +163,30 @@ private long purgeOutgoingBatch(final Calendar time) {
int maxNumOfBatchIdsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_BATCH_IDS);
int maxNumOfDataEventsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
int dataEventsPurgedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.DATA_EVENT,
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
int dataEventsPurgedCount = 0;

if (parameterService.is(ParameterConstants.PURGE_FIRST_PASS)) {
log.info("Getting first batch_id for outstanding batches");
long minBatchId = sqlTemplateDirty.queryForLong(getSql("minOutgoingBatchNotStatusSql"), OutgoingBatch.Status.OK.name());
long rangeMinMax[] = new long[] { minMax[0], Math.min(minBatchId > 0 ? minBatchId - 1 : minMax[1], minMax[1]) };
if (rangeMinMax[1] == minMax[1]) {
minMax[1] = -1;
} else {
minMax[0] = minBatchId + 1;
}
dataEventsPurgedCount = purgeByMinMax(rangeMinMax, minGapStartId, MinMaxDeleteSql.DATA_EVENT_RANGE,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
}

dataEventsPurgedCount += purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.DATA_EVENT,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
statisticManager.incrementPurgedDataEventRows(dataEventsPurgedCount);

int outgoingbatchPurgedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH,
time.getTime(), maxNumOfBatchIdsToPurgeInTx);
statisticManager.incrementPurgedBatchOutgoingRows(outgoingbatchPurgedCount);


return dataEventsPurgedCount + outgoingbatchPurgedCount;
}

Expand All @@ -191,12 +206,41 @@ private long purgeDataRows(final Calendar time) {
log.info("Getting range for data");
long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]);
long minGapStartId = sqlTemplateDirty.queryForLong(getSql("minDataGapStartId"));
int maxNumOfDataIdsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
int dataDeletedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.DATA, time.getTime(),
int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long dataDeletedCount = 0;

if (parameterService.is(ParameterConstants.PURGE_FIRST_PASS)) {
log.info("Getting count of outstanding batches");
long outstandingCount = sqlTemplateDirty.queryForLong(getSql("countOutgoingBatchNotStatusSql"),
OutgoingBatch.Status.OK.name());
long maxOutstandingCount = parameterService.getLong(ParameterConstants.PURGE_FIRST_PASS_OUTSTANDING_BATCHES_THRESHOLD);
log.info("Found "+ outstandingCount + " outstanding batches, threshold is " + maxOutstandingCount);

if (outstandingCount <= maxOutstandingCount) {
long minDataId = 0;
if (outstandingCount > 0) {
log.info("Getting first data_id for outstanding batches");
minDataId = sqlTemplateDirty.queryForLong(getSql("selectDataEventMinNotStatusSql"),
OutgoingBatch.Status.OK.name());
}
long rangeMinMax[] = new long[] { minMax[0], Math.min(Math.min(minDataId > 0 ? minDataId - 1 : minMax[1],
minMax[1]), minGapStartId - 1) };
if (rangeMinMax[1] == minMax[1]) {
minMax[1] = -1;
} else if (rangeMinMax[1] == minDataId - 1) {
minMax[0] = minDataId + 1;
} else if (rangeMinMax[1] == minGapStartId - 1) {
minMax[0] = minGapStartId + 1;
}
dataDeletedCount = purgeByMinMax(rangeMinMax, minGapStartId, MinMaxDeleteSql.DATA_RANGE,
time.getTime(), maxNumOfDataIdsToPurgeInTx);
}
}

dataDeletedCount += purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.DATA, time.getTime(),
maxNumOfDataIdsToPurgeInTx);
statisticManager.incrementPurgedDataRows(dataDeletedCount);

return dataDeletedCount;
}

Expand Down Expand Up @@ -300,12 +344,22 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP,
idSqlType, idSqlType, idSqlType, idSqlType, Types.VARCHAR};
break;
case DATA_RANGE:
deleteSql = getSql("deleteDataByRangeSql");
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
break;
case DATA_EVENT:
deleteSql = getSql("deleteDataEventSql");
args = new Object[] { minId, maxId, OutgoingBatch.Status.OK.name(), minId,
maxId };
argTypes = new int[] { idSqlType, idSqlType, Types.VARCHAR, idSqlType, idSqlType};

break;
case DATA_EVENT_RANGE:
deleteSql = getSql("deleteDataEventByRangeSql");
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
break;
case OUTGOING_BATCH:
deleteSql = getSql("deleteOutgoingBatchSql");
Expand Down
Expand Up @@ -124,7 +124,20 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
"where batch_id between ? and ? " +
"and create_time < ? " +
"and batch_id not in (select batch_id from $(outgoing_batch) where batch_id between ? and ?)");


putSql("minOutgoingBatchNotStatusSql",
"select min(batch_id) from $(outgoing_batch) where status != ?");

putSql("deleteDataEventByRangeSql", "delete from $(data_event) where batch_id between ? and ?");

putSql("countOutgoingBatchNotStatusSql",
"select count(*) count from $(outgoing_batch) where status != ?");

putSql("selectDataEventMinNotStatusSql", "select min(data_id) from $(data_event) " +
"where batch_id in (select batch_id from $(outgoing_batch) where status != ?)");

putSql("deleteDataByRangeSql", "delete from $(data) where data_id between ? and ?");

}

}
16 changes: 16 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -914,6 +914,22 @@ job.purge.max.num.batches.to.delete.in.tx=5000
# Tags: purge
job.purge.max.num.data.to.delete.in.tx=5000

# Enables a first pass purge for sym_data and sym_data_event that quickly purges
# the beginning of the table that precedes outstanding batches.
# These delete statements don't use joins, so they run quicker.
#
# DatabaseOverridable: true
# Tags: purge
job.purge.first.pass=false

# The maximum number of outstanding batches allowed for running the
# first pass purge. If there are too many outstanding batches, it will
# take too long to find their first data_id, so it shouldn't be run.
#
# DatabaseOverridable: true
# Tags: purge
job.purge.first.pass.outstanding.batches.threshold=1000

# Whether the refresh cache job is enabled for this node.
#
# Tags: jobs
Expand Down

0 comments on commit 6dc7ee2

Please sign in to comment.