Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27768 Race conditions in BlockingRpcConnection #5154

Merged
merged 2 commits into from
Apr 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -96,6 +97,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
justification = "We are always under lock actually")
private Thread thread;

// Used for ensuring two reader threads don't run over each other. Should only be used
// in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
private final Object readerThreadLock = new Object();

// Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
private final AtomicInteger attempts = new AtomicInteger();

// connected socket. protected for writing UT.
protected Socket socket = null;
private DataInputStream in;
Expand Down Expand Up @@ -323,6 +331,17 @@ private synchronized boolean waitForWork() {
if (thread == null) {
return false;
}

// If closeConn is called while we are in the readResponse method, it's possible that a new
// call to setupIOStreams comes in and creates a new value for "thread" before readResponse
// finishes. Once readResponse finishes, it will come in here and thread will be non-null
// above, but pointing at a new thread. In that case, we should end to avoid a situation
// where two threads are forever competing for the same socket.
if (!isCurrentThreadExpected()) {
LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
return false;
}

if (!calls.isEmpty()) {
return true;
}
Expand All @@ -336,20 +355,48 @@ private synchronized boolean waitForWork() {
} catch (InterruptedException e) {
// Restore interrupt status
Thread.currentThread().interrupt();

String msg = "Interrupted while waiting for work";

// If we were interrupted by closeConn, it would have set thread to null.
// We are synchronized here and if we somehow got interrupted without setting thread to
// null, we want to make sure the connection is closed since the read thread would be dead.
// Rather than do a null check here, we check if the current thread is the expected thread.
// This guards against the case where a call to setupIOStreams got the synchronized lock
// first after closeConn, thus changing the thread to a new thread.
if (isCurrentThreadExpected()) {
LOG.debug(msg + ", closing connection");
closeConn(new InterruptedIOException(msg));
} else {
LOG.debug(msg);
}

return false;
}
}
}

@Override
public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": starting");
LOG.trace("starting");
}
while (waitForWork()) {
readResponse();

// We have a synchronization here because it's possible in error scenarios for a new
// thread to be started while readResponse is still reading on the socket. We don't want
// two threads to be reading from the same socket/inputstream.
// The below calls can synchronize on "BlockingRpcConnection.this".
// We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
synchronized (readerThreadLock) {
if (LOG.isTraceEnabled()) {
LOG.trace("started");
}
while (waitForWork()) {
readResponse();
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": stopped");
LOG.trace("stopped");
}
}

Expand Down Expand Up @@ -522,7 +569,7 @@ public Boolean run() throws IOException {
}

// start the receiver thread after the socket connection has been set up
thread = new Thread(this, threadName);
thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
thread.setDaemon(true);
thread.start();
}
Expand Down Expand Up @@ -629,7 +676,7 @@ private void writeRequest(Call call) throws IOException {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Error while writing {}", call.toShortString());
LOG.trace("Error while writing {}", call.toShortString(), t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
Expand Down Expand Up @@ -716,16 +763,33 @@ private void readResponse() {
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", e);
LOG.trace("ignored ex for call {}", call, e);
}
} else {
synchronized (this) {
closeConn(e);
// The exception we received may have been caused by another thread closing
// this connection. It's possible that before getting to this point, a new connection was
// created. In that case, it doesn't help and can actually hurt to close again here.
if (isCurrentThreadExpected()) {
LOG.debug("Closing connection after error in call {}", call, e);
closeConn(e);
}
}
}
}
}

/**
* For use in the reader thread, tests if the current reader thread is the one expected to be
* running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
* creates a new thread and updates the thread pointer. At that point, the new thread should be
* the only one running. We use this method to guard against cases where the old thread may be
* erroneously running or closing the connection in error states.
*/
private boolean isCurrentThreadExpected() {
return thread == Thread.currentThread();
}

@Override
protected synchronized void callTimeout(Call call) {
// call sender
Expand Down