diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 9116d6257a..eee051b6d3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -331,7 +331,7 @@ protected void init() { this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService, statisticManager); this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService, - statisticManager, extensionService); + statisticManager, extensionService, stagingManager); this.transformService = new TransformService(parameterService, symmetricDialect, configurationService, extensionService); this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 51eb6d4dc4..78679952c6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -33,9 +33,12 @@ import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IPurgeListener; +import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.io.stage.StagingManager; import org.jumpmind.symmetric.model.ExtractRequest; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.OutgoingBatch; @@ -62,12 +65,15 @@ enum MinMaxDeleteSql { private IExtensionService extensionService; + private IStagingManager stagingManager; + public PurgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect, - IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService) { + IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService, IStagingManager stagingManager) { super(parameterService, symmetricDialect); this.clusterService = clusterService; this.statisticManager = statisticManager; this.extensionService = extensionService; + this.stagingManager = stagingManager; setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); @@ -84,6 +90,15 @@ public long purgeOutgoing(boolean force) { for (IPurgeListener purgeListener : purgeListeners) { rowsPurged += purgeListener.purgeOutgoing(force); } + + List remainingBatches = sqlTemplate.query(getSql("getAllOutgoingBatches"), new ISqlRowMapper() { + @Override + public Long mapRow(Row row) { + return row.getLong("batch_id"); + } + }); + long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_OUTGOING); + log.info("The outgoing purge process removed " + stagingFilesPurged + " outgoing staging files."); return rowsPurged; } @@ -98,6 +113,16 @@ public long purgeIncoming(boolean force) { for (IPurgeListener purgeListener : purgeListeners) { rowsPurged += purgeListener.purgeIncoming(force); } + + List remainingBatches = sqlTemplate.query(getSql("getAllIncomingBatches"), new ISqlRowMapper() { + @Override + public Long mapRow(Row row) { + return row.getLong("batch_id"); + } + }); + long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_INCOMING); + log.info("The incoming purge process removed " + stagingFilesPurged + " incoming staging files."); + return rowsPurged; } @@ -167,6 +192,8 @@ private long purgeOutgoingBatch(final Calendar time) { int outgoingbatchPurgedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH, time.getTime(), maxNumOfBatchIdsToPurgeInTx); statisticManager.incrementPurgedBatchOutgoingRows(outgoingbatchPurgedCount); + + return dataEventsPurgedCount + outgoingbatchPurgedCount; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index c46c24f44f..a6cf67c064 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -113,6 +113,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac putSql("deleteDataEventByCreateTimeSql", "delete from sym_data_event where create_time < ?"); putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?"); putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?"); + + putSql("getAllOutgoingBatches", "select batch_id from $(outgoing_batch)"); + putSql("getAllIncomingBatches", "select batch_id from $(incoming_batch)"); } } \ No newline at end of file diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java index ac9fdf8c7f..26bf0d4a79 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java @@ -21,6 +21,7 @@ package org.jumpmind.symmetric.io.stage; import java.util.Collection; +import java.util.List; public interface IStagingManager { @@ -32,6 +33,8 @@ public interface IStagingManager { public long clean(long timeToLiveInMs); + public long cleanExcessBatches(List currentBatchesList, String type); + public Collection getResourceReferences(); } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index 2c76f40802..395a1405ed 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -23,6 +23,7 @@ import java.io.File; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class StagingManager implements IStagingManager { protected static final Logger log = LoggerFactory.getLogger(StagingManager.class); @@ -145,6 +147,76 @@ public long clean(long ttlInMs) { } } + @Override + public long cleanExcessBatches(List currentBatchesList, String type) { + log.info("Cleaning staging area for inactive batches"); + + Set currentBatches = new HashSet(currentBatchesList); + Set keys = new HashSet(resourceList.keySet()); + String common = "common"; + + long purgedFileCount = 0; + long purgedFileSize = 0; + long purgedMemCount = 0; + long purgedMemSize = 0; + + for (String key : keys) { + IStagedResource resource = resourceList.get(key); + if (resource.getPath().contains(type) || resource.getPath().contains(common)) { + String fileName = key.substring(key.lastIndexOf("/") + 1); + + try { + Long currentFileBatchId = new Long(fileName); + if(!currentBatches.contains(currentFileBatchId)) { + if (!resource.isInUse()) { + long size = resource.getSize(); + if (resource.delete()) { + boolean file = resource.isFileResource(); + if (file) { + purgedFileCount++; + purgedFileSize += size; + } else { + purgedMemCount++; + purgedMemSize += size; + } + resourceList.remove(key); + } else { + log.warn("Failed to delete the '{}' staging resource", + resource.getPath()); + } + } else { + log.info( + "The '{}' staging resource qualified for being cleaned, but was in use. It will not be cleaned right now", + resource.getPath()); + } + } + } + catch (Exception e) { + log.warn("Failed to delete the '{}' staging resource due to unexpected error.", e); + } + } + } + if (purgedFileCount > 0) { + if (purgedFileSize < 1000) { + log.debug("Purged {} staged files, freeing {} bytes of disk space", + purgedFileCount, (int) (purgedFileSize)); + } else { + log.debug("Purged {} staged files, freeing {} kbytes of disk space", + purgedFileCount, (int) (purgedFileSize / 1000)); + } + } + if (purgedMemCount > 0) { + if (purgedMemSize < 1000) { + log.debug("Purged {} staged memory buffers, freeing {} bytes of memory", + purgedMemCount, (int) (purgedMemSize)); + } else { + log.debug("Purged {} staged memory buffers, freeing {} kbytes of memory", + purgedMemCount, (int) (purgedMemSize / 1000)); + } + } + return purgedFileCount + purgedMemCount; + } + /** * Create a handle that can be written to */