Skip to content

Commit

Permalink
0003282: Common batch extracts may fail when a cluster is using a shared
Browse files Browse the repository at this point in the history
staging area
  • Loading branch information
chenson42 committed Nov 15, 2017
1 parent 7dfd584 commit 8d59af6
Showing 1 changed file with 17 additions and 13 deletions.
Expand Up @@ -864,11 +864,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
long ts = System.currentTimeMillis();
long extractTimeInMs = 0l;
long byteCount = 0l;
long transformTimeInMs = 0l;

if (currentBatch.getStatus() == Status.NE) {
triggerReExtraction(currentBatch);
}
long transformTimeInMs = 0l;

if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
Expand Down Expand Up @@ -965,10 +961,14 @@ private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter)

BatchLock batchLock = new BatchLock(semaphoreKey);

Semaphore lock = locks.get(semaphoreKey);
if (lock == null) {
lock = new Semaphore(1);
locks.put(semaphoreKey, lock);
Semaphore lock = null;

synchronized (DataExtractorService.this) {
lock = locks.get(semaphoreKey);
if (lock == null) {
lock = new Semaphore(1);
locks.put(semaphoreKey, lock);
}
}
try {
lock.acquire(); // In-memory, intra-process lock.
Expand Down Expand Up @@ -1035,10 +1035,14 @@ private String getLockingServerInfo() {
}

protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) {
if (lock != null) {
if (lock.inMemoryLock != null) {
lock.inMemoryLock.release();
locks.remove(lock.semaphoreKey);
if (lock != null) {
if (lock.inMemoryLock != null) {
synchronized (DataExtractorService.this) {
if (!lock.inMemoryLock.hasQueuedThreads()) {
locks.remove(lock.semaphoreKey);
}
lock.inMemoryLock.release();
}
}
if (lock.fileLock != null) {
lock.fileLock.releaseLock();
Expand Down

0 comments on commit 8d59af6

Please sign in to comment.