From d0a8d53cb71683d79048db8c02ea97b6bd32663e Mon Sep 17 00:00:00 2001 From: chenson42 Date: Tue, 12 Feb 2013 20:38:06 +0000 Subject: [PATCH] 0001044: Map of semaphores used to prevent concurrent extraction of individual batches are not cleaned up --- .../service/impl/DataExtractorService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index c23265555a..f010ad107e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -117,6 +117,8 @@ public class DataExtractorService extends AbstractService implements IDataExtrac private IStatisticManager statisticManager; private IStagingManager stagingManager; + + private Map locks = new HashMap(); public DataExtractorService(IParameterService parameterService, ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService, @@ -433,8 +435,6 @@ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatc return currentBatch; } - private Map locks = new HashMap(); - protected OutgoingBatch extractOutgoingBatch(Node targetNode, IDataWriter dataWriter, OutgoingBatch currentBatch, boolean streamToFileEnabled) { @@ -477,14 +477,15 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode, transformExtractWriter.close(); } } else { - if (!isPreviouslyExtracted(currentBatch)) { + if (currentBatch.getStatus() == Status.NE || + !isPreviouslyExtracted(currentBatch)) { int maxPermits = parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS); Semaphore lock = null; try { synchronized (locks) { lock = locks.get(currentBatch.getBatchId()); if (lock == null) { - lock = new Semaphore(100); + lock = new Semaphore(maxPermits); locks.put(currentBatch.getBatchId(), lock); } try { @@ -523,7 +524,7 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode, synchronized (locks) { lock.release(); if (lock.availablePermits() == maxPermits) { - locks.remove(lock); + locks.remove(currentBatch.getBatchId()); } } } @@ -1033,6 +1034,6 @@ public boolean containsData() { return data != null; } - } + } }