From d9aaf3f593dd8bc47f3d69ed001d569381896c93 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Mon, 1 Jul 2013 20:35:31 +0000 Subject: [PATCH] 0001305: Possible lock up during extraction. If the max number of locks are reached on a batch the extract method can hang forever --- .../symmetric/service/impl/DataExtractorService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index b524ba237d..12eb8b5159 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -551,10 +551,11 @@ public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNo if (currentBatch.getStatus() == Status.NE || !isPreviouslyExtracted(currentBatch)) { int maxPermits = parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS); + String semaphoreKey = streamToFileEnabled ? Long.toString(currentBatch.getBatchId()) : currentBatch.getNodeBatchId(); Semaphore lock = null; try { synchronized (locks) { - lock = locks.get(currentBatch.getBatchId()); + lock = locks.get(semaphoreKey); if (lock == null) { lock = new Semaphore(maxPermits); locks.put(currentBatch.getBatchId(), lock); @@ -592,10 +593,10 @@ public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNo } throw ex; } finally { + lock.release(); synchronized (locks) { - lock.release(); if (lock.availablePermits() == maxPermits) { - locks.remove(currentBatch.getBatchId()); + locks.remove(semaphoreKey); } } }