diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 3e57aa7d71..371c1ef073 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -574,7 +574,7 @@ protected List extract(final ProcessInfo processInfo, final Node long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); Node sourceNode = nodeService.findIdentity(); final FutureExtractStatus status = new FutureExtractStatus(); - executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeGroupId()))); + executor = Executors.newFixedThreadPool(1, new CustomizableThreadFactory(String.format("dataextractor-%s-%s", targetNode.getNodeGroupId(), targetNode.getNodeId()))); List> futures = new ArrayList>(); processInfo.setBatchCount(activeBatches.size()); @@ -1016,19 +1016,21 @@ protected IStagedResource getStagedResource(OutgoingBatch currentBatch) { protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference) { IStagedResource previouslyExtracted = getStagedResource(currentBatch); - if (previouslyExtracted != null && previouslyExtracted.exists() - && previouslyExtracted.getState() != State.CREATE) { - if (log.isDebugEnabled()) { - log.debug("We have already extracted batch {}. Using the existing extraction: {}", - currentBatch.getBatchId(), previouslyExtracted); - } - if (acquireReference) { - previouslyExtracted.reference(); + if (previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE) { + synchronized (DataExtractorService.this) { + if (previouslyExtracted.exists()) { + if (log.isDebugEnabled()) { + log.debug("We have already extracted batch {}. Using the existing extraction: {}", currentBatch.getBatchId(), + previouslyExtracted); + } + if (acquireReference) { + previouslyExtracted.reference(); + } + return true; + } } - return true; - } else { - return false; } + return false; } protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) { @@ -1199,7 +1201,11 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi stagedResource.close(); stagedResource.dereference(); if (!stagedResource.isFileResource() && !stagedResource.isInUse()) { - stagedResource.delete(); + synchronized(DataExtractorService.this) { + if (!stagedResource.isFileResource() && !stagedResource.isInUse()) { + stagedResource.delete(); + } + } } } } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 256ff2c62c..df86a5fdf3 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -100,11 +100,13 @@ protected static String toPath(File directory, File file) { @Override public void reference() { references++; + log.debug("Increased reference to {} for {} by {}", references, path, Thread.currentThread().getName()); } @Override public void dereference() { references--; + log.debug("Decreased reference to {} for {} by {}", references, path, Thread.currentThread().getName()); } public boolean isInUse() { @@ -329,10 +331,6 @@ public void refreshLastUpdateTime() { public boolean delete() { close(); - return deleteInternal(); - } - - private boolean deleteInternal() { boolean deleted = false; if (file != null && file.exists()) { FileUtils.deleteQuietly(file); @@ -347,7 +345,10 @@ private boolean deleteInternal() { if (deleted) { stagingManager.resourcePaths.remove(path); stagingManager.inUse.remove(path); - } + if (log.isDebugEnabled() && path.contains("outgoing")) { + log.debug("Deleted staging resource {}", path); + } + } return deleted; }