Skip to content

Commit

Permalink
0003807: Symmetric startup is very slow when there are lots of stagin…
Browse files Browse the repository at this point in the history
…g files on a SAN (3.9)
  • Loading branch information
mmichalek committed Nov 28, 2018
1 parent 5f261e4 commit 37bc426
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 199 deletions.
@@ -1,16 +1,19 @@
package org.jumpmind.symmetric.io.stage;

import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING;

import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.model.BatchId;
import org.jumpmind.symmetric.service.ClusterConstants;

Expand All @@ -23,7 +26,7 @@ public BatchStagingManager(ISymmetricEngine engine, String directory) {
this.engine = engine;
}

protected Map<String, Long> getBiggestBatchIds(List<BatchId> batches) {
protected Map<String, Long> getBiggestBatchIds(Set<BatchId> batches) {
Map<String,Long> biggest = new HashMap<String,Long>();
for (BatchId batchId : batches) {
Long batchNumber = biggest.get(batchId.getNodeId());
Expand All @@ -32,7 +35,7 @@ protected Map<String, Long> getBiggestBatchIds(List<BatchId> batches) {
}
}
return biggest;
}
}

@Override
public long clean(long ttlInMs) {
Expand All @@ -50,17 +53,24 @@ public long clean(long ttlInMs) {
} catch (Exception e) {
// during setup or un-install, it's possible sym_lock table isn't available yet
}

try {
try {
boolean purgeBasedOnTTL = engine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_PURGE_ON_TTL_ENABLED, false);
if (purgeBasedOnTTL) {
return super.clean(ttlInMs);
} else {
synchronized (StagingManager.class) {
return purgeStagingBasedOnDatabaseStatus(ttlInMs);
}
}
} finally {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000);
Set<Long> outgoingBatches = ttlInMs == 0 ? new HashSet<Long>() : new HashSet<Long>(engine.getOutgoingBatchService().getAllBatches());
Set<BatchId> incomingBatches = ttlInMs == 0 ? new HashSet<BatchId>() : new HashSet<BatchId>(engine.getIncomingBatchService().getAllBatches());
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);

StagingPurgeContext context = new StagingPurgeContext();
context.putContextValue("purgeBasedOnTTL", purgeBasedOnTTL);
context.putContextValue("recordIncomingBatchesEnabled", recordIncomingBatchesEnabled);
context.putContextValue("minTtlInMs", minTtlInMs);
context.putContextValue("outgoingBatches", outgoingBatches);
context.putContextValue("incomingBatches", incomingBatches);
context.putContextValue("biggestIncomingByNode", biggestIncomingByNode);

return super.clean(ttlInMs, context);
} finally {
if (isLockAcquired) {
try {
engine.getClusterService().unlock(ClusterConstants.STAGE_MANAGEMENT);
Expand All @@ -70,73 +80,69 @@ public long clean(long ttlInMs) {
}
}

protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000);
List<Long> outgoingBatches = ttlInMs == 0 ? new ArrayList<Long>() : engine.getOutgoingBatchService().getAllBatches();
List<BatchId> incomingBatches = ttlInMs == 0 ? new ArrayList<BatchId>() : engine.getIncomingBatchService().getAllBatches();
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);
synchronized (StagingManager.class) {
log.trace("Purging staging area");
Set<String> keys = getResourceReferences();
long purgedFileCount = 0;
long purgedFileSize = 0;
for (String key : keys) {
IStagedResource resource = find(key);
String[] path = key.split("/");
/*
* resource could have deleted itself between the time the keys
* were cloned and now
*/
if (resource != null && !resource.isInUse()) {
boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > minTtlInMs;
if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
try {
Long batchId = new Long(path[path.length - 1]);
if (!outgoingBatches.contains(batchId) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
} else if (path[0].equals(STAGING_CATEGORY_INCOMING)) {
try {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
resourceClearsMinTimeHurdle)
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
resource.delete();
}
}
}
}
@Override
protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) {
if (context.getBoolean("purgeBasedOnTTL")) {
return super.shouldCleanPath(resource, ttlInMs, context);
}

if (resource.isInUse() || resource.getState() != State.DONE) {
return false;
}

String[] path = resource.getPath().split("/");

boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > context.getLong("minTtlInMs");

if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld);
} else if (path[0].equals(STAGING_CATEGORY_INCOMING)) {
return shouldCleanIncomingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle);
} else {
log.warn("Unrecognized path: " + resource.getPath());
}
return false;
}

protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path,
boolean resourceIsOld) {
Set<Long> outgoingBatches = (Set<Long>) context.getContextValue("outgoingBatches");
try {
Long batchId = new Long(path[path.length - 1]);
if (!outgoingBatches.contains(batchId) || ttlInMs == 0) {
return true;
}
if (purgedFileCount > 0) {
if (purgedFileSize < 1000) {
log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize));
} else {
log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000));
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
return true;
}
}

