Skip to content

Commit

Permalink
0003760: Symmetric startup is very slow when there are lots of stagin…
Browse files Browse the repository at this point in the history
…g files on a SAN
  • Loading branch information
mmichalek committed Nov 27, 2018
1 parent fff337d commit 89ac93f
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 211 deletions.
@@ -1,16 +1,15 @@
package org.jumpmind.symmetric.io.stage;

import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING;
import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING;
import static org.jumpmind.symmetric.common.Constants.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.model.BatchId;
import org.jumpmind.symmetric.service.ClusterConstants;

Expand All @@ -22,106 +21,109 @@ public BatchStagingManager(ISymmetricEngine engine, String directory) {
super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED));
this.engine = engine;
}

/**
 * Finds, for each node id present in the given batches, the largest batch id
 * seen for that node.
 *
 * @param batches the batch identifiers to scan
 * @return a map from node id to the highest batch id found for that node
 */
protected Map<String, Long> getBiggestBatchIds(List<BatchId> batches) {
    Map<String, Long> maxByNode = new HashMap<String, Long>();
    for (BatchId candidate : batches) {
        Long knownMax = maxByNode.get(candidate.getNodeId());
        // A node is "covered" when an equal or larger batch id is already recorded.
        boolean alreadyCovered = knownMax != null && knownMax >= candidate.getBatchId();
        if (!alreadyCovered) {
            maxByNode.put(candidate.getNodeId(), candidate.getBatchId());
        }
    }
    return maxByNode;
}

@Override
public long clean(long ttlInMs) {
if (!engine.getClusterService().lock(ClusterConstants.STAGE_MANAGEMENT)) {
log.debug("Could not get a lock to run stage management");
return 0;
}

try {
try {
boolean purgeBasedOnTTL = engine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_PURGE_ON_TTL_ENABLED, false);
if (purgeBasedOnTTL) {
return super.clean(ttlInMs);
} else {
synchronized (StagingManager.class) {
return purgeStagingBasedOnDatabaseStatus(ttlInMs);
}
}
} finally {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000);
Set<Long> outgoingBatches = ttlInMs == 0 ? new HashSet<Long>() : new HashSet<Long>(engine.getOutgoingBatchService().getAllBatches());
Set<BatchId> incomingBatches = ttlInMs == 0 ? new HashSet<BatchId>() : new HashSet<BatchId>(engine.getIncomingBatchService().getAllBatches());
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);

StagingPurgeContext context = new StagingPurgeContext();
context.putContextValue("purgeBasedOnTTL", purgeBasedOnTTL);
context.putContextValue("recordIncomingBatchesEnabled", recordIncomingBatchesEnabled);
context.putContextValue("minTtlInMs", minTtlInMs);
context.putContextValue("outgoingBatches", outgoingBatches);
context.putContextValue("incomingBatches", incomingBatches);
context.putContextValue("biggestIncomingByNode", biggestIncomingByNode);

return super.clean(ttlInMs, context);
} finally {
engine.getClusterService().unlock(ClusterConstants.STAGE_MANAGEMENT);
}
}

protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000);
List<Long> outgoingBatches = ttlInMs == 0 ? new ArrayList<Long>() : engine.getOutgoingBatchService().getAllBatches();
List<BatchId> incomingBatches = ttlInMs == 0 ? new ArrayList<BatchId>() : engine.getIncomingBatchService().getAllBatches();
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);
synchronized (StagingManager.class) {
log.trace("Purging staging area");
Set<String> keys = getResourceReferences();
long purgedFileCount = 0;
long purgedFileSize = 0;
for (String key : keys) {
IStagedResource resource = find(key);
String[] path = key.split("/");
/*
* resource could have deleted itself between the time the keys
* were cloned and now
*/
if (resource != null && !resource.isInUse()) {
boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > minTtlInMs;
if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
try {
Long batchId = new Long(path[path.length - 1]);
if (!outgoingBatches.contains(batchId) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
} else if (path[0].equals(STAGING_CATEGORY_INCOMING)) {
try {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
resourceClearsMinTimeHurdle)
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
}
}
/**
 * Decides whether a single staged resource may be purged.
 * <p>
 * When the context flags TTL-based purging the decision is delegated to the
 * superclass. Otherwise a resource is only considered once it is not in use
 * and has reached the DONE state; the first path segment then selects the
 * outgoing or incoming rule set. Paths with any other first segment are
 * logged and kept.
 *
 * @param resource the staged resource under consideration
 * @param ttlInMs age in milliseconds beyond which a resource counts as old
 * @param context values collected by the caller for this purge run
 * @return true when the resource should be removed
 */
@Override
protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) {
    if (context.getBoolean("purgeBasedOnTTL")) {
        return super.shouldCleanPath(resource, ttlInMs, context);
    }

    if (resource.isInUse() || resource.getState() != State.DONE) {
        return false;
    }

    String resourcePath = resource.getPath();
    String[] path = resourcePath.split("/");

    // Read the clock once so both thresholds are judged against the same instant.
    long ageInMs = System.currentTimeMillis() - resource.getLastUpdateTime();
    boolean resourceIsOld = ageInMs > ttlInMs;
    boolean resourceClearsMinTimeHurdle = ageInMs > context.getLong("minTtlInMs");

    // split() drops trailing empty strings, so a path such as "/" yields no segments at all.
    String category = path.length > 0 ? path[0] : null;

    if (STAGING_CATEGORY_OUTGOING.equals(category)) {
        return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld);
    }
    if (STAGING_CATEGORY_INCOMING.equals(category)) {
        return shouldCleanIncomingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle);
    }

    log.warn("Unrecognized path: {}", resourcePath);
    return false;
}

