From 0a9947d65632116a006c81b4c04d272275fd9ded Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 27 May 2022 14:07:22 -0700 Subject: [PATCH] HBASE-27064 CME in TestRegionNormalizerWorkQueue --- .../normalizer/RegionNormalizerWorkQueue.java | 107 ++++++------------ 1 file changed, 32 insertions(+), 75 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java index c1cab5f97b44..f8c969a9f617 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorkQueue.java @@ -25,7 +25,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.yetus.audience.InterfaceAudience; /** @@ -39,8 +39,6 @@ * {@link BlockingQueue}. *
  • Allows a producer to insert an item at the head of the queue, if desired.
  • * - * Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic - * synchronization strategy. */ @InterfaceAudience.Private class RegionNormalizerWorkQueue { @@ -48,53 +46,15 @@ class RegionNormalizerWorkQueue { /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */ private LinkedHashSet delegate; - // the locking structure used here follows the example found in LinkedBlockingQueue. The - // difference is that our locks guard access to `delegate` rather than the head node. - - /** Lock held by take, poll, etc */ - private final ReentrantLock takeLock; - + /** Lock for puts and takes **/ + private final ReentrantReadWriteLock lock; /** Wait queue for waiting takes */ private final Condition notEmpty; - /** Lock held by put, offer, etc */ - private final ReentrantLock putLock; - RegionNormalizerWorkQueue() { delegate = new LinkedHashSet<>(); - takeLock = new ReentrantLock(); - notEmpty = takeLock.newCondition(); - putLock = new ReentrantLock(); - } - - /** - * Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock - * takeLock.) - */ - private void signalNotEmpty() { - final ReentrantLock takeLock = this.takeLock; - takeLock.lock(); - try { - notEmpty.signal(); - } finally { - takeLock.unlock(); - } - } - - /** - * Locks to prevent both puts and takes. - */ - private void fullyLock() { - putLock.lock(); - takeLock.lock(); - } - - /** - * Unlocks to allow both puts and takes. - */ - private void fullyUnlock() { - takeLock.unlock(); - putLock.unlock(); + lock = new ReentrantReadWriteLock(); + notEmpty = lock.writeLock().newCondition(); } /** @@ -105,16 +65,14 @@ public void put(E e) { if (e == null) { throw new NullPointerException(); } - - putLock.lock(); + lock.writeLock().lock(); try { delegate.add(e); + if (!delegate.isEmpty()) { + notEmpty.signal(); + } } finally { - putLock.unlock(); - } - - if (!delegate.isEmpty()) { - signalNotEmpty(); + lock.writeLock().unlock(); } } @@ -138,16 +96,14 @@ public void putAll(Collection c) { if (c == null) { throw new NullPointerException(); } - - putLock.lock(); + lock.writeLock().lock(); try { delegate.addAll(c); + if (!delegate.isEmpty()) { + notEmpty.signal(); + } } finally { - putLock.unlock(); - } - - if (!delegate.isEmpty()) { - signalNotEmpty(); + lock.writeLock().unlock(); } } @@ -159,19 +115,17 @@ public void putAllFirst(Collection c) { if (c == null) { throw new NullPointerException(); } - - fullyLock(); + lock.writeLock().lock(); try { final LinkedHashSet copy = new LinkedHashSet<>(c.size() + delegate.size()); copy.addAll(c); copy.addAll(delegate); delegate = copy; + if (!delegate.isEmpty()) { + notEmpty.signal(); + } } finally { - fullyUnlock(); - } - - if (!delegate.isEmpty()) { - signalNotEmpty(); + lock.writeLock().unlock(); } } @@ -183,10 +137,13 @@ public void putAllFirst(Collection c) { */ public E take() throws InterruptedException { E x; - takeLock.lockInterruptibly(); + // Take a write lock. If the delegate's queue is empty we need it to await(), which will + // drop the lock, then reacquire it; or if the queue is not empty we will use an iterator + // to mutate the head. + lock.writeLock().lockInterruptibly(); try { while (delegate.isEmpty()) { - notEmpty.await(); + notEmpty.await(); // await drops the lock, then reacquires it } final Iterator iter = delegate.iterator(); x = iter.next(); @@ -195,7 +152,7 @@ public E take() throws InterruptedException { notEmpty.signal(); } } finally { - takeLock.unlock(); + lock.writeLock().unlock(); } return x; } @@ -205,11 +162,11 @@ public E take() throws InterruptedException { * returns. */ public void clear() { - putLock.lock(); + lock.writeLock().lock(); try { delegate.clear(); } finally { - putLock.unlock(); + lock.writeLock().unlock(); } } @@ -218,21 +175,21 @@ public void clear() { * @return the number of elements in this queue */ public int size() { - takeLock.lock(); + lock.readLock().lock(); try { return delegate.size(); } finally { - takeLock.unlock(); + lock.readLock().unlock(); } } @Override public String toString() { - takeLock.lock(); + lock.readLock().lock(); try { return delegate.toString(); } finally { - takeLock.unlock(); + lock.readLock().unlock(); } } }