Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
mmichalek committed Jul 11, 2016
2 parents 0400095 + 8b799ed commit a99713a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Expand Up @@ -322,7 +322,7 @@ protected void init() {
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
statisticManager);
statisticManager, extensionService);
this.transformService = new TransformService(parameterService, symmetricDialect,
configurationService, extensionService);
this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect,
Expand Down
@@ -0,0 +1,10 @@
package org.jumpmind.symmetric.ext;

import org.jumpmind.extension.IExtensionPoint;

public interface IPurgeListener extends IExtensionPoint {

public long purgeOutgoing(boolean force);

public long purgeIncoming(boolean force);
}
Expand Up @@ -35,12 +35,14 @@
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IPurgeListener;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.RegistrationRequest;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
Expand All @@ -58,11 +60,15 @@ enum MinMaxDeleteSql {

private IStatisticManager statisticManager;

private IExtensionService extensionService;

public PurgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect,
IClusterService clusterService, IStatisticManager statisticManager) {
IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService) {
super(parameterService, symmetricDialect);
this.clusterService = clusterService;
this.statisticManager = statisticManager;
this.extensionService = extensionService;

setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}
Expand All @@ -73,6 +79,11 @@ public long purgeOutgoing(boolean force) {
retentionCutoff.add(Calendar.MINUTE,
-parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES));
rowsPurged += purgeOutgoing(retentionCutoff, force);

List<IPurgeListener> purgeListeners = extensionService.getExtensionPointList(IPurgeListener.class);
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeOutgoing(force);
}
return rowsPurged;
}

Expand All @@ -82,6 +93,11 @@ public long purgeIncoming(boolean force) {
retentionCutoff.add(Calendar.MINUTE,
-parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES));
rowsPurged += purgeIncoming(retentionCutoff, force);

List<IPurgeListener> purgeListeners = extensionService.getExtensionPointList(IPurgeListener.class);
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeIncoming(force);
}
return rowsPurged;
}

Expand Down

0 comments on commit a99713a

Please sign in to comment.