/**
 * Applies the purge rule for a resource in the outgoing staging category.
 * <p>
 * The last path segment is read as a batch number. A resource is purged when
 * {@code ttlInMs} is zero or when its batch number is not in the
 * "outgoingBatches" set held by the context. If the last segment is not a
 * number, the resource is purged when {@code ttlInMs} is zero or it is old.
 *
 * @param resource the staged resource (not consulted by this rule)
 * @param ttlInMs time to live in milliseconds; zero forces a purge
 * @param context purge context holding the "outgoingBatches" set
 * @param path the resource path split on "/"
 * @param resourceIsOld whether the resource has outlived {@code ttlInMs}
 * @return true when the resource should be removed
 */
protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path,
        boolean resourceIsOld) {
    // The context stores untyped values; this entry is read back as a Set<Long>.
    @SuppressWarnings("unchecked")
    Set<Long> outgoingBatches = (Set<Long>) context.getContextValue("outgoingBatches");
    boolean purgeEverything = ttlInMs == 0;
    try {
        // Long.valueOf replaces the deprecated Long(String) constructor; it throws the same NumberFormatException.
        Long batchId = Long.valueOf(path[path.length - 1]);
        return purgeEverything || !outgoingBatches.contains(batchId);
    } catch (NumberFormatException e) {
        // The last segment is not a batch number, so fall back to the age of the resource.
        return purgeEverything || resourceIsOld;
    }
}

protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld,
boolean resourceClearsMinTimeHurdle) {
Set<BatchId> incomingBatches = (Set<BatchId>) context.getContextValue("incomingBatches");
Map<String, Long> biggestIncomingByNode = (Map<String, Long>) context.getContextValue("biggestIncomingByNode");
boolean recordIncomingBatchesEnabled = context.getBoolean("recordIncomingBatchesEnabled");
try {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
resourceClearsMinTimeHurdle)
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
return true;
}
if (purgedFileCount > 0) {
if (purgedFileSize < 1000) {
log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize));
} else {
log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000));
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
return true;
}
return purgedFileCount;
}

return false;
}


/**
 * Builds a lookup of the highest batch id per node from the given batches.
 *
 * @param batches the batch identifiers to scan
 * @return a map keyed by node id whose value is the largest batch id found for that node
 */
protected Map<String, Long> getBiggestBatchIds(Set<BatchId> batches) {
    Map<String, Long> highestPerNode = new HashMap<String, Long>();
    for (BatchId entry : batches) {
        Long previous = highestPerNode.get(entry.getNodeId());
        if (previous != null && previous >= entry.getBatchId()) {
            // An equal or larger batch id is already recorded for this node.
            continue;
        }
        highestPerNode.put(entry.getNodeId(), entry.getBatchId());
    }
    return highestPerNode;
}

}
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -189,7 +190,7 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private IClusterService clusterService;

private Map<String, BatchLock> locks = new HashMap<String, BatchLock>();
private Map<String, BatchLock> locks = new ConcurrentHashMap<String, BatchLock>();

private CustomizableThreadFactory threadPoolFactory;

Expand Down Expand Up @@ -986,7 +987,7 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter)
} catch (InterruptedException e) {
throw new org.jumpmind.exception.InterruptedException(e);
}

log.debug("Acquired {}", lock);
return lock;
}

