0002928: Staging purge process fails to clean registration batch. Purge staging hourly based on absence of batch in batch tables
chenson42 committed Dec 1, 2016
1 parent e0c61a6 commit 0fe3756
Showing 21 changed files with 199 additions and 116 deletions.
@@ -53,8 +53,8 @@
import org.jumpmind.symmetric.common.SystemConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.JdbcSymmetricDialectFactory;
import org.jumpmind.symmetric.io.stage.BatchStagingManager;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.job.JobManager;
import org.jumpmind.symmetric.service.IExtensionService;
@@ -349,7 +349,7 @@ protected IJobManager createJobManager() {
@Override
protected IStagingManager createStagingManager() {
String directory = parameterService.getTempDirectory();
return new StagingManager(directory);
return new BatchStagingManager(this, directory);
}

protected static void waitForAvailableDatabase(DataSource dataSource) {
@@ -331,7 +331,7 @@ protected void init() {
this.concurrentConnectionManager = new ConcurrentConnectionManager(parameterService,
statisticManager);
this.purgeService = new PurgeService(parameterService, symmetricDialect, clusterService,
statisticManager, extensionService, stagingManager);
statisticManager, extensionService);
this.transformService = new TransformService(parameterService, symmetricDialect,
configurationService, extensionService);
this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect,
@@ -47,7 +47,6 @@ public DbCompareDiffWriter(ISymmetricEngine targetEngine, DbCompareTables tables

private ISymmetricEngine targetEngine;
private DbCompareTables tables;
private String fileName;
private OutputStream stream;

public void writeDelete(DbCompareRow targetCompareRow) {
@@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.io;

import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
@@ -34,7 +34,6 @@
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -60,7 +59,6 @@ protected void initDateFormats() {
dateFormats.add(new SimpleDateFormat("MM-dd-yyyy HH:mm:ss.S"));
}

@SuppressWarnings("unchecked")
public int compareValues(Column sourceColumn, Column targetColumn, String sourceValue, String targetValue) {

if (sourceValue == null && targetValue == null) {
@@ -167,7 +165,7 @@ protected int compareDefault(Column sourceColumn, Column targetColumn, Object so
}

if (sourceValue instanceof Comparable<?>) {
return ((Comparable)sourceValue).compareTo(targetValue);
return ((Comparable<Object>)sourceValue).compareTo(targetValue);
} else if (sourceValue instanceof String) {
return ((String)sourceValue).compareTo((String)targetValue);
} else {
@@ -0,0 +1,86 @@
package org.jumpmind.symmetric.io.stage;

import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING;
import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.model.BatchId;

public class BatchStagingManager extends StagingManager {

ISymmetricEngine engine;

public BatchStagingManager(ISymmetricEngine engine, String directory) {
super(directory);
this.engine = engine;
}

@Override
public long clean(long ttlInMs) {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
List<Long> outgoingBatches = engine.getOutgoingBatchService().getAllBatches();
List<BatchId> incomingBatches = engine.getIncomingBatchService().getAllBatches();
synchronized (StagingManager.class) {
log.trace("Purging staging area");
Set<String> keys = new HashSet<String>(resourceList.keySet());
long purgedFileCount = 0;
long purgedFileSize = 0;
for (String key : keys) {
IStagedResource resource = resourceList.get(key);
String[] path = key.split("/");
/*
* resource could have deleted itself between the time the keys
* were cloned and now
*/
if (resource != null) {
boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
try {
Long batchId = new Long(path[path.length - 1]);
if (!outgoingBatches.contains(batchId) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
} else if (path[0].equals(STAGING_CATEGORY_INCOMING)) {
try {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId))
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
}
}
}
if (purgedFileCount > 0) {
if (purgedFileSize < 1000) {
log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize));
} else {
log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000));
}
}
return purgedFileCount;
}
}

}
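The new BatchStagingManager replaces the age-only purge: a staged file whose batch id is no longer present in the batch tables is deleted outright, and the time-to-live mainly serves as a fallback for files that cannot be matched to a batch. A minimal sketch of how it might be exercised, assuming an already-started engine and an existing staging directory (both parameter names here are illustrative, not part of this commit):

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.io.stage.BatchStagingManager;

public class StagePurgeSketch {

    // Sketch only: "engine" and "stagingDir" are assumed inputs.
    public static long purgeStage(ISymmetricEngine engine, String stagingDir) {
        BatchStagingManager staging = new BatchStagingManager(engine, stagingDir);
        // A file whose batch id is absent from the batch tables is deleted
        // regardless of age (for incoming staging this requires OK batches
        // to be recorded). Files without a parseable batch id fall back to
        // this TTL, and a TTL of 0 purges everything.
        long oneHourMs = 60 * 60 * 1000L;
        return staging.clean(oneHourMs);
    }
}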
@@ -59,4 +59,34 @@ public String toString() {
return nodeId + "-" + batchId;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (batchId ^ (batchId >>> 32));
result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BatchId other = (BatchId) obj;
if (batchId != other.batchId)
return false;
if (nodeId == null) {
if (other.nodeId != null)
return false;
} else if (!nodeId.equals(other.nodeId))
return false;
return true;
}



}
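The value-based equals and hashCode added here are what the new staging purge depends on: BatchStagingManager.clean rebuilds a BatchId from each incoming staging path and tests it against the list loaded from the incoming batch table, so List.contains must match on batch id and node id rather than object identity. A small illustration (the ids are made up):

import java.util.Arrays;
import java.util.List;

import org.jumpmind.symmetric.model.BatchId;

public class BatchIdEqualityCheck {

    public static void main(String[] args) {
        // Hypothetical batch 42 received from node "001".
        List<BatchId> fromBatchTable = Arrays.asList(new BatchId(42L, "001"));
        BatchId fromStagingPath = new BatchId(42L, "001");
        // Prints true only because BatchId overrides equals/hashCode;
        // without them, contains() would compare references and never match.
        System.out.println(fromBatchTable.contains(fromStagingPath));
    }
}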
@@ -74,4 +74,34 @@ public List<IncomingBatch> listIncomingBatches(List<String> nodeIds, List<String

public Map<String,BatchId> findMaxBatchIdsByChannel();

public List<BatchId> getAllBatches();

}
@@ -112,5 +112,7 @@ public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String
public Map<String, Map<String, LoadStatusSummary>> getLoadStatusSummarySql(long loadId);

public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId);

public List<Long> getAllBatches();

}
@@ -213,26 +213,26 @@ protected String buildBatchWhere(List<String> nodeIds, List<String> channels,

StringBuilder where = new StringBuilder();
boolean needsAnd = false;
if (nodeIds.size() > 0) {
if (nodeIds != null && nodeIds.size() > 0) {
where.append("node_id in (:NODES)");
needsAnd = true;
}

if (channels.size() > 0) {
if (channels != null && channels.size() > 0) {
if (needsAnd) {
where.append(" and ");
}
where.append("channel_id in (:CHANNELS)");
needsAnd = true;
}
if (loads.size() > 0) {
if (loads != null && loads.size() > 0) {
if (needsAnd) {
where.append(" and ");
}
where.append("load_id in (:LOADS)");
needsAnd = true;
}
if (statuses.size() > 0) {
if (statuses != null && statuses.size() > 0) {
if (needsAnd) {
where.append(" and ");
}
@@ -362,9 +362,17 @@ public Map<String, BatchId> findMaxBatchIdsByChannel() {
IncomingBatch.Status.OK.name());
return ids;
}

@Override
public List<BatchId> getAllBatches() {
return sqlTemplateDirty.query(getSql("getAllBatchesSql"), new BatchIdMapper());
}

class BatchIdMapper implements ISqlRowMapper<BatchId> {
Map<String, BatchId> ids;

public BatchIdMapper() {
}

public BatchIdMapper(Map<String, BatchId> ids) {
this.ids = ids;
@@ -374,7 +382,9 @@ public BatchId mapRow(Row rs) {
BatchId batch = new BatchId();
batch.setBatchId(rs.getLong("batch_id"));
batch.setNodeId(rs.getString("node_id"));
ids.put(rs.getString("channel_id"), batch);
if (ids != null) {
ids.put(rs.getString("channel_id"), batch);
}
return batch;
}
}
@@ -75,6 +75,8 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String
putSql("deleteIncomingBatchByNodeSql" ,"delete from $(incoming_batch) where node_id = ?");

putSql("maxBatchIdsSql", "select max(batch_id) as batch_id, node_id, channel_id from $(incoming_batch) where status = ? group by node_id, channel_id");

putSql("getAllBatchesSql", "select batch_id, node_id from $(incoming_batch)");
}

}
@@ -37,6 +37,7 @@
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.mapper.LongMapper;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
@@ -787,6 +788,11 @@ public OutgoingLoadSummary mapRow(Row rs) {
return loads;
}

@Override
public List<Long> getAllBatches() {
return sqlTemplateDirty.query(getSql("getAllBatchesSql"), new LongMapper());
}

class OutgoingBatchSummaryMapper implements ISqlRowMapper<OutgoingBatchSummary> {
public OutgoingBatchSummary mapRow(Row rs) {
OutgoingBatchSummary summary = new OutgoingBatchSummary();
@@ -862,5 +868,6 @@ public OutgoingBatch mapRow(Row rs) {
}
}
}


}
@@ -174,6 +174,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " (select batch_id, ?, channel_id, 'NE', load_id, extract_job_flag, load_flag, common_flag, reload_event_count, other_event_count, "
+ " last_update_hostname, current_timestamp, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) ");


putSql("getAllBatchesSql", "select batch_id from $(outgoing_batch)");

}

@@ -33,11 +33,9 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IPurgeListener;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
@@ -64,15 +62,12 @@ enum MinMaxDeleteSql {

private IExtensionService extensionService;

private IStagingManager stagingManager;

public PurgeService(IParameterService parameterService, ISymmetricDialect symmetricDialect,
IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService, IStagingManager stagingManager) {
IClusterService clusterService, IStatisticManager statisticManager, IExtensionService extensionService) {
super(parameterService, symmetricDialect);
this.clusterService = clusterService;
this.statisticManager = statisticManager;
this.extensionService = extensionService;
this.stagingManager = stagingManager;

setSqlMap(new PurgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
@@ -88,16 +83,7 @@ public long purgeOutgoing(boolean force) {
List<IPurgeListener> purgeListeners = extensionService.getExtensionPointList(IPurgeListener.class);
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeOutgoing(force);
}

List<Long> remainingBatches = sqlTemplate.query(getSql("getAllOutgoingBatches"), new ISqlRowMapper<Long>() {
@Override
public Long mapRow(Row row) {
return row.getLong("batch_id");
}
});
long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_OUTGOING);
log.info("The outgoing purge process removed " + stagingFilesPurged + " outgoing staging files.");
}
return rowsPurged;
}

Expand All @@ -112,16 +98,7 @@ public long purgeIncoming(boolean force) {
for (IPurgeListener purgeListener : purgeListeners) {
rowsPurged += purgeListener.purgeIncoming(force);
}

List<Long> remainingBatches = sqlTemplate.query(getSql("getAllIncomingBatches"), new ISqlRowMapper<Long>() {
@Override
public Long mapRow(Row row) {
return row.getLong("batch_id");
}
});
long stagingFilesPurged = stagingManager.cleanExcessBatches(remainingBatches, Constants.STAGING_CATEGORY_INCOMING);
log.info("The incoming purge process removed " + stagingFilesPurged + " incoming staging files.");


return rowsPurged;
}

Expand Down
Expand Up @@ -114,8 +114,6 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?");
putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?");

putSql("getAllOutgoingBatches", "select batch_id from $(outgoing_batch)");
putSql("getAllIncomingBatches", "select batch_id from $(incoming_batch)");
}

}
@@ -780,7 +780,7 @@ job.refresh.cache.cron=0/30 * * * * *
#
# DatabaseOverridable: true
# Tags: jobs
job.stage.management.period.time.ms=900000
job.stage.management.cron=0 0 * * * *

# This is how often the initial load extract queue job will run in the background
#
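The stage management schedule changes from a fixed 15-minute period (900000 ms) to a cron expression. Like job.refresh.cache.cron above it, this is a six-field expression with a leading seconds field, so 0 0 * * * * fires at minute zero of every hour, which is the hourly purge named in the commit message. A sketch of the setting with a hypothetical override (the alternative schedule below is an example, not a shipped default):

# Fields: seconds minutes hours day-of-month month day-of-week
# Default after this commit: top of every hour
job.stage.management.cron=0 0 * * * *
# Hypothetical override: minute 30 of every fourth hour
# job.stage.management.cron=0 30 */4 * * *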
