diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 9c94b7657f5..5c62b991d41 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -19,15 +19,6 @@ package org.apache.thrift.server; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; @@ -42,6 +33,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + /** * Provides common methods and classes used by nonblocking TServer * implementations. @@ -102,7 +102,7 @@ public void serve() { /** * Starts any threads required for serving. - * + * * @return true if everything went ok, false if threads could not be started. */ protected abstract boolean startThreads(); @@ -115,7 +115,7 @@ public void serve() { /** * Have the server transport start accepting connections. - * + * * @return true if we started listening successfully, false if something went * wrong. */ @@ -139,7 +139,7 @@ protected void stopListening() { /** * Perform an invocation. This method could behave several different ways - * invoke immediately inline, queue for separate execution, etc. - * + * * @return true if invocation was successfully requested, which is not a * guarantee that invocation has completed. False if the request * failed. @@ -152,7 +152,7 @@ protected void stopListening() { * corresponding to requests. */ protected abstract class AbstractSelectThread extends Thread { - protected final Selector selector; + protected Selector selector; // List of FrameBuffers that want to change their selection interests. protected final Set selectInterestChanges = new HashSet(); @@ -285,21 +285,21 @@ public class FrameBuffer { protected ByteBuffer buffer_; protected final TByteArrayOutputStream response_; - + // the frame that the TTransport should wrap. protected final TMemoryInputTransport frameTrans_; - + // the transport that should be used to connect to clients protected final TTransport inTrans_; - + protected final TTransport outTrans_; - + // the input protocol to use on frames protected final TProtocol inProt_; - + // the output protocol to use on frames protected final TProtocol outProt_; - + // context associated with this connection protected final ServerContext context_; @@ -328,7 +328,7 @@ public FrameBuffer(final TNonblockingTransport trans, /** * Give this FrameBuffer a chance to read. The selector loop should have * received a read event for this FrameBuffer. - * + * * @return true if the connection should live on, false if it should be * closed */ @@ -455,7 +455,7 @@ public void changeSelectInterests() { public void close() { // if we're being closed due to an error, we might have allocated a // buffer that we need to subtract for our memory accounting. - if (state_ == FrameBufferState.READING_FRAME || + if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE || state_ == FrameBufferState.AWAITING_CLOSE) { readBufferBytesAllocated.addAndGet(-buffer_.array().length); @@ -510,7 +510,7 @@ public void responseReady() { public void invoke() { frameTrans_.reset(buffer_.array()); response_.reset(); - + try { if (eventHandler_ != null) { eventHandler_.processContext(context_, inTrans_, outTrans_); @@ -530,7 +530,7 @@ public void invoke() { /** * Perform a read into buffer. - * + * * @return true if the read succeeded, false if there was an error or the * connection closed. */ diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 353b8e07138..038507e9c68 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -19,7 +19,15 @@ package org.apache.thrift.server; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; @@ -37,24 +45,18 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.thrift.transport.TNonblockingServerTransport; -import org.apache.thrift.transport.TNonblockingTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A Half-Sync/Half-Async server with a separate pool of threads to handle * non-blocking I/O. Accepts are handled on a single thread, and a configurable * number of nonblocking selector threads manage reading and writing of client * connections. A synchronous worker thread pool handles processing of requests. - * + * * Performs better than TNonblockingServer/THsHaServer in multi-core * environments when the the bottleneck is CPU on the single selector thread * handling I/O. In addition, because the accept handling is decoupled from * reads/writes and invocation, the server has better ability to handle back- * pressure from new connections (e.g. stop accepting when busy). - * + * * Like TNonblockingServer, it relies on the use of TFramedTransport. */ public class TThreadedSelectorServer extends AbstractNonblockingServer { @@ -205,7 +207,7 @@ public TThreadedSelectorServer(Args args) { /** * Start the accept and selector threads running to deal with clients. - * + * * @return true if everything went ok, false if we couldn't start for some * reason. */ @@ -349,7 +351,7 @@ protected class AcceptThread extends Thread { /** * Set up the AcceptThead - * + * * @throws IOException */ public AcceptThread(TNonblockingServerTransport serverTransport, @@ -478,10 +480,13 @@ protected class SelectorThread extends AbstractSelectThread { // Accepted connections added by the accept thread. private final BlockingQueue acceptedQueue; + private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512; + private long MONITOR_PERIOD = 1000L; + private int jvmBug = 0; /** * Set up the SelectorThread with an unbounded queue for incoming accepts. - * + * * @throws IOException * if a selector cannot be created */ @@ -491,7 +496,7 @@ public SelectorThread() throws IOException { /** * Set up the SelectorThread with an bounded queue for incoming accepts. - * + * * @throws IOException * if a selector cannot be created */ @@ -501,7 +506,7 @@ public SelectorThread(int maxPendingAccepts) throws IOException { /** * Set up the SelectorThread with a specified queue for connections. - * + * * @param acceptedQueue * The BlockingQueue implementation for holding incoming accepted * connections. @@ -515,7 +520,7 @@ public SelectorThread(BlockingQueue acceptedQueue) throws /** * Hands off an accepted connection to be handled by this thread. This * method will block if the queue for new connections is at capacity. - * + * * @param accepted * The connection that has been accepted. * @return true if the connection has been successfully added. @@ -566,8 +571,8 @@ public void run() { */ private void select() { try { - // wait for io events. - selector.select(); + + doSelect(); // process the io events we received Iterator selectedKeys = selector.selectedKeys().iterator(); @@ -596,6 +601,77 @@ private void select() { } } + /** + * Do select and judge epoll bug happen. + * See : https://issues.apache.org/jira/browse/THRIFT-4251 + */ + private void doSelect() throws IOException { + long beforeSelect = System.currentTimeMillis(); + int selectedNums = selector.select(); + long afterSelect = System.currentTimeMillis(); + + if (selectedNums == 0) { + jvmBug++; + } else { + jvmBug = 0; + } + + long selectedTime = afterSelect - beforeSelect; + if (selectedTime >= MONITOR_PERIOD) { + jvmBug = 0; + } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) { + LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug); + rebuildSelector(); + selector.selectNow(); + jvmBug = 0; + } + + } + + /** + * Replaces the current Selector of this SelectorThread with newly created Selector to work + * around the infamous epoll 100% CPU bug. + */ + private synchronized void rebuildSelector() { + final Selector oldSelector = selector; + if (oldSelector == null) { + return; + } + Selector newSelector = null; + try { + newSelector = Selector.open(); + LOGGER.warn("Created new Selector."); + } catch (IOException e) { + LOGGER.error("Create new Selector error.", e); + } + + for (SelectionKey key : oldSelector.selectedKeys()) { + if (!key.isValid() && key.readyOps() == 0) + continue; + SelectableChannel channel = key.channel(); + Object attachment = key.attachment(); + + try { + if (attachment == null) { + channel.register(newSelector, key.readyOps()); + } else { + channel.register(newSelector, key.readyOps(), attachment); + } + } catch (ClosedChannelException e) { + LOGGER.error("Register new selector key error.", e); + } + + } + + selector = newSelector; + try { + oldSelector.close(); + } catch (IOException e) { + LOGGER.error("Close old selector error.", e); + } + LOGGER.warn("Replace new selector success."); + } + private void processAcceptedConnections() { // Register accepted connections while (!stopped_) {