From 89ac93f5724644436572d35b856ca7ee709e5c06 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Tue, 27 Nov 2018 12:53:42 -0500 Subject: [PATCH] 0003760: Symmetric startup is very slow when there are lots of staging files on a SAN --- .../io/stage/BatchStagingManager.java | 182 +++++++------- .../service/impl/DataExtractorService.java | 14 +- .../io/data/writer/StagingDataWriter.java | 3 +- .../symmetric/io/stage/StagedResource.java | 35 ++- .../symmetric/io/stage/StagingFileLock.java | 20 +- .../symmetric/io/stage/StagingManager.java | 227 ++++++++++-------- .../io/stage/StagingPurgeContext.java | 108 +++++++++ 7 files changed, 378 insertions(+), 211 deletions(-) create mode 100644 symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java index 4d9b667614..7bd699d88f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java @@ -1,16 +1,15 @@ package org.jumpmind.symmetric.io.stage; -import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING; -import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING; +import static org.jumpmind.symmetric.common.Constants.*; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.model.BatchId; import org.jumpmind.symmetric.service.ClusterConstants; @@ -22,17 +21,6 @@ public BatchStagingManager(ISymmetricEngine engine, String directory) { super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED)); this.engine = engine; } - - protected Map getBiggestBatchIds(List batches) { - Map biggest = new HashMap(); - for (BatchId batchId : batches) { - Long batchNumber = biggest.get(batchId.getNodeId()); - if (batchNumber == null || batchNumber < batchId.getBatchId()) { - biggest.put(batchId.getNodeId(), batchId.getBatchId()); - } - } - return biggest; - } @Override public long clean(long ttlInMs) { @@ -40,88 +28,102 @@ public long clean(long ttlInMs) { log.debug("Could not get a lock to run stage management"); return 0; } - - try { + try { boolean purgeBasedOnTTL = engine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_PURGE_ON_TTL_ENABLED, false); - if (purgeBasedOnTTL) { - return super.clean(ttlInMs); - } else { - synchronized (StagingManager.class) { - return purgeStagingBasedOnDatabaseStatus(ttlInMs); - } - } - } finally { + boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled(); + long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000); + Set outgoingBatches = ttlInMs == 0 ? new HashSet() : new HashSet(engine.getOutgoingBatchService().getAllBatches()); + Set incomingBatches = ttlInMs == 0 ? new HashSet() : new HashSet(engine.getIncomingBatchService().getAllBatches()); + Map biggestIncomingByNode = getBiggestBatchIds(incomingBatches); + + StagingPurgeContext context = new StagingPurgeContext(); + context.putContextValue("purgeBasedOnTTL", purgeBasedOnTTL); + context.putContextValue("recordIncomingBatchesEnabled", recordIncomingBatchesEnabled); + context.putContextValue("minTtlInMs", minTtlInMs); + context.putContextValue("outgoingBatches", outgoingBatches); + context.putContextValue("incomingBatches", incomingBatches); + context.putContextValue("biggestIncomingByNode", biggestIncomingByNode); + + return super.clean(ttlInMs, context); + } finally { engine.getClusterService().unlock(ClusterConstants.STAGE_MANAGEMENT); } } - protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) { - boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled(); - long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000); - List outgoingBatches = ttlInMs == 0 ? new ArrayList() : engine.getOutgoingBatchService().getAllBatches(); - List incomingBatches = ttlInMs == 0 ? new ArrayList() : engine.getIncomingBatchService().getAllBatches(); - Map biggestIncomingByNode = getBiggestBatchIds(incomingBatches); - synchronized (StagingManager.class) { - log.trace("Purging staging area"); - Set keys = getResourceReferences(); - long purgedFileCount = 0; - long purgedFileSize = 0; - for (String key : keys) { - IStagedResource resource = find(key); - String[] path = key.split("/"); - /* - * resource could have deleted itself between the time the keys - * were cloned and now - */ - if (resource != null && !resource.isInUse()) { - boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; - boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > minTtlInMs; - if (path[0].equals(STAGING_CATEGORY_OUTGOING)) { - try { - Long batchId = new Long(path[path.length - 1]); - if (!outgoingBatches.contains(batchId) || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } catch (NumberFormatException e) { - if (resourceIsOld || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } - } else if (path[0].equals(STAGING_CATEGORY_INCOMING)) { - try { - BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]); - Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId()); - if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) && - biggestBatchId != null && biggestBatchId > batchId.getBatchId() && - resourceClearsMinTimeHurdle) - || (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } catch (NumberFormatException e) { - if (resourceIsOld || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } - } - } + @Override + protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + if (context.getBoolean("purgeBasedOnTTL")) { + return super.shouldCleanPath(resource, ttlInMs, context); + } + + if (resource.isInUse() || resource.getState() != State.DONE) { + return false; + } + + String[] path = resource.getPath().split("/"); + + boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; + boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > context.getLong("minTtlInMs"); + + if (path[0].equals(STAGING_CATEGORY_OUTGOING)) { + return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld); + } else if (path[0].equals(STAGING_CATEGORY_INCOMING)) { + return shouldCleanIncomingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle); + } else { + log.warn("Unrecognized path: " + resource.getPath()); + } + return false; + } + + protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, + boolean resourceIsOld) { + Set outgoingBatches = (Set) context.getContextValue("outgoingBatches"); + try { + Long batchId = new Long(path[path.length - 1]); + if (!outgoingBatches.contains(batchId) || ttlInMs == 0) { + return true; + } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + return true; + } + } + + return false; + } + + protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld, + boolean resourceClearsMinTimeHurdle) { + Set incomingBatches = (Set) context.getContextValue("incomingBatches"); + Map biggestIncomingByNode = (Map) context.getContextValue("biggestIncomingByNode"); + boolean recordIncomingBatchesEnabled = context.getBoolean("recordIncomingBatchesEnabled"); + try { + BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]); + Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId()); + if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) && + biggestBatchId != null && biggestBatchId > batchId.getBatchId() && + resourceClearsMinTimeHurdle) + || (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) { + return true; } - if (purgedFileCount > 0) { - if (purgedFileSize < 1000) { - log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize)); - } else { - log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000)); - } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + return true; } - return purgedFileCount; } + + return false; } - + + protected Map getBiggestBatchIds(Set batches) { + Map biggest = new HashMap(); + for (BatchId batchId : batches) { + Long batchNumber = biggest.get(batchId.getNodeId()); + if (batchNumber == null || batchNumber < batchId.getBatchId()) { + biggest.put(batchId.getNodeId(), batchId.getBatchId()); + } + } + return biggest; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index b831493405..762c21872c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -43,6 +43,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -189,7 +190,7 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY }; private IClusterService clusterService; - private Map locks = new HashMap(); + private Map locks = new ConcurrentHashMap(); private CustomizableThreadFactory threadPoolFactory; @@ -986,7 +987,7 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter) } catch (InterruptedException e) { throw new org.jumpmind.exception.InterruptedException(e); } - + log.debug("Acquired {}", lock); return lock; } @@ -1011,7 +1012,7 @@ protected StagingFileLock acquireStagingFileLock(OutgoingBatch batch) { log.warn("Lock {} in place for {} > about to BREAK the lock.", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); fileLock.breakLock(); } else { - if ((iterations % 10) == 0) { + if ((iterations % 10) == 0 && iterations > 0) { log.info("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); } else { log.debug("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true)); @@ -1041,9 +1042,11 @@ protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagi locks.remove(lock.semaphoreKey); } lock.release(); + log.debug("Released memory {}", lock); } if (lock.fileLock != null) { lock.fileLock.releaseLock(); + log.debug("Released file {}", lock); } } } @@ -2383,6 +2386,11 @@ public void release() { private Semaphore inMemoryLock = new Semaphore(1); StagingFileLock fileLock; int referenceCount = 0; + + @Override + public String toString() { + return semaphoreKey + " " + super.toString(); + } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java index 5e09e8dcd3..3620c87f65 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.io.data.Batch; @@ -40,7 +41,7 @@ public class StagingDataWriter extends AbstractProtocolDataWriter { private String category; - private Map stagedResources = new HashMap(); + private Map stagedResources = new ConcurrentHashMap(); private long memoryThresholdInBytes; diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 8d568e5fc7..951a015d36 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -78,15 +78,19 @@ public StagedResource(File directory, String path, StagingManager stagingManager this.path = path; this.stagingManager = stagingManager; lastUpdateTime = System.currentTimeMillis(); + + File doneFile = buildFile(State.DONE); - if (buildFile(State.DONE).exists()){ - this.state = State.DONE; - } else { - this.state = State.CREATE; - } - this.file = buildFile(state); - if (file.exists()) { + if (doneFile.exists()) { // Only call exists once for done files. This can be expensive on some SAN type devices. + this.state = State.DONE; + this.file = doneFile; lastUpdateTime = file.lastModified(); + } else { + this.state = State.CREATE; + this.file = buildFile(state); + if (file.exists()) { + lastUpdateTime = file.lastModified(); + } } } @@ -177,6 +181,15 @@ protected void handleFailedRename(File oldFile, File newFile) { String msg = null; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"); + int tries = 5; + + while (!newFile.exists() && tries-- > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + } + } + if (newFile.exists()) { if (isSameFile(oldFile, newFile)) { msg = String.format("Had trouble renaming file. The destination file already exists, and is the same size - will proceed. " + @@ -294,7 +307,7 @@ public OutputStream getOutputStream() { try { if (outputStream == null) { if (file != null && file.exists()) { - log.warn("We had to delete {} because it already existed", + log.warn("getOutputStream had to delete {} because it already existed", file.getAbsolutePath()); file.delete(); } @@ -330,7 +343,8 @@ public synchronized InputStream getInputStream() { public BufferedWriter getWriter(long threshold) { if (writer == null) { if (file != null && file.exists()) { - log.warn("We had to delete {} because it already existed", file.getAbsolutePath()); + log.warn("getWriter had to delete {} because it already existed.", + file.getAbsolutePath(), new RuntimeException("Stack Trace")); file.delete(); } else if (this.memoryBuffer != null) { log.warn("We had to delete the memory buffer for {} because it already existed", getPath()); @@ -379,8 +393,7 @@ public boolean delete() { } if (deleted) { - stagingManager.resourcePaths.remove(path); - stagingManager.inUse.remove(path); + stagingManager.removeResourcePath(path); if (log.isDebugEnabled() && path.contains("outgoing")) { log.debug("Deleted staging resource {}", path); } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java index 716d942d8b..b691d7bba4 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java @@ -73,10 +73,26 @@ public void setLockFailureMessage(String lockFailureMessage) { } public void releaseLock() { - if (lockFile.delete()) { + int retries = 5; + + boolean ok = false; + + do { + ok = lockFile.delete(); + if (!ok) { + try { + Thread.sleep(1000); + } catch (Exception ex) { + // no action. + } + } + } while (!ok && retries-- > 0); + + if (ok) { log.debug("Lock {} released successfully.", lockFile); } else { - log.warn("Failed to release lock {}", lockFile); + boolean exists = lockFile.exists(); + log.warn("Failed to release lock {} exists={}", lockFile, exists); } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index b4d1de7fa3..3dcbf5f2ce 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -1,5 +1,5 @@ /** - * Licensed to JumpMind Inc under one or more contributor + * Licensed to JumpMind Inc under one or more contributor * license agreements. See the NOTICE file distributed * with this work for additional information regarding * copyright ownership. JumpMind Inc licenses this file @@ -22,8 +22,10 @@ import java.io.File; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; +import java.nio.file.DirectoryStream; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -31,137 +33,137 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.DurationFormatUtils; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - - public class StagingManager implements IStagingManager { - + private static final String LOCK_EXTENSION = ".lock"; protected static final Logger log = LoggerFactory.getLogger(StagingManager.class); private File directory; - - protected Set resourcePaths; - - protected Map inUse; - + + private Map resourcePathsCache = new ConcurrentHashMap(); + protected Map inUse = new ConcurrentHashMap(); + boolean clusterEnabled; public StagingManager(String directory, boolean clusterEnabled) { log.info("The staging directory was initialized at the following location: " + directory); this.directory = new File(directory); this.directory.mkdirs(); - this.resourcePaths = Collections.synchronizedSet(new TreeSet()); - this.inUse = new ConcurrentHashMap(); this.clusterEnabled = clusterEnabled; - refreshResourceList(); } - + + @Override public Set getResourceReferences() { - synchronized (resourcePaths) { - return new TreeSet(resourcePaths); - } + return new TreeSet(resourcePathsCache.keySet()); } - private void refreshResourceList() { - Collection files = FileUtils.listFiles(this.directory, - new String[] { State.CREATE.getExtensionName(), State.DONE.getExtensionName() }, true); - for (File file : files) { - try { - String path = StagedResource.toPath(directory, file); - if (path != null && !resourcePaths.contains(path)) { - resourcePaths.add(path); - } - } catch (IllegalStateException ex) { - log.warn(ex.getMessage()); + @Override + public long clean(long ttlInMs) { + return clean(ttlInMs, null); + } + + public synchronized long clean(long ttlInMs, StagingPurgeContext context) { + try { + log.info("Cleaning staging..."); + if (context == null) { + context = new StagingPurgeContext(); } + long start = System.currentTimeMillis(); + context.setStartTime(start); + + resourcePathsCache.clear(); + clean(FileSystems.getDefault().getPath(this.directory.getAbsolutePath()), ttlInMs, context); + logCleaningProgress(context); + long end = System.currentTimeMillis(); + log.info("Finished cleaning staging in " + DurationFormatUtils.formatDurationWords(end-start, true, true) + "."); + return context.getPurgedFileSize() + context.getPurgedMemSize(); + } catch (Exception ex) { + throw new RuntimeException("Failure while cleaning staging.", ex); } } - /** - * Clean up resources that are older than the passed in parameter. - * - * @param ttlInMs - * If resources are older than this number of milliseconds they - * will be purged - */ - public long clean(long ttlInMs) { - synchronized (StagingManager.class) { - log.trace("Cleaning staging area"); - Set keys = getResourceReferences(); - long purgedFileCount = 0; - long purgedFileSize = 0; - long purgedMemCount = 0; - long purgedMemSize = 0; - for (String key : keys) { - IStagedResource resource = new StagedResource(directory, key, this); - /* resource could have deleted itself between the time the keys were cloned and now */ - if (resource != null) { - boolean resourceIsOld = (System.currentTimeMillis() - resource - .getLastUpdateTime()) > ttlInMs; - if (resource.getState() == State.DONE && resourceIsOld) { - if (!resource.isInUse()) { - boolean file = resource.isFileResource(); - long size = resource.getSize(); - if (resource.delete()) { - if (file) { - purgedFileCount++; - purgedFileSize += size; - } else { - purgedMemCount++; - purgedMemSize += size; - } - resourcePaths.remove(key); + protected void logCleaningProgress(StagingPurgeContext context) { + if (context.getPurgedFileCount() > 0) { + log.info("Purged {} staging files, freed {} of disk space.", + context.getPurgedFileCount(), FileUtils.byteCountToDisplaySize(context.getPurgedFileSize())); + } + if (context.getPurgedMemCount() > 0) { + log.info("Purged {} staging memory buffers, freed {}.", + context.getPurgedMemCount(), FileUtils.byteCountToDisplaySize(context.getPurgedMemSize())); + } + } + + protected void clean(Path path, long ttlInMs, StagingPurgeContext context) throws IOException { + DirectoryStream stream = Files.newDirectoryStream(path, STAGING_FILE_FILTER); + + if (context.shouldLogStatus()) { + logCleaningProgress(context); + context.setLastLogTime(System.currentTimeMillis()); + } + + for (Path entry : stream) { + if (Files.isDirectory(entry)) { + clean(entry, ttlInMs, context); + } else { + try { + String stagingPath = StagedResource.toPath(directory, + new File((entry.getParent().toString() + "/" + entry.getFileName().toString()))); + IStagedResource resource = new StagedResource(directory, stagingPath, this); + if (stagingPath != null) { + if (shouldCleanPath(resource, ttlInMs, context)) { + if (resource.getFile() != null) { + context.incrementPurgedFileCount(); + context.addPurgedFileBytes(resource.getSize()); } else { - log.warn("Failed to delete the '{}' staging resource", - resource.getPath()); + context.incrementPurgedMemoryCount(); + context.addPurgedMemoryBytes(resource.getSize()); } + + cleanPath(resource, ttlInMs, context); // this comes after stat collection because + // once the file is gone we loose visibility to size } else { - log.info( - "The '{}' staging resource qualified for being cleaned, but was in use. It will not be cleaned right now", - resource.getPath()); + resourcePathsCache.put(stagingPath,stagingPath); } } - } - } - if (purgedFileCount > 0) { - if (purgedFileSize < 1000) { - log.debug("Purged {} staged files, freeing {} bytes of disk space", - purgedFileCount, (int) (purgedFileSize)); - } else { - log.debug("Purged {} staged files, freeing {} kbytes of disk space", - purgedFileCount, (int) (purgedFileSize / 1000)); - } + } catch (IllegalStateException ex) { + log.warn("Failure during refreshResourceList ", ex); + } } - if (purgedMemCount > 0) { - if (purgedMemSize < 1000) { - log.debug("Purged {} staged memory buffers, freeing {} bytes of memory", - purgedMemCount, (int) (purgedMemSize)); - } else { - log.debug("Purged {} staged memory buffers, freeing {} kbytes of memory", - purgedMemCount, (int) (purgedMemSize / 1000)); - } - } - return purgedFileCount + purgedMemCount; } - } + + stream.close(); + } + protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; + return (resourceIsOld && resource.getState() == State.DONE && !resource.isInUse()); + } + + protected boolean cleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + boolean success = resource.delete(); + if (!success) { + log.warn("Failed to delete the '{}' staging resource", resource.getPath()); + } + return success; + } + /** * Create a handle that can be written to */ public IStagedResource create(Object... path) { String filePath = buildFilePath(path); - IStagedResource resource = new StagedResource(directory, filePath, - this); + IStagedResource resource = new StagedResource(directory, filePath, this); if (resource.exists()) { resource.delete(); } this.inUse.put(filePath, resource); - this.resourcePaths.add(filePath); + this.resourcePathsCache.put(filePath, filePath); return resource; } @@ -179,16 +181,16 @@ protected String buildFilePath(Object... path) { } return buffer.toString(); } - + public IStagedResource find(String path) { IStagedResource resource = inUse.get(path); if (resource == null) { - boolean foundResourcePath = resourcePaths.contains(path); + boolean foundResourcePath = resourcePathsCache.containsKey(path); if (!foundResourcePath && clusterEnabled) { synchronized (this) { StagedResource staged = new StagedResource(directory, path, this); if (staged.exists() && staged.getState() == State.DONE) { - resourcePaths.add(path); + resourcePathsCache.put(path, path); resource = staged; foundResourcePath = true; } @@ -203,21 +205,26 @@ public IStagedResource find(String path) { public IStagedResource find(Object... path) { return find(buildFilePath(path)); } - + + public void removeResourcePath(String path) { + resourcePathsCache.remove(path); + inUse.remove(path); + } + @Override public StagingFileLock acquireFileLock(String serverInfo, Object... path) { String lockFilePath = String.format("%s/%s%s", directory, buildFilePath(path), LOCK_EXTENSION); log.debug("About to acquire lock at {}", lockFilePath); - + StagingFileLock stagingFileLock = new StagingFileLock(); - + File lockFile = new File(lockFilePath); File containingDirectory = lockFile.getParentFile(); - + if (containingDirectory != null) { containingDirectory.mkdirs(); } - + boolean acquired = false; try { acquired = lockFile.createNewFile(); @@ -225,12 +232,12 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) { FileUtils.write(lockFile, serverInfo); } } catch (IOException ex) { // Hitting this when file already exists. - log.warn("Failed to create lock file (" + lockFilePath + ")", ex); + log.debug("Failed to create lock file (" + lockFilePath + ")", ex); } - + stagingFileLock.setAcquired(acquired); stagingFileLock.setLockFile(lockFile); - + if (!acquired) { if (lockFile.exists()) { try { @@ -247,7 +254,19 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) { } } - + return stagingFileLock; } + + protected static final DirectoryStream.Filter STAGING_FILE_FILTER = new DirectoryStream.Filter() { + @Override + public boolean accept(Path entry) { + boolean accept = Files.isDirectory(entry) || + entry.getFileName().toString().endsWith(".create") + || entry.getFileName().toString().endsWith(".done"); + return accept; + } + }; + + } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java new file mode 100644 index 0000000000..3a6adcf7ff --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java @@ -0,0 +1,108 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.io.stage; + +import java.util.HashMap; +import java.util.Map; + +public class StagingPurgeContext { + + private Map context = new HashMap(); + + private long startTime; + private long lastLogTime; + private long purgedFileCount = 0; + private long purgedFileSize = 0; + private long purgedMemCount = 0; + private long purgedMemSize = 0; + + public void incrementPurgedFileCount() { + purgedFileCount++; + } + public void addPurgedFileBytes(long bytes) { + this.purgedFileSize += bytes; + } + public void incrementPurgedMemoryCount() { + purgedMemCount++; + } + public void addPurgedMemoryBytes(long bytes) { + this.purgedMemCount += bytes; + } + public long getPurgedFileCount() { + return purgedFileCount; + } + public void setPurgedFileCount(long purgedFileCount) { + this.purgedFileCount = purgedFileCount; + } + public long getPurgedFileSize() { + return purgedFileSize; + } + public void setPurgedFileSize(long purgedFileSize) { + this.purgedFileSize = purgedFileSize; + } + public long getPurgedMemCount() { + return purgedMemCount; + } + public void setPurgedMemCount(long purgedMemCount) { + this.purgedMemCount = purgedMemCount; + } + public long getPurgedMemSize() { + return purgedMemSize; + } + public void setPurgedMemSize(long purgedMemSize) { + this.purgedMemSize = purgedMemSize; + } + + public Object getContextValue(String key) { + return context.get(key); + } + public Object putContextValue(String key, Object value) { + return context.put(key, value); + } + public boolean getBoolean(String key) { + return (Boolean) getContextValue(key); + } + public long getLong(String key) { + return (Long) getContextValue(key); + } + public long getStartTime() { + return startTime; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + } + public long getLastLogTime() { + return lastLogTime; + } + public void setLastLogTime(long lastLogTime) { + this.lastLogTime = lastLogTime; + } + + public boolean shouldLogStatus() { + long now = System.currentTimeMillis(); + if (now - startTime > 60000 + && now - lastLogTime > 60000) { + return true; + } else { + return false; + } + } +}