Skip to content

Commit

Permalink
0001044: Map of semaphores used to prevent concurrent extraction of i…
Browse files Browse the repository at this point in the history
…ndividual batches are not cleaned up
  • Loading branch information
chenson42 committed Feb 12, 2013
1 parent db39ac9 commit d0a8d53
Showing 1 changed file with 7 additions and 6 deletions.
Expand Up @@ -117,6 +117,8 @@ public class DataExtractorService extends AbstractService implements IDataExtrac
private IStatisticManager statisticManager;

private IStagingManager stagingManager;

private Map<Long, Semaphore> locks = new HashMap<Long, Semaphore>();

public DataExtractorService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, IOutgoingBatchService outgoingBatchService,
Expand Down Expand Up @@ -433,8 +435,6 @@ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatc
return currentBatch;
}

private Map<Long, Semaphore> locks = new HashMap<Long, Semaphore>();

protected OutgoingBatch extractOutgoingBatch(Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch,
boolean streamToFileEnabled) {
Expand Down Expand Up @@ -477,14 +477,15 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode,
transformExtractWriter.close();
}
} else {
if (!isPreviouslyExtracted(currentBatch)) {
if (currentBatch.getStatus() == Status.NE ||
!isPreviouslyExtracted(currentBatch)) {
int maxPermits = parameterService.getInt(ParameterConstants.CONCURRENT_WORKERS);
Semaphore lock = null;
try {
synchronized (locks) {
lock = locks.get(currentBatch.getBatchId());
if (lock == null) {
lock = new Semaphore(100);
lock = new Semaphore(maxPermits);
locks.put(currentBatch.getBatchId(), lock);
}
try {
Expand Down Expand Up @@ -523,7 +524,7 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode,
synchronized (locks) {
lock.release();
if (lock.availablePermits() == maxPermits) {
locks.remove(lock);
locks.remove(currentBatch.getBatchId());
}
}
}
Expand Down Expand Up @@ -1033,6 +1034,6 @@ public boolean containsData() {
return data != null;
}

}
}

}

0 comments on commit d0a8d53

Please sign in to comment.