From ab9cc9b3ea89de9b6269dcadfaabc3af675ad37e Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Wed, 29 Mar 2023 17:32:59 -0400 Subject: [PATCH 1/2] HBASE-27768 Race conditions in BlockingRpcConnection --- .../hbase/ipc/BlockingRpcConnection.java | 77 +++++++++++++++++-- 1 file changed, 69 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d63d14940e78..7961d10d27bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.SaslException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; @@ -96,6 +97,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { justification = "We are always under lock actually") private Thread thread; + // Used for ensuring two reader threads don't run over each other. Should only be used + // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection + private final Object readerThreadLock = new Object(); + + // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps. + private final AtomicInteger attempts = new AtomicInteger(); + // connected socket. protected for writing UT. protected Socket socket = null; private DataInputStream in; @@ -323,6 +331,17 @@ private synchronized boolean waitForWork() { if (thread == null) { return false; } + + // If closeConn is called while we are in the readResponse method, it's possible that a new + // call to setupIOStreams comes in and creates a new value for "thread" before readResponse + // finishes. Once readResponse finishes, it will come in here and thread will be non-null + // above, but pointing at a new thread. In that case, we should end to avoid a situation + // where two threads are forever competing for the same socket. + if (!isCurrentThreadExpected()) { + LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop."); + return false; + } + if (!calls.isEmpty()) { return true; } @@ -336,6 +355,20 @@ private synchronized boolean waitForWork() { } catch (InterruptedException e) { // Restore interrupt status Thread.currentThread().interrupt(); + + String msg = "Interrupted while waiting for work"; + LOG.debug(msg); + // If we were interrupted by closeConn, it would have set thread to null. + // We are synchronized here and if we somehow got interrupted without setting thread to + // null, we want to make sure the connection is closed since the read thread would be dead. + // Rather than do a null check here, we check if the current thread is the expected thread. + // This guards against the case where a call to setupIOStreams got the synchronized lock + // first after closeConn, thus changing the thread to a new thread. + if (isCurrentThreadExpected()) { + closeConn(new InterruptedIOException(msg)); + } + + return false; } } } @@ -343,13 +376,24 @@ private synchronized boolean waitForWork() { @Override public void run() { if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": starting"); + LOG.trace("starting"); } - while (waitForWork()) { - readResponse(); + + // We have a synchronization here because it's possible in error scenarios for a new + // thread to be started while readResponse is still reading on the socket. We don't want + // two threads to be reading from the same socket/inputstream. + // The below calls can synchronize on "BlockingRpcConnection.this". + // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks + synchronized (readerThreadLock) { + if (LOG.isTraceEnabled()) { + LOG.trace("started"); + } + while (waitForWork()) { + readResponse(); + } } if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": stopped"); + LOG.trace("stopped"); } } @@ -522,7 +566,7 @@ public Boolean run() throws IOException { } // start the receiver thread after the socket connection has been set up - thread = new Thread(this, threadName); + thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")"); thread.setDaemon(true); thread.start(); } @@ -629,7 +673,7 @@ private void writeRequest(Call call) throws IOException { call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); } catch (Throwable t) { if (LOG.isTraceEnabled()) { - LOG.trace("Error while writing {}", call.toShortString()); + LOG.trace("Error while writing {}", call.toShortString(), t); } IOException e = IPCUtil.toIOE(t); closeConn(e); @@ -716,16 +760,33 @@ private void readResponse() { // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. if (LOG.isTraceEnabled()) { - LOG.trace("ignored", e); + LOG.trace("ignored ex for call {}", call, e); } } else { synchronized (this) { - closeConn(e); + // The exception we received may have been caused by another thread closing + // this connection. It's possible that before getting to this point, a new connection was + // created. In that case, it doesn't help and can actually hurt to close again here. + if (isCurrentThreadExpected()) { + LOG.debug("Closing connection after error in call {}", call, e); + closeConn(e); + } } } } } + /** + * For use in the reader thread, tests if the current reader thread is the one expected to be + * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then + * creates a new thread and updates the thread pointer. At that point, the new thread should be + * the only one running. We use this method to guard against cases where the old thread may be + * erroneously running or closing the connection in error states. + */ + private boolean isCurrentThreadExpected() { + return thread == Thread.currentThread(); + } + @Override protected synchronized void callTimeout(Call call) { // call sender From bd38cb2e1b079610cfb2b888e214424303d68732 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 8 Apr 2023 10:37:51 -0400 Subject: [PATCH 2/2] minor log change --- .../org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 7961d10d27bd..f0e71ae68cb2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -357,7 +357,7 @@ private synchronized boolean waitForWork() { Thread.currentThread().interrupt(); String msg = "Interrupted while waiting for work"; - LOG.debug(msg); + // If we were interrupted by closeConn, it would have set thread to null. // We are synchronized here and if we somehow got interrupted without setting thread to // null, we want to make sure the connection is closed since the read thread would be dead. @@ -365,7 +365,10 @@ private synchronized boolean waitForWork() { // This guards against the case where a call to setupIOStreams got the synchronized lock // first after closeConn, thus changing the thread to a new thread. if (isCurrentThreadExpected()) { + LOG.debug(msg + ", closing connection"); closeConn(new InterruptedIOException(msg)); + } else { + LOG.debug(msg); } return false;