Skip to content

Commit

Permalink
THRIFT-4251: Fix JDK Epoll Bug in Thrift of TThreadedSelectorServer m…
Browse files Browse the repository at this point in the history
…odel.

Client: Java

This closes #1313
  • Loading branch information
Johnny-Liao authored and jeking3 committed Sep 21, 2017
1 parent 8506121 commit 9ffb41d
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 39 deletions.
Expand Up @@ -19,15 +19,6 @@

package org.apache.thrift.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
Expand All @@ -42,6 +33,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
* Provides common methods and classes used by nonblocking TServer
* implementations.
Expand Down Expand Up @@ -102,7 +102,7 @@ public void serve() {

/**
* Starts any threads required for serving.
*
*
* @return true if everything went ok, false if threads could not be started.
*/
protected abstract boolean startThreads();
Expand All @@ -115,7 +115,7 @@ public void serve() {

/**
* Have the server transport start accepting connections.
*
*
* @return true if we started listening successfully, false if something went
* wrong.
*/
Expand All @@ -139,7 +139,7 @@ protected void stopListening() {
/**
* Perform an invocation. This method could behave several different ways -
* invoke immediately inline, queue for separate execution, etc.
*
*
* @return true if invocation was successfully requested, which is not a
* guarantee that invocation has completed. False if the request
* failed.
Expand All @@ -152,7 +152,7 @@ protected void stopListening() {
* corresponding to requests.
*/
protected abstract class AbstractSelectThread extends Thread {
protected final Selector selector;
protected Selector selector;

// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
Expand Down Expand Up @@ -285,21 +285,21 @@ public class FrameBuffer {
protected ByteBuffer buffer_;

protected final TByteArrayOutputStream response_;

// the frame that the TTransport should wrap.
protected final TMemoryInputTransport frameTrans_;

// the transport that should be used to connect to clients
protected final TTransport inTrans_;

protected final TTransport outTrans_;

// the input protocol to use on frames
protected final TProtocol inProt_;

// the output protocol to use on frames
protected final TProtocol outProt_;

// context associated with this connection
protected final ServerContext context_;

Expand Down Expand Up @@ -328,7 +328,7 @@ public FrameBuffer(final TNonblockingTransport trans,
/**
* Give this FrameBuffer a chance to read. The selector loop should have
* received a read event for this FrameBuffer.
*
*
* @return true if the connection should live on, false if it should be
* closed
*/
Expand Down Expand Up @@ -455,7 +455,7 @@ public void changeSelectInterests() {
public void close() {
// if we're being closed due to an error, we might have allocated a
// buffer that we need to subtract for our memory accounting.
if (state_ == FrameBufferState.READING_FRAME ||
if (state_ == FrameBufferState.READING_FRAME ||
state_ == FrameBufferState.READ_FRAME_COMPLETE ||
state_ == FrameBufferState.AWAITING_CLOSE) {
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
Expand Down Expand Up @@ -510,7 +510,7 @@ public void responseReady() {
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();

try {
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
Expand All @@ -530,7 +530,7 @@ public void invoke() {

/**
* Perform a read into buffer.
*
*
* @return true if the read succeeded, false if there was an error or the
* connection closed.
*/
Expand Down
108 changes: 92 additions & 16 deletions lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
Expand Up @@ -19,7 +19,15 @@

package org.apache.thrift.server;

import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
Expand All @@ -37,24 +45,18 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A Half-Sync/Half-Async server with a separate pool of threads to handle
* non-blocking I/O. Accepts are handled on a single thread, and a configurable
* number of nonblocking selector threads manage reading and writing of client
* connections. A synchronous worker thread pool handles processing of requests.
*
*
* Performs better than TNonblockingServer/THsHaServer in multi-core
* environments when the bottleneck is CPU on the single selector thread
* handling I/O. In addition, because the accept handling is decoupled from
* reads/writes and invocation, the server has better ability to handle back-
* pressure from new connections (e.g. stop accepting when busy).
*
*
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
Expand Down Expand Up @@ -205,7 +207,7 @@ public TThreadedSelectorServer(Args args) {

/**
* Start the accept and selector threads running to deal with clients.
*
*
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
Expand Down Expand Up @@ -349,7 +351,7 @@ protected class AcceptThread extends Thread {

/**
* Set up the AcceptThread
*
*
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
Expand Down Expand Up @@ -478,10 +480,13 @@ protected class SelectorThread extends AbstractSelectThread {

// Accepted connections added by the accept thread.
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
// Number of consecutive "empty" selects (select() returning 0 keys) that
// doSelect() tolerates before it rebuilds the selector.
private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
// Duration in milliseconds; a select() that blocks at least this long is
// treated by doSelect() as a normal wait and resets the jvmBug counter.
private long MONITOR_PERIOD = 1000L;
// Running count of consecutive select() calls that returned 0 keys; reset
// to 0 whenever a select returns keys, blocks for MONITOR_PERIOD or longer,
// or triggers a selector rebuild.
private int jvmBug = 0;

/**
* Set up the SelectorThread with an unbounded queue for incoming accepts.
*
*
* @throws IOException
* if a selector cannot be created
*/
Expand All @@ -491,7 +496,7 @@ public SelectorThread() throws IOException {

/**
* Set up the SelectorThread with a bounded queue for incoming accepts.
*
*
* @throws IOException
* if a selector cannot be created
*/
Expand All @@ -501,7 +506,7 @@ public SelectorThread(int maxPendingAccepts) throws IOException {

/**
* Set up the SelectorThread with a specified queue for connections.
*
*
* @param acceptedQueue
* The BlockingQueue implementation for holding incoming accepted
* connections.
Expand All @@ -515,7 +520,7 @@ public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws
/**
* Hands off an accepted connection to be handled by this thread. This
* method will block if the queue for new connections is at capacity.
*
*
* @param accepted
* The connection that has been accepted.
* @return true if the connection has been successfully added.
Expand Down Expand Up @@ -566,8 +571,8 @@ public void run() {
*/
private void select() {
try {
// wait for io events.
selector.select();

doSelect();

// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
Expand Down Expand Up @@ -596,6 +601,77 @@ private void select() {
}
}

/**
 * Performs one blocking select and watches for the JDK epoll spin bug, in
 * which select() keeps returning immediately with no ready keys.
 * See: https://issues.apache.org/jira/browse/THRIFT-4251
 *
 * A counter tracks consecutive selects that returned zero keys. The counter
 * is reset whenever a select returns keys or blocks for at least
 * MONITOR_PERIOD milliseconds. Once the counter exceeds
 * SELECTOR_AUTO_REBUILD_THRESHOLD the selector is rebuilt and polled once
 * without blocking.
 *
 * @throws IOException if the underlying select operation fails
 */
private void doSelect() throws IOException {
  // Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
  // and would make a long wait look short or a short wait look long.
  final long beforeSelect = System.nanoTime();
  final int selectedNums = selector.select();
  final long selectedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeSelect);

  if (selectedNums == 0) {
    // Nothing was ready; note that a wakeup() also lands here.
    jvmBug++;
  } else {
    jvmBug = 0;
  }

  if (selectedTime >= MONITOR_PERIOD) {
    // The select genuinely blocked, so the selector is not spinning.
    jvmBug = 0;
  } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
    LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
    rebuildSelector();
    // Refresh the selected-key set of the (possibly new) selector so the
    // caller processes up-to-date keys.
    selector.selectNow();
    jvmBug = 0;
  }
}

/**
 * Replaces the current Selector of this SelectorThread with a newly created
 * Selector to work around the infamous epoll 100% CPU bug.
 *
 * Every valid key registered with the old selector is re-registered on the
 * new selector with the same interest set and attachment, after which the
 * old selector is closed. If a new selector cannot be opened, the current
 * selector is left in place and nothing is changed.
 */
private synchronized void rebuildSelector() {
  final Selector oldSelector = selector;
  if (oldSelector == null) {
    return;
  }

  final Selector newSelector;
  try {
    newSelector = Selector.open();
    LOGGER.warn("Created new Selector.");
  } catch (IOException e) {
    // Without a replacement we must keep the old selector; continuing would
    // drop every connection and leave this thread without a selector.
    LOGGER.error("Create new Selector error.", e);
    return;
  }

  // Migrate ALL registrations (keys()), not just the currently selected ones:
  // the rebuild is triggered after select() returned nothing, so the
  // selected-key set is normally empty.
  for (SelectionKey key : oldSelector.keys()) {
    if (!key.isValid()) {
      continue;
    }
    SelectableChannel channel = key.channel();
    Object attachment = key.attachment();

    try {
      // Carry over what the channel is interested in, not what happened to
      // be ready at this instant. A null attachment is equivalent to the
      // two-argument register().
      channel.register(newSelector, key.interestOps(), attachment);
      // NOTE(review): attachments that cache their SelectionKey (FrameBuffer
      // presumably does) would still reference the old, soon-invalid key —
      // confirm and rebind if so.
    } catch (ClosedChannelException e) {
      LOGGER.error("Register new selector key error.", e);
    } catch (java.nio.channels.CancelledKeyException e) {
      // The key was cancelled between the validity check and interestOps().
      LOGGER.error("Register new selector key error.", e);
    }
  }

  selector = newSelector;
  try {
    // Closing the old selector invalidates all of its remaining keys.
    oldSelector.close();
  } catch (IOException e) {
    LOGGER.error("Close old selector error.", e);
  }
  LOGGER.warn("Replace new selector success.");
}

private void processAcceptedConnections() {
// Register accepted connections
while (!stopped_) {
Expand Down

0 comments on commit 9ffb41d

Please sign in to comment.