diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index f4dd701fa7f3..b09674c64174 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -88,10 +88,12 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; @@ -152,18 +154,6 @@ private static class CallFuture { } } - /* - * This is the return value from {@link #waitForWork()} indicating whether run() method should: - * read response - * close the connection - * take no action - connection would be closed by others - */ - private enum WaitForWorkResult { - READ_RESPONSE, - CALLER_SHOULD_CLOSE, - CLOSED - } - /** Thread that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ @@ -255,13 +245,12 @@ public void remove(CallFuture cts){ */ @Override public void run() { - boolean closeBySelf = false; while (!shouldCloseConnection.get()) { CallFuture cts = null; try { cts = callsToWrite.take(); } catch (InterruptedException e) { - closeBySelf = markClosed(new InterruptedIOException()); + markClosed(new InterruptedIOException()); } if (cts == null || cts == CallFuture.DEATH_PILL) { @@ -285,14 +274,11 @@ public void run() { + ", message =" + e.getMessage()); } cts.call.setException(e); - closeBySelf = markClosed(e); + markClosed(e); } } cleanup(); - if (closeBySelf) { - close(); - } } /** @@ -526,28 +512,27 @@ private void checkIsOpen() throws IOException { * it is idle too long, it is marked as to be closed, * or the client is marked as not running. * - * @return WaitForWorkResult indicating whether it is time to read response; - * if the caller should close; or otherwise + * @return true if it is time to read a response; false otherwise. */ - protected synchronized WaitForWorkResult waitForWork() throws InterruptedException { + protected synchronized boolean waitForWork() throws InterruptedException { // beware of the concurrent access to the calls list: we can add calls, but as well // remove them. long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose; while (true) { if (shouldCloseConnection.get()) { - return WaitForWorkResult.CLOSED; + return false; } if (!running.get()) { - if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) { - return WaitForWorkResult.CALLER_SHOULD_CLOSE; - } - return WaitForWorkResult.CLOSED; + markClosed(new IOException("stopped with " + calls.size() + " pending request(s)")); + return false; } if (!calls.isEmpty()) { - return WaitForWorkResult.READ_RESPONSE; + // shouldCloseConnection can be set to true by a parallel thread here. The caller + // will need to check anyway. + return true; } if (EnvironmentEdgeManager.currentTime() >= waitUntil) { @@ -555,11 +540,9 @@ protected synchronized WaitForWorkResult waitForWork() throws InterruptedExcepti // We expect the number of calls to be zero here, but actually someone can // adds a call at the any moment, as there is no synchronization between this task // and adding new calls. It's not a big issue, but it will get an exception. - if (markClosed(new IOException( - "idle connection closed with " + calls.size() + " pending request(s)"))) { - return WaitForWorkResult.CALLER_SHOULD_CLOSE; - } - return WaitForWorkResult.CLOSED; + markClosed(new IOException( + "idle connection closed with " + calls.size() + " pending request(s)")); + return false; } wait(Math.min(minIdleTimeBeforeClose, 1000)); @@ -576,38 +559,24 @@ public void run() { LOG.trace(getName() + ": starting, connections " + connections.size()); } - WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE; try { - result = waitForWork(); // Wait here for work - read or close connection - while (result == WaitForWorkResult.READ_RESPONSE) { - if (readResponse()) { - // shouldCloseConnection is set to true by readResponse(). Close the connection - result = WaitForWorkResult.CALLER_SHOULD_CLOSE; - } else { - result = waitForWork(); - } + while (waitForWork()) { // Wait here for work - read or close connection + readResponse(); } } catch (InterruptedException t) { if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": interrupted while waiting for call responses"); } - if (markClosed(ExceptionUtil.asInterrupt(t))) { - // shouldCloseConnection is set to true. Close connection - result = WaitForWorkResult.CALLER_SHOULD_CLOSE; - } + markClosed(ExceptionUtil.asInterrupt(t)); } catch (Throwable t) { if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t); } - if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) { - // shouldCloseConnection is set to true. Close connection - result = WaitForWorkResult.CALLER_SHOULD_CLOSE; - } - } - if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) { - close(); + markClosed(new IOException("Unexpected throwable while waiting call responses", t)); } + close(); + if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": stopped, connections " + connections.size()); } @@ -735,9 +704,8 @@ protected synchronized void setupIOstreams() throws IOException { } IOException e = new FailedServerException( "This server is in the failed servers list: " + server); - if (markClosed(e)) { - close(); - } + markClosed(e); + close(); throw e; } @@ -815,9 +783,8 @@ public Boolean run() throws IOException { e = new IOException("Could not set up IO Streams to " + server, t); } } - if (markClosed(e)) { - close(); - } + markClosed(e); + close(); throw e; } } @@ -948,9 +915,8 @@ private void writeRequest(Call call, final int priority, Span span) throws IOExc } catch (IOException e) { // We set the value inside the synchronized block, this way the next in line // won't even try to write - if (markClosed(e)) { - close(); - } + markClosed(e); + close(); writeException = e; interrupt(); } @@ -968,10 +934,9 @@ private void writeRequest(Call call, final int priority, Span span) throws IOExc /* Receive a response. * Because only one receiver, so no synchronization on in. - * @return true if connection should be closed by caller */ - protected boolean readResponse() { - if (shouldCloseConnection.get()) return false; + protected void readResponse() { + if (shouldCloseConnection.get()) return; Call call = null; boolean expectedCall = false; try { @@ -993,14 +958,14 @@ protected boolean readResponse() { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); - return false; + return; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); call.setException(re); if (isFatalConnectionException(exceptionResponse)) { - return markClosed(re); + markClosed(re); } } else { Message value = null; @@ -1027,12 +992,11 @@ protected boolean readResponse() { if (LOG.isTraceEnabled()) LOG.trace("ignored", e); } else { // Treat this as a fatal condition and close this connection - return markClosed(e); + markClosed(e); } } finally { cleanupCalls(false); } - return false; } /** @@ -1058,22 +1022,18 @@ private RemoteException createRemoteException(final ExceptionResponse e) { e.getStackTrace(), doNotRetry); } - /* - * @return true if shouldCloseConnection is set true by this thread; false otherwise - */ - protected boolean markClosed(IOException e) { + protected synchronized void markClosed(IOException e) { if (e == null) throw new NullPointerException(); - boolean ret = shouldCloseConnection.compareAndSet(false, true); - if (ret) { + if (shouldCloseConnection.compareAndSet(false, true)) { if (LOG.isTraceEnabled()) { LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage()); } if (callSender != null) { callSender.close(); } + notifyAll(); } - return ret; } @@ -1161,6 +1121,7 @@ public void close() { if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client"); if (!running.compareAndSet(true, false)) return; + Set connsToClose = null; // wake up all connections synchronized (connections) { for (Connection conn : connections.values()) { @@ -1172,13 +1133,19 @@ public void close() { // In case the CallSender did not setupIOStreams() yet, the Connection may not be started // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851 if (!conn.isAlive()) { - if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) { - conn.close(); + if (connsToClose == null) { + connsToClose = new HashSet(); } + connsToClose.add(conn); } } } - + if (connsToClose != null) { + for (Connection conn : connsToClose) { + conn.markClosed(new InterruptedIOException("RpcClient is closing")); + conn.close(); + } + } // wait until all connections are closed while (!connections.isEmpty()) { try {