From 37bc4262e45cd1f9c39c066d31d110119cd746c3 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Wed, 28 Nov 2018 10:57:40 -0500 Subject: [PATCH] 0003807: Symmetric startup is very slow when there are lots of staging files on a SAN (3.9) --- .../io/stage/BatchStagingManager.java | 158 ++++++------ .../service/impl/DataExtractorService.java | 4 +- .../io/data/writer/StagingDataWriter.java | 3 +- .../symmetric/io/stage/StagedResource.java | 35 ++- .../symmetric/io/stage/StagingFileLock.java | 20 +- .../symmetric/io/stage/StagingManager.java | 234 ++++++++++-------- .../io/stage/StagingPurgeContext.java | 108 ++++++++ 7 files changed, 363 insertions(+), 199 deletions(-) create mode 100644 symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java index bb43f95d64..6e4929453e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java @@ -1,16 +1,19 @@ package org.jumpmind.symmetric.io.stage; import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_INCOMING; + import static org.jumpmind.symmetric.common.Constants.STAGING_CATEGORY_OUTGOING; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.jumpmind.symmetric.model.BatchId; import org.jumpmind.symmetric.service.ClusterConstants; @@ -23,7 +26,7 @@ public BatchStagingManager(ISymmetricEngine engine, String directory) { this.engine = engine; } - protected Map getBiggestBatchIds(List batches) { + protected Map getBiggestBatchIds(Set batches) { Map biggest = new HashMap(); for (BatchId batchId : batches) { Long batchNumber = biggest.get(batchId.getNodeId()); @@ -32,7 +35,7 @@ protected Map getBiggestBatchIds(List batches) { } } return biggest; - } + } @Override public long clean(long ttlInMs) { @@ -50,17 +53,24 @@ public long clean(long ttlInMs) { } catch (Exception e) { // during setup or un-install, it's possible sym_lock table isn't available yet } - - try { + try { boolean purgeBasedOnTTL = engine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_PURGE_ON_TTL_ENABLED, false); - if (purgeBasedOnTTL) { - return super.clean(ttlInMs); - } else { - synchronized (StagingManager.class) { - return purgeStagingBasedOnDatabaseStatus(ttlInMs); - } - } - } finally { + boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled(); + long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000); + Set outgoingBatches = ttlInMs == 0 ? new HashSet() : new HashSet(engine.getOutgoingBatchService().getAllBatches()); + Set incomingBatches = ttlInMs == 0 ? new HashSet() : new HashSet(engine.getIncomingBatchService().getAllBatches()); + Map biggestIncomingByNode = getBiggestBatchIds(incomingBatches); + + StagingPurgeContext context = new StagingPurgeContext(); + context.putContextValue("purgeBasedOnTTL", purgeBasedOnTTL); + context.putContextValue("recordIncomingBatchesEnabled", recordIncomingBatchesEnabled); + context.putContextValue("minTtlInMs", minTtlInMs); + context.putContextValue("outgoingBatches", outgoingBatches); + context.putContextValue("incomingBatches", incomingBatches); + context.putContextValue("biggestIncomingByNode", biggestIncomingByNode); + + return super.clean(ttlInMs, context); + } finally { if (isLockAcquired) { try { engine.getClusterService().unlock(ClusterConstants.STAGE_MANAGEMENT); @@ -70,73 +80,69 @@ public long clean(long ttlInMs) { } } - protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) { - boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled(); - long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000); - List outgoingBatches = ttlInMs == 0 ? new ArrayList() : engine.getOutgoingBatchService().getAllBatches(); - List incomingBatches = ttlInMs == 0 ? new ArrayList() : engine.getIncomingBatchService().getAllBatches(); - Map biggestIncomingByNode = getBiggestBatchIds(incomingBatches); - synchronized (StagingManager.class) { - log.trace("Purging staging area"); - Set keys = getResourceReferences(); - long purgedFileCount = 0; - long purgedFileSize = 0; - for (String key : keys) { - IStagedResource resource = find(key); - String[] path = key.split("/"); - /* - * resource could have deleted itself between the time the keys - * were cloned and now - */ - if (resource != null && !resource.isInUse()) { - boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; - boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > minTtlInMs; - if (path[0].equals(STAGING_CATEGORY_OUTGOING)) { - try { - Long batchId = new Long(path[path.length - 1]); - if (!outgoingBatches.contains(batchId) || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } catch (NumberFormatException e) { - if (resourceIsOld || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } - } else if (path[0].equals(STAGING_CATEGORY_INCOMING)) { - try { - BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]); - Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId()); - if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) && - biggestBatchId != null && biggestBatchId > batchId.getBatchId() && - resourceClearsMinTimeHurdle) - || (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } catch (NumberFormatException e) { - if (resourceIsOld || ttlInMs == 0) { - purgedFileCount++; - purgedFileSize+=resource.getSize(); - resource.delete(); - } - } - } - } + @Override + protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + if (context.getBoolean("purgeBasedOnTTL")) { + return super.shouldCleanPath(resource, ttlInMs, context); + } + + if (resource.isInUse() || resource.getState() != State.DONE) { + return false; + } + + String[] path = resource.getPath().split("/"); + + boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; + boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > context.getLong("minTtlInMs"); + + if (path[0].equals(STAGING_CATEGORY_OUTGOING)) { + return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld); + } else if (path[0].equals(STAGING_CATEGORY_INCOMING)) { + return shouldCleanIncomingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle); + } else { + log.warn("Unrecognized path: " + resource.getPath()); + } + return false; + } + + protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, + boolean resourceIsOld) { + Set outgoingBatches = (Set) context.getContextValue("outgoingBatches"); + try { + Long batchId = new Long(path[path.length - 1]); + if (!outgoingBatches.contains(batchId) || ttlInMs == 0) { + return true; } - if (purgedFileCount > 0) { - if (purgedFileSize < 1000) { - log.info("Purged {} from stage, freeing {} bytes of space", purgedFileCount, (int) (purgedFileSize)); - } else { - log.info("Purged {} from stage, freeing {} kbytes of space", purgedFileCount, (int) (purgedFileSize / 1000)); - } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + return true; + } + } + + return false; + } + + protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld, + boolean resourceClearsMinTimeHurdle) { + Set incomingBatches = (Set) context.getContextValue("incomingBatches"); + Map biggestIncomingByNode = (Map) context.getContextValue("biggestIncomingByNode"); + boolean recordIncomingBatchesEnabled = context.getBoolean("recordIncomingBatchesEnabled"); + try { + BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]); + Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId()); + if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) && + biggestBatchId != null && biggestBatchId > batchId.getBatchId() && + resourceClearsMinTimeHurdle) + || (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) { + return true; + } + } catch (NumberFormatException e) { + if (resourceIsOld || ttlInMs == 0) { + return true; } - return purgedFileCount; } + + return false; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index bfdd6620d9..1056cde480 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -205,7 +206,7 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY }; private IExtensionService extensionService; - private Map locks = new HashMap(); + private Map locks = new ConcurrentHashMap(); private CustomizableThreadFactory threadPoolFactory; @@ -1134,6 +1135,7 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter) throw new org.jumpmind.exception.InterruptedException(e); } + log.debug("Acquired {}", lock); return lock; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java index 5e09e8dcd3..3620c87f65 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.io.data.Batch; @@ -40,7 +41,7 @@ public class StagingDataWriter extends AbstractProtocolDataWriter { private String category; - private Map stagedResources = new HashMap(); + private Map stagedResources = new ConcurrentHashMap(); private long memoryThresholdInBytes; diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 722e971a2a..877d247a39 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -78,15 +78,19 @@ public StagedResource(File directory, String path, StagingManager stagingManager this.path = path; this.stagingManager = stagingManager; lastUpdateTime = System.currentTimeMillis(); + + File doneFile = buildFile(State.DONE); - if (buildFile(State.DONE).exists()){ - this.state = State.DONE; - } else { - this.state = State.CREATE; - } - this.file = buildFile(state); - if (file.exists()) { + if (doneFile.exists()) { // Only call exists once for done files. This can be expensive on some SAN type devices. + this.state = State.DONE; + this.file = doneFile; lastUpdateTime = file.lastModified(); + } else { + this.state = State.CREATE; + this.file = buildFile(state); + if (file.exists()) { + lastUpdateTime = file.lastModified(); + } } } @@ -177,6 +181,15 @@ protected void handleFailedRename(File oldFile, File newFile) { String msg = null; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S"); + int tries = 5; + + while (!newFile.exists() && tries-- > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + } + } + if (newFile.exists()) { if (isSameFile(oldFile, newFile)) { msg = String.format("Had trouble renaming file. The destination file already exists, and is the same size - will proceed. " + @@ -300,7 +313,7 @@ public OutputStream getOutputStream() { try { if (outputStream == null) { if (file != null && file.exists()) { - log.warn("We had to delete {} because it already existed", + log.warn("getOutputStream had to delete {} because it already existed", file.getAbsolutePath()); file.delete(); } @@ -345,7 +358,8 @@ protected InputStream createInputStream() throws FileNotFoundException { public BufferedWriter getWriter(long threshold) { if (writer == null) { if (file != null && file.exists()) { - log.warn("We had to delete {} because it already existed", file.getAbsolutePath()); + log.warn("getWriter had to delete {} because it already existed.", + file.getAbsolutePath(), new RuntimeException("Stack Trace")); file.delete(); } else if (this.memoryBuffer != null) { log.warn("We had to delete the memory buffer for {} because it already existed", getPath()); @@ -397,8 +411,7 @@ public boolean delete() { } if (deleted) { - stagingManager.resourcePaths.remove(path); - stagingManager.inUse.remove(path); + stagingManager.removeResourcePath(path); if (log.isDebugEnabled() && path.contains("outgoing")) { log.debug("Deleted staging resource {}", path); } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java index 716d942d8b..b691d7bba4 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingFileLock.java @@ -73,10 +73,26 @@ public void setLockFailureMessage(String lockFailureMessage) { } public void releaseLock() { - if (lockFile.delete()) { + int retries = 5; + + boolean ok = false; + + do { + ok = lockFile.delete(); + if (!ok) { + try { + Thread.sleep(1000); + } catch (Exception ex) { + // no action. + } + } + } while (!ok && retries-- > 0); + + if (ok) { log.debug("Lock {} released successfully.", lockFile); } else { - log.warn("Failed to release lock {}", lockFile); + boolean exists = lockFile.exists(); + log.warn("Failed to release lock {} exists={}", lockFile, exists); } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index aa0d4d32dc..3dcbf5f2ce 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -1,5 +1,5 @@ /** - * Licensed to JumpMind Inc under one or more contributor + * Licensed to JumpMind Inc under one or more contributor * license agreements. See the NOTICE file distributed * with this work for additional information regarding * copyright ownership. JumpMind Inc licenses this file @@ -22,8 +22,10 @@ import java.io.File; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; +import java.nio.file.DirectoryStream; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -31,140 +33,139 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.DurationFormatUtils; import org.jumpmind.symmetric.io.stage.IStagedResource.State; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StagingManager implements IStagingManager { - + private static final String LOCK_EXTENSION = ".lock"; protected static final Logger log = LoggerFactory.getLogger(StagingManager.class); - protected File directory; - - protected Set resourcePaths; - - protected Map inUse; - + private File directory; + + private Map resourcePathsCache = new ConcurrentHashMap(); + protected Map inUse = new ConcurrentHashMap(); + boolean clusterEnabled; public StagingManager(String directory, boolean clusterEnabled) { log.info("The staging directory was initialized at the following location: " + directory); this.directory = new File(directory); this.directory.mkdirs(); - this.resourcePaths = Collections.synchronizedSet(new TreeSet()); - this.inUse = new ConcurrentHashMap(); this.clusterEnabled = clusterEnabled; - refreshResourceList(); } + @Override public Set getResourceReferences() { - synchronized (resourcePaths) { - return new TreeSet(resourcePaths); - } + return new TreeSet(resourcePathsCache.keySet()); } - private void refreshResourceList() { - Collection files = FileUtils.listFiles(this.directory, - new String[] { State.CREATE.getExtensionName(), State.DONE.getExtensionName() }, true); - for (File file : files) { - try { - String path = StagedResource.toPath(directory, file); - if (path != null && !resourcePaths.contains(path)) { - resourcePaths.add(path); - } - } catch (IllegalStateException ex) { - log.warn(ex.getMessage()); + @Override + public long clean(long ttlInMs) { + return clean(ttlInMs, null); + } + + public synchronized long clean(long ttlInMs, StagingPurgeContext context) { + try { + log.info("Cleaning staging..."); + if (context == null) { + context = new StagingPurgeContext(); } + long start = System.currentTimeMillis(); + context.setStartTime(start); + + resourcePathsCache.clear(); + clean(FileSystems.getDefault().getPath(this.directory.getAbsolutePath()), ttlInMs, context); + logCleaningProgress(context); + long end = System.currentTimeMillis(); + log.info("Finished cleaning staging in " + DurationFormatUtils.formatDurationWords(end-start, true, true) + "."); + return context.getPurgedFileSize() + context.getPurgedMemSize(); + } catch (Exception ex) { + throw new RuntimeException("Failure while cleaning staging.", ex); } } - /** - * Clean up resources that are older than the passed in parameter. - * - * @param ttlInMs - * If resources are older than this number of milliseconds they - * will be purged - */ - public long clean(long ttlInMs) { - synchronized (StagingManager.class) { - log.trace("Cleaning staging area"); - Set keys = getResourceReferences(); - long purgedFileCount = 0; - long purgedFileSize = 0; - long purgedMemCount = 0; - long purgedMemSize = 0; - for (String key : keys) { - IStagedResource resource = new StagedResource(directory, key, this); - /* resource could have deleted itself between the time the keys were cloned and now */ - if (resource != null) { - boolean resourceIsOld = (System.currentTimeMillis() - resource - .getLastUpdateTime()) > ttlInMs; - if (resource.getState() == State.DONE && resourceIsOld) { - if (!resource.isInUse()) { - boolean file = resource.isFileResource(); - long size = resource.getSize(); - if (resource.delete()) { - if (file) { - purgedFileCount++; - purgedFileSize += size; - } else { - purgedMemCount++; - purgedMemSize += size; - } - resourcePaths.remove(key); + protected void logCleaningProgress(StagingPurgeContext context) { + if (context.getPurgedFileCount() > 0) { + log.info("Purged {} staging files, freed {} of disk space.", + context.getPurgedFileCount(), FileUtils.byteCountToDisplaySize(context.getPurgedFileSize())); + } + if (context.getPurgedMemCount() > 0) { + log.info("Purged {} staging memory buffers, freed {}.", + context.getPurgedMemCount(), FileUtils.byteCountToDisplaySize(context.getPurgedMemSize())); + } + } + + protected void clean(Path path, long ttlInMs, StagingPurgeContext context) throws IOException { + DirectoryStream stream = Files.newDirectoryStream(path, STAGING_FILE_FILTER); + + if (context.shouldLogStatus()) { + logCleaningProgress(context); + context.setLastLogTime(System.currentTimeMillis()); + } + + for (Path entry : stream) { + if (Files.isDirectory(entry)) { + clean(entry, ttlInMs, context); + } else { + try { + String stagingPath = StagedResource.toPath(directory, + new File((entry.getParent().toString() + "/" + entry.getFileName().toString()))); + IStagedResource resource = new StagedResource(directory, stagingPath, this); + if (stagingPath != null) { + if (shouldCleanPath(resource, ttlInMs, context)) { + if (resource.getFile() != null) { + context.incrementPurgedFileCount(); + context.addPurgedFileBytes(resource.getSize()); } else { - log.warn("Failed to delete the '{}' staging resource", - resource.getPath()); + context.incrementPurgedMemoryCount(); + context.addPurgedMemoryBytes(resource.getSize()); } + + cleanPath(resource, ttlInMs, context); // this comes after stat collection because + // once the file is gone we loose visibility to size } else { - log.info( - "The '{}' staging resource qualified for being cleaned, but was in use. It will not be cleaned right now", - resource.getPath()); + resourcePathsCache.put(stagingPath,stagingPath); } } - } - } - if (purgedFileCount > 0) { - if (purgedFileSize < 1000) { - log.debug("Purged {} staged files, freeing {} bytes of disk space", - purgedFileCount, (int) (purgedFileSize)); - } else { - log.debug("Purged {} staged files, freeing {} kbytes of disk space", - purgedFileCount, (int) (purgedFileSize / 1000)); - } + } catch (IllegalStateException ex) { + log.warn("Failure during refreshResourceList ", ex); + } } - if (purgedMemCount > 0) { - if (purgedMemSize < 1000) { - log.debug("Purged {} staged memory buffers, freeing {} bytes of memory", - purgedMemCount, (int) (purgedMemSize)); - } else { - log.debug("Purged {} staged memory buffers, freeing {} kbytes of memory", - purgedMemCount, (int) (purgedMemSize / 1000)); - } - } - return purgedFileCount + purgedMemCount; } - } + + stream.close(); + } + protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs; + return (resourceIsOld && resource.getState() == State.DONE && !resource.isInUse()); + } + + protected boolean cleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context) { + boolean success = resource.delete(); + if (!success) { + log.warn("Failed to delete the '{}' staging resource", resource.getPath()); + } + return success; + } + /** * Create a handle that can be written to */ public IStagedResource create(Object... path) { String filePath = buildFilePath(path); - IStagedResource resource = createStagedResource(filePath); + IStagedResource resource = new StagedResource(directory, filePath, this); if (resource.exists()) { resource.delete(); } this.inUse.put(filePath, resource); - this.resourcePaths.add(filePath); + this.resourcePathsCache.put(filePath, filePath); return resource; } - - protected IStagedResource createStagedResource(String filePath) { - return new StagedResource(directory, filePath, this); - } protected String buildFilePath(Object... path) { StringBuilder buffer = new StringBuilder(); @@ -180,22 +181,22 @@ protected String buildFilePath(Object... path) { } return buffer.toString(); } - + public IStagedResource find(String path) { IStagedResource resource = inUse.get(path); if (resource == null) { - boolean foundResourcePath = resourcePaths.contains(path); + boolean foundResourcePath = resourcePathsCache.containsKey(path); if (!foundResourcePath && clusterEnabled) { synchronized (this) { - IStagedResource staged = createStagedResource(path); + StagedResource staged = new StagedResource(directory, path, this); if (staged.exists() && staged.getState() == State.DONE) { - resourcePaths.add(path); + resourcePathsCache.put(path, path); resource = staged; foundResourcePath = true; } } } else if (foundResourcePath) { - resource = createStagedResource(path); + resource = new StagedResource(directory, path, this); } } return resource; @@ -204,21 +205,26 @@ public IStagedResource find(String path) { public IStagedResource find(Object... path) { return find(buildFilePath(path)); } - + + public void removeResourcePath(String path) { + resourcePathsCache.remove(path); + inUse.remove(path); + } + @Override public StagingFileLock acquireFileLock(String serverInfo, Object... path) { String lockFilePath = String.format("%s/%s%s", directory, buildFilePath(path), LOCK_EXTENSION); log.debug("About to acquire lock at {}", lockFilePath); - + StagingFileLock stagingFileLock = new StagingFileLock(); - + File lockFile = new File(lockFilePath); File containingDirectory = lockFile.getParentFile(); - + if (containingDirectory != null) { containingDirectory.mkdirs(); - } - + } + boolean acquired = false; try { acquired = lockFile.createNewFile(); @@ -226,12 +232,12 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) { FileUtils.write(lockFile, serverInfo); } } catch (IOException ex) { // Hitting this when file already exists. - log.warn("Failed to create lock file (" + lockFilePath + ")", ex); + log.debug("Failed to create lock file (" + lockFilePath + ")", ex); } - + stagingFileLock.setAcquired(acquired); stagingFileLock.setLockFile(lockFile); - + if (!acquired) { if (lockFile.exists()) { try { @@ -248,7 +254,19 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) { } } - + return stagingFileLock; } + + protected static final DirectoryStream.Filter STAGING_FILE_FILTER = new DirectoryStream.Filter() { + @Override + public boolean accept(Path entry) { + boolean accept = Files.isDirectory(entry) || + entry.getFileName().toString().endsWith(".create") + || entry.getFileName().toString().endsWith(".done"); + return accept; + } + }; + + } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java new file mode 100644 index 0000000000..3a6adcf7ff --- /dev/null +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingPurgeContext.java @@ -0,0 +1,108 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.io.stage; + +import java.util.HashMap; +import java.util.Map; + +public class StagingPurgeContext { + + private Map context = new HashMap(); + + private long startTime; + private long lastLogTime; + private long purgedFileCount = 0; + private long purgedFileSize = 0; + private long purgedMemCount = 0; + private long purgedMemSize = 0; + + public void incrementPurgedFileCount() { + purgedFileCount++; + } + public void addPurgedFileBytes(long bytes) { + this.purgedFileSize += bytes; + } + public void incrementPurgedMemoryCount() { + purgedMemCount++; + } + public void addPurgedMemoryBytes(long bytes) { + this.purgedMemCount += bytes; + } + public long getPurgedFileCount() { + return purgedFileCount; + } + public void setPurgedFileCount(long purgedFileCount) { + this.purgedFileCount = purgedFileCount; + } + public long getPurgedFileSize() { + return purgedFileSize; + } + public void setPurgedFileSize(long purgedFileSize) { + this.purgedFileSize = purgedFileSize; + } + public long getPurgedMemCount() { + return purgedMemCount; + } + public void setPurgedMemCount(long purgedMemCount) { + this.purgedMemCount = purgedMemCount; + } + public long getPurgedMemSize() { + return purgedMemSize; + } + public void setPurgedMemSize(long purgedMemSize) { + this.purgedMemSize = purgedMemSize; + } + + public Object getContextValue(String key) { + return context.get(key); + } + public Object putContextValue(String key, Object value) { + return context.put(key, value); + } + public boolean getBoolean(String key) { + return (Boolean) getContextValue(key); + } + public long getLong(String key) { + return (Long) getContextValue(key); + } + public long getStartTime() { + return startTime; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + } + public long getLastLogTime() { + return lastLogTime; + } + public void setLastLogTime(long lastLogTime) { + this.lastLogTime = lastLogTime; + } + + public boolean shouldLogStatus() { + long now = System.currentTimeMillis(); + if (now - startTime > 60000 + && now - lastLogTime > 60000) { + return true; + } else { + return false; + } + } +}