return false;
}

protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld,
boolean resourceClearsMinTimeHurdle) {
Set<BatchId> incomingBatches = (Set<BatchId>) context.getContextValue("incomingBatches");
Map<String, Long> biggestIncomingByNode = (Map<String, Long>) context.getContextValue("biggestIncomingByNode");
boolean recordIncomingBatchesEnabled = context.getBoolean("recordIncomingBatchesEnabled");
try {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
resourceClearsMinTimeHurdle)
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
return true;
}
} catch (NumberFormatException e) {
if (resourceIsOld || ttlInMs == 0) {
return true;
}
return purgedFileCount;
}

return false;
}

}
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -205,7 +206,7 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private IExtensionService extensionService;

private Map<String, BatchLock> locks = new HashMap<String, BatchLock>();
private Map<String, BatchLock> locks = new ConcurrentHashMap<String, BatchLock>();

private CustomizableThreadFactory threadPoolFactory;

Expand Down Expand Up @@ -1134,6 +1135,7 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter)
throw new org.jumpmind.exception.InterruptedException(e);
}

log.debug("Acquired {}", lock);
return lock;
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch;
Expand All @@ -40,7 +41,7 @@ public class StagingDataWriter extends AbstractProtocolDataWriter {

private String category;

private Map<Batch, IStagedResource> stagedResources = new HashMap<Batch, IStagedResource>();
private Map<Batch, IStagedResource> stagedResources = new ConcurrentHashMap<Batch, IStagedResource>();

private long memoryThresholdInBytes;

Expand Down
Expand Up @@ -78,15 +78,19 @@ public StagedResource(File directory, String path, StagingManager stagingManager
this.path = path;
this.stagingManager = stagingManager;
lastUpdateTime = System.currentTimeMillis();

File doneFile = buildFile(State.DONE);

if (buildFile(State.DONE).exists()){
this.state = State.DONE;
} else {
this.state = State.CREATE;
}
this.file = buildFile(state);
if (file.exists()) {
if (doneFile.exists()) { // Only call exists once for done files. This can be expensive on some SAN type devices.
this.state = State.DONE;
this.file = doneFile;
lastUpdateTime = file.lastModified();
} else {
this.state = State.CREATE;
this.file = buildFile(state);
if (file.exists()) {
lastUpdateTime = file.lastModified();
}
}
}

Expand Down Expand Up @@ -177,6 +181,15 @@ protected void handleFailedRename(File oldFile, File newFile) {
String msg = null;
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

int tries = 5;

while (!newFile.exists() && tries-- > 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}

if (newFile.exists()) {
if (isSameFile(oldFile, newFile)) {
msg = String.format("Had trouble renaming file. The destination file already exists, and is the same size - will proceed. " +
Expand Down Expand Up @@ -300,7 +313,7 @@ public OutputStream getOutputStream() {
try {
if (outputStream == null) {
if (file != null && file.exists()) {
log.warn("We had to delete {} because it already existed",
log.warn("getOutputStream had to delete {} because it already existed",
file.getAbsolutePath());
file.delete();
}
Expand Down Expand Up @@ -345,7 +358,8 @@ protected InputStream createInputStream() throws FileNotFoundException {
public BufferedWriter getWriter(long threshold) {
if (writer == null) {
if (file != null && file.exists()) {
log.warn("We had to delete {} because it already existed", file.getAbsolutePath());
log.warn("getWriter had to delete {} because it already existed.",
file.getAbsolutePath(), new RuntimeException("Stack Trace"));
file.delete();
} else if (this.memoryBuffer != null) {
log.warn("We had to delete the memory buffer for {} because it already existed", getPath());
Expand Down Expand Up @@ -397,8 +411,7 @@ public boolean delete() {
}

if (deleted) {
stagingManager.resourcePaths.remove(path);
stagingManager.inUse.remove(path);
stagingManager.removeResourcePath(path);
if (log.isDebugEnabled() && path.contains("outgoing")) {
log.debug("Deleted staging resource {}", path);
}
Expand Down
Expand Up @@ -73,10 +73,26 @@ public void setLockFailureMessage(String lockFailureMessage) {
}

public void releaseLock() {
if (lockFile.delete()) {
int retries = 5;

boolean ok = false;

do {
ok = lockFile.delete();
if (!ok) {
try {
Thread.sleep(1000);
} catch (Exception ex) {
// no action.
}
}
} while (!ok && retries-- > 0);

if (ok) {
log.debug("Lock {} released successfully.", lockFile);
} else {
log.warn("Failed to release lock {}", lockFile);
boolean exists = lockFile.exists();
log.warn("Failed to release lock {} exists={}", lockFile, exists);
}
}

Expand Down

0 comments on commit 37bc426

Please sign in to comment.