Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27384 Backport HBASE-27064 in 2.4 #4794

Merged
merged 1 commit into from
Sep 27, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -39,62 +39,22 @@
* {@link BlockingQueue}.</li>
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
* </ul>
* Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic
* synchronization strategy.
*/
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
private LinkedHashSet<E> delegate;

// the locking structure used here follows the example found in LinkedBlockingQueue. The
// difference is that our locks guard access to `delegate` rather than the head node.

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock;

/** Lock for puts and takes **/
private final ReentrantReadWriteLock lock;
/** Wait queue for waiting takes */
private final Condition notEmpty;

/** Lock held by put, offer, etc */
private final ReentrantLock putLock;

RegionNormalizerWorkQueue() {
delegate = new LinkedHashSet<>();
takeLock = new ReentrantLock();
notEmpty = takeLock.newCondition();
putLock = new ReentrantLock();
}

/**
* Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock
* takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* Locks to prevent both puts and takes.
*/
private void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlocks to allow both puts and takes.
*/
private void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
lock = new ReentrantReadWriteLock();
notEmpty = lock.writeLock().newCondition();
}

/**
Expand All @@ -105,16 +65,14 @@ public void put(E e) {
if (e == null) {
throw new NullPointerException();
}

putLock.lock();
lock.writeLock().lock();
try {
delegate.add(e);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
putLock.unlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -138,16 +96,14 @@ public void putAll(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}

putLock.lock();
lock.writeLock().lock();
try {
delegate.addAll(c);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
putLock.unlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -159,19 +115,17 @@ public void putAllFirst(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}

fullyLock();
lock.writeLock().lock();
try {
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
copy.addAll(c);
copy.addAll(delegate);
delegate = copy;
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
fullyUnlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -183,10 +137,13 @@ public void putAllFirst(Collection<? extends E> c) {
*/
public E take() throws InterruptedException {
E x;
takeLock.lockInterruptibly();
// Take a write lock. If the delegate's queue is empty we need it to await(), which will
// drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
// to mutate the head.
lock.writeLock().lockInterruptibly();
try {
while (delegate.isEmpty()) {
notEmpty.await();
notEmpty.await(); // await drops the lock, then reacquires it
}
final Iterator<E> iter = delegate.iterator();
x = iter.next();
Expand All @@ -195,7 +152,7 @@ public E take() throws InterruptedException {
notEmpty.signal();
}
} finally {
takeLock.unlock();
lock.writeLock().unlock();
}
return x;
}
Expand All @@ -205,11 +162,11 @@ public E take() throws InterruptedException {
* returns.
*/
public void clear() {
putLock.lock();
lock.writeLock().lock();
try {
delegate.clear();
} finally {
putLock.unlock();
lock.writeLock().unlock();
}
}

Expand All @@ -218,21 +175,21 @@ public void clear() {
* @return the number of elements in this queue
*/
public int size() {
takeLock.lock();
lock.readLock().lock();
try {
return delegate.size();
} finally {
takeLock.unlock();
lock.readLock().unlock();
}
}

@Override
public String toString() {
takeLock.lock();
lock.readLock().lock();
try {
return delegate.toString();
} finally {
takeLock.unlock();
lock.readLock().unlock();
}
}
}