Skip to content

Commit

Permalink
0001325: Add sym_registration_request to the purge process. Also adde…
Browse files Browse the repository at this point in the history
…d extract_request table
  • Loading branch information
chenson42 committed Jul 16, 2013
1 parent bf219e4 commit 4ab9bc3
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
Expand Up @@ -198,6 +198,8 @@ private ParameterConstants() {
public final static String CLUSTER_LOCK_TIMEOUT_MS = "cluster.lock.timeout.ms";

public final static String PURGE_RETENTION_MINUTES = "purge.retention.minutes";
public final static String PURGE_EXTRACT_REQUESTS_RETENTION_MINUTES = "purge.extract.request.retention.minutes";
public final static String PURGE_REGISTRATION_REQUEST_RETENTION_MINUTES = "purge.registration.request.retention.minutes";
public final static String PURGE_STATS_RETENTION_MINUTES = "purge.stats.retention.minutes";
public final static String PURGE_MAX_NUMBER_OF_DATA_IDS = "job.purge.max.num.data.to.delete.in.tx";
public final static String PURGE_MAX_NUMBER_OF_BATCH_IDS = "job.purge.max.num.batches.to.delete.in.tx";
Expand Down
Expand Up @@ -34,8 +34,10 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.RegistrationRequest;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IParameterService;
Expand Down Expand Up @@ -127,6 +129,7 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) {
rowsPurged += purgeStrandedBatches();
rowsPurged += purgeDataRows(retentionCutoff);
rowsPurged += purgeOutgoingBatch(retentionCutoff);
rowsPurged += purgeExtractRequests();
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.PURGE_OUTGOING);
Expand Down Expand Up @@ -195,6 +198,35 @@ public long[] mapRow(Row rs) {
return minMax;
}

private long purgeExtractRequests() {
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE, -parameterService
.getInt(ParameterConstants.PURGE_EXTRACT_REQUESTS_RETENTION_MINUTES));
log.info("Purging extract requests that are older than {}", retentionCutoff.getTime());
long count = sqlTemplate.update(getSql("deleteExtractRequestSql"),
ExtractRequest.ExtractStatus.OK.name(), retentionCutoff.getTime());
if (count > 0) {
log.info("Purged {} extract requests", count);
}
return count;

}

private long purgeRegistrationRequests() {
Calendar retentionCutoff = Calendar.getInstance();
retentionCutoff.add(Calendar.MINUTE, -parameterService
.getInt(ParameterConstants.PURGE_REGISTRATION_REQUEST_RETENTION_MINUTES));
log.info("Purging registration requests that are older than {}", retentionCutoff.getTime());
long count = sqlTemplate.update(getSql("deleteRegistrationRequestSql"),
RegistrationRequest.RegistrationStatus.OK.name(),
RegistrationRequest.RegistrationStatus.IG.name(),
RegistrationRequest.RegistrationStatus.RR.name(), retentionCutoff.getTime());
if (count > 0) {
log.info("Purged {} registration requests", count);
}
return count;
}

private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retentionTime,
int maxNumtoPurgeinTx) {
long minId = minMax[0];
Expand Down Expand Up @@ -271,6 +303,7 @@ public long purgeIncoming(Calendar retentionCutoff, boolean force) {
log.info("The incoming purge process is about to run");
purgedRowCount = purgeIncomingBatch(retentionCutoff);
purgedRowCount += purgeIncomingError();
purgedRowCount += purgeRegistrationRequests();
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.PURGE_INCOMING);
Expand Down
Expand Up @@ -30,6 +30,10 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
super(platform, replacementTokens);

// @formatter:off

putSql("deleteExtractRequestSql", "delete from $(extract_request) where status=? and last_update_time < ?");

putSql("deleteRegistrationRequestSql", "delete from $(registration_request) where status in (?,?,?) and last_update_time < ?");

putSql("selectOutgoingBatchRangeSql" ,
"select min(batch_id) as min_id, max(batch_id) as max_id from $(outgoing_batch) where " +
Expand Down
12 changes: 12 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -929,6 +929,18 @@ purge.retention.minutes=7200
# Tags: purge
purge.stats.retention.minutes=7200

# This is the retention time for how long a extract request will be retained
#
# DatabaseOverridable: true
# Tags: purge
purge.extract.request.retention.minutes=7200

# This is the retention time for how long a registration request will be retained
#
# DatabaseOverridable: true
# Tags: purge
purge.registration.request.retention.minutes=7200

# If using the HsqlDbDialect, this property indicates whether Symmetric should setup the embedded database properties or if an
# external application will be doing so.
# Tags: other
Expand Down

0 comments on commit 4ab9bc3

Please sign in to comment.