Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27064 CME in TestRegionNormalizerWorkQueue #4468

Merged
merged 1 commit into from May 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,7 +25,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -39,62 +39,22 @@
* {@link BlockingQueue}.</li>
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
* </ul>
* Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic
* synchronization strategy.
*/
@InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> {

/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
private LinkedHashSet<E> delegate;

// the locking structure used here follows the example found in LinkedBlockingQueue. The
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's super concerning that there's an issue here because, per this comment, the implementation follows the locking implementation of LinkedBlockingQueue.

// difference is that our locks guard access to `delegate` rather than the head node.

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock;

/** Lock for puts and takes **/
private final ReentrantReadWriteLock lock;
/** Wait queue for waiting takes */
private final Condition notEmpty;

/** Lock held by put, offer, etc */
private final ReentrantLock putLock;

RegionNormalizerWorkQueue() {
delegate = new LinkedHashSet<>();
takeLock = new ReentrantLock();
notEmpty = takeLock.newCondition();
putLock = new ReentrantLock();
}

/**
* Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock
* takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* Locks to prevent both puts and takes.
*/
private void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlocks to allow both puts and takes.
*/
private void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
lock = new ReentrantReadWriteLock();
notEmpty = lock.writeLock().newCondition();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we are changing notEmpty from being readLock Condition to writeLock Condition, this seems good.

Copy link
Contributor Author

@apurtell apurtell May 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReentrantReadWriteLock does not support Conditions on the readLock. It would throw UnsupportedOperationException.

}

/**
Expand All @@ -105,16 +65,14 @@ public void put(E e) {
if (e == null) {
throw new NullPointerException();
}

putLock.lock();
lock.writeLock().lock();
try {
delegate.add(e);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
putLock.unlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -138,16 +96,14 @@ public void putAll(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}

putLock.lock();
lock.writeLock().lock();
try {
delegate.addAll(c);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
putLock.unlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -159,19 +115,17 @@ public void putAllFirst(Collection<? extends E> c) {
if (c == null) {
throw new NullPointerException();
}

fullyLock();
lock.writeLock().lock();
try {
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
copy.addAll(c);
copy.addAll(delegate);
delegate = copy;
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally {
fullyUnlock();
}

if (!delegate.isEmpty()) {
signalNotEmpty();
lock.writeLock().unlock();
}
}

Expand All @@ -183,10 +137,13 @@ public void putAllFirst(Collection<? extends E> c) {
*/
public E take() throws InterruptedException {
E x;
takeLock.lockInterruptibly();
// Take a write lock. If the delegate's queue is empty we need it to await(), which will
// drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
// to mutate the head.
lock.writeLock().lockInterruptibly();
try {
while (delegate.isEmpty()) {
notEmpty.await();
notEmpty.await(); // await drops the lock, then reacquires it
}
final Iterator<E> iter = delegate.iterator();
x = iter.next();
Expand All @@ -195,7 +152,7 @@ public E take() throws InterruptedException {
notEmpty.signal();
}
} finally {
takeLock.unlock();
lock.writeLock().unlock();
}
return x;
}
Expand All @@ -205,11 +162,11 @@ public E take() throws InterruptedException {
* returns.
*/
public void clear() {
putLock.lock();
lock.writeLock().lock();
try {
delegate.clear();
} finally {
putLock.unlock();
lock.writeLock().unlock();
}
}

Expand All @@ -218,21 +175,21 @@ public void clear() {
* @return the number of elements in this queue
*/
public int size() {
takeLock.lock();
lock.readLock().lock();
try {
return delegate.size();
} finally {
takeLock.unlock();
lock.readLock().unlock();
}
}

@Override
public String toString() {
takeLock.lock();
lock.readLock().lock();
try {
return delegate.toString();
} finally {
takeLock.unlock();
lock.readLock().unlock();
}
}
}