From 0fe37566afec44670b2b1256ef1afa6ff121f69c Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Thu, 1 Dec 2016 14:23:33 -0500 Subject: [PATCH] 0002928: Staging purge process fails to clean registration batch. Purge staging hourly based on absence of batch in batch tables --- .../symmetric/ClientSymmetricEngine.java | 4 +- .../symmetric/AbstractSymmetricEngine.java | 2 +- .../symmetric/io/DbCompareDiffWriter.java | 1 - .../symmetric/io/DbCompareReport.java | 1 - .../symmetric/io/DbValueComparator.java | 4 +- .../io/stage/BatchStagingManager.java | 86 +++++++++++++++++++ .../org/jumpmind/symmetric/model/BatchId.java | 30 +++++++ .../service/IIncomingBatchService.java | 2 + .../service/IOutgoingBatchService.java | 2 + .../service/impl/AbstractService.java | 8 +- .../service/impl/IncomingBatchService.java | 12 ++- .../impl/IncomingBatchServiceSqlMap.java | 2 + .../service/impl/OutgoingBatchService.java | 7 ++ .../impl/OutgoingBatchServiceSqlMap.java | 2 + .../symmetric/service/impl/PurgeService.java | 29 +------ .../service/impl/PurgeServiceSqlMap.java | 2 - .../resources/symmetric-default.properties | 2 +- .../main/java/org/jumpmind/db/sql/Row.java | 14 +++ .../jumpmind/db/sql/mapper/LongMapper.java | 31 +++++++ .../symmetric/io/stage/IStagingManager.java | 3 - .../symmetric/io/stage/StagingManager.java | 71 --------------- 21 files changed, 199 insertions(+), 116 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java create mode 100644 symmetric-db/src/main/java/org/jumpmind/db/sql/mapper/LongMapper.java diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java index cfe9895fba..8ab1149e17 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java @@ -53,8 +53,8 @@ import org.jumpmind.symmetric.common.SystemConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.db.JdbcSymmetricDialectFactory; +import org.jumpmind.symmetric.io.stage.BatchStagingManager; import org.jumpmind.symmetric.io.stage.IStagingManager; -import org.jumpmind.symmetric.io.stage.StagingManager; import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.job.JobManager; import org.jumpmind.symmetric.service.IExtensionService; @@ -349,7 +349,7 @@ protected IJobManager createJobManager() { @Override protected IStagingManager createStagingManager() { String directory = parameterService.getTempDirectory(); - return new StagingManager(directory); + return new BatchStagingManager(this, directory); } protected static void waitForAvailableDatabase(DataSource dataSource) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 4542d739c9..6ab0e261b2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -331,7 +331,7 @@ protected void init() { this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService, statisticManager); this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService, - statisticManager, extensionService, stagingManager); + statisticManager, extensionService); this.transformService = new TransformService(parameterService, symmetricDialect, configurationService, extensionService); this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java index 1a18e087fa..7f69d1b69f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java @@ -47,7 +47,6 @@ public DbCompareDiffWriter(ISymmetricEngine targetEngine, DbCompareTables tables private ISymmetricEngine targetEngine; private DbCompareTables tables; - private String fileName; private OutputStream stream; public void writeDelete(DbCompareRow targetCompareRow) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareReport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareReport.java index ce66f1ccf2..cbfcd4f3c6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareReport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareReport.java @@ -20,7 +20,6 @@ */ package org.jumpmind.symmetric.io; -import java.io.OutputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.List; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbValueComparator.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbValueComparator.java index 0bf95efae0..6acb1877ce 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbValueComparator.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbValueComparator.java @@ -34,7 +34,6 @@ import org.jumpmind.db.model.Column; import org.jumpmind.db.model.TypeMap; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +59,6 @@ protected void initDateFormats() { dateFormats.add(new SimpleDateFormat("MM-dd-yyyy HH:mm:ss.S")); } - @SuppressWarnings("unchecked") public int compareValues(Column sourceColumn, Column targetColumn, String sourceValue, String targetValue) { if (sourceValue == null && targetValue == null) { @@ -167,7 +165,7 @@ protected int compareDefault(Column sourceColumn, Column targetColumn, Object so } if (sourceValue instanceof Comparable) { - return ((Comparable)sourceValue).compareTo(targetValue); + return ((Comparable)sourceValue).compareTo(targetValue); } else if (sourceValue instanceof String) { return ((String)sourceValue).compareTo((String)targetValue); } else { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java new file mode 100644 index 0000000000..d1e7edf2d8 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java @@ -0,0 +1,86 @@ +package org.jumpmind.symmetric.io.stage; + +import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING; +import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.model.BatchId; + +public class BatchStagingManager extends StagingManager { + + ISymmetricEngine engine; + + public BatchStagingManager(ISymmetricEngine engine, String directory) { + super(directory); + this.engine = engine; + } + + @Override + public long clean(long ttlInMs) { + boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled(); + List outgoingBatches = engine.getOutgoingBatchService().getAllBatches(); + List incomingBatches = engine.getIncomingBatchService().getAllBatches(); + synchronized (StagingManager.class) { + log.trace("Purging staging area"); + Set keys = new HashSet(resourceList.keySet()); + long purgedFileCount = 0; + long purgedFileSize = 0; + for (String key : keys) { + IStagedResource resource = resourceList.get(key); + String[] path = key.split("/"); + /* + * resource could have deleted itself between the time the keys + * were cloned and now + */ + if (resource != null) { + boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; + if (path[0].equals(STAGING_CATEGORY_OUTGOING)) { + try { + Long batchId = new Long(path[path.length - 1]); + if (!outgoingBatches.contains(batchId) || ttlInMs == 0) { + purgedFileCount++; + purgedFileSize+=resource.getSize(); + resource.delete(); + } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + purgedFileCount++; + purgedFileSize+=resource.getSize(); + resource.delete(); + } + } + } else if (path[0].equals(STAGING_CATEGORY_INCOMING)) { + try { + BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]); + if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId)) + || (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) { + purgedFileCount++; + purgedFileSize+=resource.getSize(); + resource.delete(); + } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + purgedFileCount++; + purgedFileSize+=resource.getSize(); + resource.delete(); + } + } + } + } + } + if (purgedFileCount > 0) { + if (purgedFileSize < 1000) { + log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize)); + } else { + log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000)); + } + } + return purgedFileCount; + } + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchId.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchId.java index 5a7642c48f..a00131874b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchId.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchId.java @@ -59,4 +59,34 @@ public String toString() { return nodeId + "-" + batchId; } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (batchId ^ (batchId >>> 32)); + result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BatchId other = (BatchId) obj; + if (batchId != other.batchId) + return false; + if (nodeId == null) { + if (other.nodeId != null) + return false; + } else if (!nodeId.equals(other.nodeId)) + return false; + return true; + } + + + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java index d1a1ffc4d5..515c4ebd3c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java @@ -74,4 +74,6 @@ public List listIncomingBatches(List nodeIds, List findMaxBatchIdsByChannel(); + public List getAllBatches(); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index d9b9d53e69..6533634bcb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -112,5 +112,7 @@ public List listOutgoingBatches(List nodeIds, List> getLoadStatusSummarySql(long loadId); public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId); + + public List getAllBatches(); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java index 12e73146eb..cfe221d91e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java @@ -213,26 +213,26 @@ protected String buildBatchWhere(List nodeIds, List channels, StringBuilder where = new StringBuilder(); boolean needsAnd = false; - if (nodeIds.size() > 0) { + if (nodeIds != null && nodeIds.size() > 0) { where.append("node_id in (:NODES)"); needsAnd = true; } - if (channels.size() > 0) { + if (channels != null && channels.size() > 0) { if (needsAnd) { where.append(" and "); } where.append("channel_id in (:CHANNELS)"); needsAnd = true; } - if (loads.size() > 0) { + if (loads != null && loads.size() > 0) { if (needsAnd) { where.append(" and "); } where.append("load_id in (:LOADS)"); needsAnd = true; } - if (statuses.size() > 0) { + if (statuses != null && statuses.size() > 0) { if (needsAnd) { where.append(" and "); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 72587cbf31..f063d04953 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -362,9 +362,17 @@ public Map findMaxBatchIdsByChannel() { IncomingBatch.Status.OK.name()); return ids; } + + @Override + public List getAllBatches() { + return sqlTemplateDirty.query(getSql("getAllBatchesSql"), new BatchIdMapper()); + } class BatchIdMapper implements ISqlRowMapper { Map ids; + + public BatchIdMapper() { + } public BatchIdMapper(Map ids) { this.ids = ids; @@ -374,7 +382,9 @@ public BatchId mapRow(Row rs) { BatchId batch = new BatchId(); batch.setBatchId(rs.getLong("batch_id")); batch.setNodeId(rs.getString("node_id")); - ids.put(rs.getString("channel_id"), batch); + if (ids != null) { + ids.put(rs.getString("channel_id"), batch); + } return batch; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java index fdf46185c6..40f2fe0769 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java @@ -75,6 +75,8 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map getAllBatches() { + return sqlTemplateDirty.query(getSql("getAllBatchesSql"), new LongMapper()); + } + class OutgoingBatchSummaryMapper implements ISqlRowMapper { public OutgoingBatchSummary mapRow(Row rs) { OutgoingBatchSummary summary = new OutgoingBatchSummary(); @@ -862,5 +868,6 @@ public OutgoingBatch mapRow(Row rs) { } } } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 90c5d50476..3897b9774e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -174,6 +174,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " (select batch_id, ?, channel_id, 'NE', load_id, extract_job_flag, load_flag, common_flag, reload_event_count, other_event_count, " + " last_update_hostname, current_timestamp, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) "); + + putSql("getAllBatchesSql", "select batch_id from $(outgoing_batch)"); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index adb01b8a88..bad28ddbd8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -33,11 +33,9 @@ import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; -import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IPurgeListener; -import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.model.ExtractRequest; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.OutgoingBatch; @@ -64,15 +62,12 @@ enum MinMaxDeleteSql { private IExtensionService extensionService; - private IStagingManager stagingManager; - public PurgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect, - IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService, IStagingManager stagingManager) { + IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService) { super(parameterService, symmetricDialect); this.clusterService = clusterService; this.statisticManager = statisticManager; this.extensionService = extensionService; - this.stagingManager = stagingManager; setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); @@ -88,16 +83,7 @@ public long purgeOutgoing(boolean force) { List purgeListeners = extensionService.getExtensionPointList(IPurgeListener.class); for (IPurgeListener purgeListener : purgeListeners) { rowsPurged += purgeListener.purgeOutgoing(force); - } - - List remainingBatches = sqlTemplate.query(getSql("getAllOutgoingBatches"), new ISqlRowMapper() { - @Override - public Long mapRow(Row row) { - return row.getLong("batch_id"); - } - }); - long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_OUTGOING); - log.info("The outgoing purge process removed " + stagingFilesPurged + " outgoing staging files."); + } return rowsPurged; } @@ -112,16 +98,7 @@ public long purgeIncoming(boolean force) { for (IPurgeListener purgeListener : purgeListeners) { rowsPurged += purgeListener.purgeIncoming(force); } - - List remainingBatches = sqlTemplate.query(getSql("getAllIncomingBatches"), new ISqlRowMapper() { - @Override - public Long mapRow(Row row) { - return row.getLong("batch_id"); - } - }); - long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_INCOMING); - log.info("The incoming purge process removed " + stagingFilesPurged + " incoming staging files."); - + return rowsPurged; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index a6cf67c064..d1f1e0b4b0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -114,8 +114,6 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?"); putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?"); - putSql("getAllOutgoingBatches", "select batch_id from $(outgoing_batch)"); - putSql("getAllIncomingBatches", "select batch_id from $(incoming_batch)"); } } \ No newline at end of file diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index e347bb0cac..b111794e1b 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -780,7 +780,7 @@ job.refresh.cache.cron=0/30 * * * * * # # DatabaseOverridable: true # Tags: jobs -job.stage.management.period.time.ms=900000 +job.stage.management.cron=0 0 * * * * # This is how often the initial load extract queue job will run in the background # diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java index 1958c8995c..33f456d15f 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java @@ -102,6 +102,20 @@ public Date dateValue() { return null; } } + + public Long longValue() { + Object obj = this.values().iterator().next(); + if (obj != null) { + if (obj instanceof Long) { + return (Long)obj; + } else { + return new Long(obj.toString()); + } + } else { + return null; + } + } + public String stringValue() { Object obj = this.values().iterator().next(); diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/mapper/LongMapper.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/mapper/LongMapper.java new file mode 100644 index 0000000000..cd968e20c3 --- /dev/null +++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/mapper/LongMapper.java @@ -0,0 +1,31 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.db.sql.mapper; + +import org.jumpmind.db.sql.ISqlRowMapper; +import org.jumpmind.db.sql.Row; + +public class LongMapper implements ISqlRowMapper { + + public Long mapRow(Row row) { + return row.longValue(); + } +} diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java index 26bf0d4a79..ac9fdf8c7f 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java @@ -21,7 +21,6 @@ package org.jumpmind.symmetric.io.stage; import java.util.Collection; -import java.util.List; public interface IStagingManager { @@ -33,8 +32,6 @@ public interface IStagingManager { public long clean(long timeToLiveInMs); - public long cleanExcessBatches(List currentBatchesList, String type); - public Collection getResourceReferences(); } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index 395a1405ed..ca59fd7941 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -23,7 +23,6 @@ import java.io.File; import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -146,76 +145,6 @@ public long clean(long ttlInMs) { return purgedFileCount + purgedMemCount; } } - - @Override - public long cleanExcessBatches(List currentBatchesList, String type) { - log.info("Cleaning staging area for inactive batches"); - - Set currentBatches = new HashSet(currentBatchesList); - Set keys = new HashSet(resourceList.keySet()); - String common = "common"; - - long purgedFileCount = 0; - long purgedFileSize = 0; - long purgedMemCount = 0; - long purgedMemSize = 0; - - for (String key : keys) { - IStagedResource resource = resourceList.get(key); - if (resource.getPath().contains(type) || resource.getPath().contains(common)) { - String fileName = key.substring(key.lastIndexOf("/") + 1); - - try { - Long currentFileBatchId = new Long(fileName); - if(!currentBatches.contains(currentFileBatchId)) { - if (!resource.isInUse()) { - long size = resource.getSize(); - if (resource.delete()) { - boolean file = resource.isFileResource(); - if (file) { - purgedFileCount++; - purgedFileSize += size; - } else { - purgedMemCount++; - purgedMemSize += size; - } - resourceList.remove(key); - } else { - log.warn("Failed to delete the '{}' staging resource", - resource.getPath()); - } - } else { - log.info( - "The '{}' staging resource qualified for being cleaned, but was in use. It will not be cleaned right now", - resource.getPath()); - } - } - } - catch (Exception e) { - log.warn("Failed to delete the '{}' staging resource due to unexpected error.", e); - } - } - } - if (purgedFileCount > 0) { - if (purgedFileSize < 1000) { - log.debug("Purged {} staged files, freeing {} bytes of disk space", - purgedFileCount, (int) (purgedFileSize)); - } else { - log.debug("Purged {} staged files, freeing {} kbytes of disk space", - purgedFileCount, (int) (purgedFileSize / 1000)); - } - } - if (purgedMemCount > 0) { - if (purgedMemSize < 1000) { - log.debug("Purged {} staged memory buffers, freeing {} bytes of memory", - purgedMemCount, (int) (purgedMemSize)); - } else { - log.debug("Purged {} staged memory buffers, freeing {} kbytes of memory", - purgedMemCount, (int) (purgedMemSize / 1000)); - } - } - return purgedFileCount + purgedMemCount; - } /** * Create a handle that can be written to