Skip to content

Commit

Permalink
0002888: Remove any staging files that do not associate with a batch in
Browse files Browse the repository at this point in the history
the incoming or outgoing batch table when purged.
  • Loading branch information
jumpmind-josh committed Nov 3, 2016
1 parent 2c7e634 commit 5544de8
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 2 deletions.
Expand Up @@ -331,7 +331,7 @@ protected void init() {
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
statisticManager, extensionService);
statisticManager, extensionService, stagingManager);
this.transformService = new TransformService(parameterService, symmetricDialect,
configurationService, extensionService);
this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect,
Expand Down
Expand Up @@ -33,9 +33,12 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IPurgeListener;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
Expand All @@ -62,12 +65,15 @@ enum MinMaxDeleteSql {

private IExtensionService extensionService;

private IStagingManager stagingManager;

public PurgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect,
IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService) {
IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService, IStagingManager stagingManager) {
super(parameterService, symmetricDialect);
this.clusterService = clusterService;
this.statisticManager = statisticManager;
this.extensionService = extensionService;
this.stagingManager = stagingManager;

setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
Expand All @@ -84,6 +90,15 @@ public long purgeOutgoing(boolean force) {
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeOutgoing(force);
}

List<Long> remainingBatches = sqlTemplate.query(getSql("getAllOutgoingBatches"), new ISqlRowMapper<Long>() {
@Override
public Long mapRow(Row row) {
return row.getLong("batch_id");
}
});
long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_OUTGOING);
log.info("The outgoing purge process removed " + stagingFilesPurged + " outgoing staging files.");
return rowsPurged;
}

Expand All @@ -98,6 +113,16 @@ public long purgeIncoming(boolean force) {
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeIncoming(force);
}

List<Long> remainingBatches = sqlTemplate.query(getSql("getAllIncomingBatches"), new ISqlRowMapper<Long>() {
@Override
public Long mapRow(Row row) {
return row.getLong("batch_id");
}
});
long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_INCOMING);
log.info("The incoming purge process removed " + stagingFilesPurged + " incoming staging files.");

return rowsPurged;
}

Expand Down Expand Up @@ -167,6 +192,8 @@ private long purgeOutgoingBatch(final Calendar time) {
int outgoingbatchPurgedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.OUTGOING_BATCH,
time.getTime(), maxNumOfBatchIdsToPurgeInTx);
statisticManager.incrementPurgedBatchOutgoingRows(outgoingbatchPurgedCount);


return dataEventsPurgedCount + outgoingbatchPurgedCount;
}

Expand Down
Expand Up @@ -113,6 +113,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("deleteDataEventByCreateTimeSql", "delete from sym_data_event where create_time < ?");
putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?");
putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?");

putSql("getAllOutgoingBatches", "select batch_id from $(outgoing_batch)");
putSql("getAllIncomingBatches", "select batch_id from $(incoming_batch)");
}

}
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.io.stage;

import java.util.Collection;
import java.util.List;

public interface IStagingManager {

Expand All @@ -32,6 +33,8 @@ public interface IStagingManager {

public long clean(long timeToLiveInMs);

public long cleanExcessBatches(List<Long> currentBatchesList, String type);

public Collection<String> getResourceReferences();

}
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -33,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class StagingManager implements IStagingManager {

protected static final Logger log = LoggerFactory.getLogger(StagingManager.class);
Expand Down Expand Up @@ -145,6 +147,76 @@ public long clean(long ttlInMs) {
}
}

@Override
public long cleanExcessBatches(List<Long> currentBatchesList, String type) {
log.info("Cleaning staging area for inactive batches");

Set<Long> currentBatches = new HashSet(currentBatchesList);
Set<String> keys = new HashSet<String>(resourceList.keySet());
String common = "common";

long purgedFileCount = 0;
long purgedFileSize = 0;
long purgedMemCount = 0;
long purgedMemSize = 0;

for (String key : keys) {
IStagedResource resource = resourceList.get(key);
if (resource.getPath().contains(type) || resource.getPath().contains(common)) {
String fileName = key.substring(key.lastIndexOf("/") + 1);

try {
Long currentFileBatchId = new Long(fileName);
if(!currentBatches.contains(currentFileBatchId)) {
if (!resource.isInUse()) {
long size = resource.getSize();
if (resource.delete()) {
boolean file = resource.isFileResource();
if (file) {
purgedFileCount++;
purgedFileSize += size;
} else {
purgedMemCount++;
purgedMemSize += size;
}
resourceList.remove(key);
} else {
log.warn("Failed to delete the '{}' staging resource",
resource.getPath());
}
} else {
log.info(
"The '{}' staging resource qualified for being cleaned, but was in use. It will not be cleaned right now",
resource.getPath());
}
}
}
catch (Exception e) {
log.warn("Failed to delete the '{}' staging resource due to unexpected error.", e);
}
}
}
if (purgedFileCount > 0) {
if (purgedFileSize < 1000) {
log.debug("Purged {} staged files, freeing {} bytes of disk space",
purgedFileCount, (int) (purgedFileSize));
} else {
log.debug("Purged {} staged files, freeing {} kbytes of disk space",
purgedFileCount, (int) (purgedFileSize / 1000));
}
}
if (purgedMemCount > 0) {
if (purgedMemSize < 1000) {
log.debug("Purged {} staged memory buffers, freeing {} bytes of memory",
purgedMemCount, (int) (purgedMemSize));
} else {
log.debug("Purged {} staged memory buffers, freeing {} kbytes of memory",
purgedMemCount, (int) (purgedMemSize / 1000));
}
}
return purgedFileCount + purgedMemCount;
}

/**
* Create a handle that can be written to
*/
Expand Down

0 comments on commit 5544de8

Please sign in to comment.