From 8d59af6c5ca28b7aebea9c9500fef68b84ce04e2 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Wed, 15 Nov 2017 15:24:43 -0500 Subject: [PATCH] 0003282: Common batch extracts may fail when a cluster is using a shared staging area --- .../service/impl/DataExtractorService.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index cf615b414e..f6f16f2792 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -864,11 +864,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe long ts = System.currentTimeMillis(); long extractTimeInMs = 0l; long byteCount = 0l; - long transformTimeInMs = 0l; - - if (currentBatch.getStatus() == Status.NE) { - triggerReExtraction(currentBatch); - } + long transformTimeInMs = 0l; if (currentBatch.getStatus() == Status.IG) { cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer); @@ -965,10 +961,14 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter) BatchLock batchLock = new BatchLock(semaphoreKey); - Semaphore lock = locks.get(semaphoreKey); - if (lock == null) { - lock = new Semaphore(1); - locks.put(semaphoreKey, lock); + Semaphore lock = null; + + synchronized (DataExtractorService.this) { + lock = locks.get(semaphoreKey); + if (lock == null) { + lock = new Semaphore(1); + locks.put(semaphoreKey, lock); + } } try { lock.acquire(); // In-memory, intra-process lock. @@ -1035,10 +1035,14 @@ private String getLockingServerInfo() { } protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) { - if (lock != null) { - if (lock.inMemoryLock != null) { - lock.inMemoryLock.release(); - locks.remove(lock.semaphoreKey); + if (lock != null) { + if (lock.inMemoryLock != null) { + synchronized (DataExtractorService.this) { + if (!lock.inMemoryLock.hasQueuedThreads()) { + locks.remove(lock.semaphoreKey); + } + lock.inMemoryLock.release(); + } } if (lock.fileLock != null) { lock.fileLock.releaseLock();