Expand All @@ -1011,7 +1012,7 @@ protected StagingFileLock acquireStagingFileLock(OutgoingBatch batch) {
log.warn("Lock {} in place for {} > about to BREAK the lock.", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
fileLock.breakLock();
} else {
if ((iterations % 10) == 0) {
if ((iterations % 10) == 0 && iterations > 0) {
log.info("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
} else {
log.debug("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
Expand Down Expand Up @@ -1041,9 +1042,11 @@ protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagi
locks.remove(lock.semaphoreKey);
}
lock.release();
log.debug("Released memory {}", lock);
}
if (lock.fileLock != null) {
lock.fileLock.releaseLock();
log.debug("Released file {}", lock);
}
}
}
Expand Down Expand Up @@ -2383,6 +2386,11 @@ public void release() {
private Semaphore inMemoryLock = new Semaphore(1);
StagingFileLock fileLock;
int referenceCount = 0;

/**
 * Describes this lock as its semaphore key, a space, and the inherited
 * {@code toString()} text.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder();
    description.append(semaphoreKey);
    description.append(" ");
    description.append(super.toString());
    return description.toString();
}
}


Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch;
Expand All @@ -40,7 +41,7 @@ public class StagingDataWriter extends AbstractProtocolDataWriter {

private String category;

private Map<Batch, IStagedResource> stagedResources = new HashMap<Batch, IStagedResource>();
private Map<Batch, IStagedResource> stagedResources = new ConcurrentHashMap<Batch, IStagedResource>();

private long memoryThresholdInBytes;

Expand Down
Expand Up @@ -78,15 +78,19 @@ public StagedResource(File directory, String path, StagingManager stagingManager
this.path = path;
this.stagingManager = stagingManager;
lastUpdateTime = System.currentTimeMillis();

File doneFile = buildFile(State.DONE);

if (buildFile(State.DONE).exists()){
this.state = State.DONE;
} else {
this.state = State.CREATE;
}
this.file = buildFile(state);
if (file.exists()) {
if (doneFile.exists()) { // Only call exists once for done files. This can be expensive on some SAN type devices.
this.state = State.DONE;
this.file = doneFile;
lastUpdateTime = file.lastModified();
} else {
this.state = State.CREATE;
this.file = buildFile(state);
if (file.exists()) {
lastUpdateTime = file.lastModified();
}
}
}

Expand Down Expand Up @@ -177,6 +181,15 @@ protected void handleFailedRename(File oldFile, File newFile) {
String msg = null;
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

// Wait up to five one-second intervals for the destination file to become visible.
int tries = 5;

while (!newFile.exists() && tries-- > 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        // Keep the interrupt visible to callers and stop waiting instead of swallowing it.
        Thread.currentThread().interrupt();
        break;
    }
}

if (newFile.exists()) {
if (isSameFile(oldFile, newFile)) {
msg = String.format("Had trouble renaming file. The destination file already exists, and is the same size - will proceed. " +
Expand Down Expand Up @@ -294,7 +307,7 @@ public OutputStream getOutputStream() {
try {
if (outputStream == null) {
if (file != null && file.exists()) {
log.warn("We had to delete {} because it already existed",
log.warn("getOutputStream had to delete {} because it already existed",
file.getAbsolutePath());
file.delete();
}
Expand Down Expand Up @@ -330,7 +343,8 @@ public synchronized InputStream getInputStream() {
public BufferedWriter getWriter(long threshold) {
if (writer == null) {
if (file != null && file.exists()) {
log.warn("We had to delete {} because it already existed", file.getAbsolutePath());
log.warn("getWriter had to delete {} because it already existed.",
file.getAbsolutePath(), new RuntimeException("Stack Trace"));
file.delete();
} else if (this.memoryBuffer != null) {
log.warn("We had to delete the memory buffer for {} because it already existed", getPath());
Expand Down Expand Up @@ -379,8 +393,7 @@ public boolean delete() {
}

if (deleted) {
stagingManager.resourcePaths.remove(path);
stagingManager.inUse.remove(path);
stagingManager.removeResourcePath(path);
if (log.isDebugEnabled() && path.contains("outgoing")) {
log.debug("Deleted staging resource {}", path);
}
Expand Down
Expand Up @@ -73,10 +73,26 @@ public void setLockFailureMessage(String lockFailureMessage) {
}

public void releaseLock() {
if (lockFile.delete()) {
// One initial delete attempt plus up to five retries, one second apart.
int retries = 5;

boolean ok = false;

do {
    ok = lockFile.delete();
    // Only pause when another attempt will follow; sleeping after the last failure just delays the caller.
    if (!ok && retries > 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Keep the interrupt visible to callers and give up retrying.
            Thread.currentThread().interrupt();
            break;
        }
    }
} while (!ok && retries-- > 0);

if (ok) {
log.debug("Lock {} released successfully.", lockFile);
} else {
log.warn("Failed to release lock {}", lockFile);
boolean exists = lockFile.exists();
log.warn("Failed to release lock {} exists={}", lockFile, exists);
}
}

Expand Down

0 comments on commit 89ac93f

Please sign in to comment.