Skip to content

Commit

Permalink
THRIFT-5494 fix cpu full caused by infinite select() when frameSize <…
Browse files Browse the repository at this point in the history
… maxReadBufferBytes but readBufferBytesAllocated.get() + frameSize always greater than MAX_READ_BUFFER_BYTES

Client: Java
Patch: wangfan <wangfan8@xiaomi.com>

This closes #2533
  • Loading branch information
wangfan authored and Jens-G committed Mar 6, 2022
1 parent b8920b0 commit 66ac7b4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,10 @@ public boolean read() {

// if this frame will always be too large for this server, log the
// error and close the connection.
if (frameSize > MAX_READ_BUFFER_BYTES) {
if (frameSize > trans_.getMaxFrameSize()) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
+ ", which is bigger than the maximum allowable frame size "
+ trans_.getMaxFrameSize() + " for ALL connections.");
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ public abstract class TEndpointTransport extends TTransport{

protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }

public int getMaxFrameSize() { return getConfiguration().getMaxFrameSize(); }

public void setMaxFrameSize(int maxFrameSize) { getConfiguration().setMaxFrameSize(maxFrameSize); }

protected long knownMessageSize;
protected long remainingMessageSize;

private TConfiguration _configuration;

public TConfiguration getConfiguration() {
return _configuration;
}

public TEndpointTransport( TConfiguration config) throws TTransportException {
public TEndpointTransport(TConfiguration config) throws TTransportException {
_configuration = Objects.isNull(config) ? new TConfiguration() : config;

resetConsumedMessageSize(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.apache.thrift.TConfiguration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,6 +56,11 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport {
*/
private int clientTimeout_ = 0;

/**
* Limit for client sockets request size
*/
private int maxFrameSize_ = 0;

public static class NonblockingAbstractServerSocketArgs extends
AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {}

Expand All @@ -68,19 +75,28 @@ public TNonblockingServerSocket(int port) throws TTransportException {
* Creates just a port listening server socket
*/
public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout));
this(port, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}

public TNonblockingServerSocket(int port, int clientTimeout, int maxFrameSize) throws TTransportException {
this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
}

public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
this(bindAddr, 0);
}

public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
this(bindAddr, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}

public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, int maxFrameSize) throws TTransportException {
this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
}

public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException {
clientTimeout_ = args.clientTimeout;
maxFrameSize_ = args.maxFrameSize;
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Expand Down Expand Up @@ -121,6 +137,7 @@ public TNonblockingSocket accept() throws TTransportException {

TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
tsocket.setTimeout(clientTimeout_);
tsocket.setMaxFrameSize(maxFrameSize_);
return tsocket;
} catch (IOException iox) {
throw new TTransportException(iox);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.Closeable;
import java.net.InetSocketAddress;

import org.apache.thrift.TConfiguration;

/**
* Server transport. Object which provides client transports.
*
Expand All @@ -32,6 +34,7 @@ public static abstract class AbstractServerTransportArgs<T extends AbstractServe
int backlog = 0; // A value of 0 means the default value will be used (currently set at 50)
int clientTimeout = 0;
InetSocketAddress bindAddr;
int maxFrameSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE;

public T backlog(int backlog) {
this.backlog = backlog;
Expand All @@ -52,6 +55,11 @@ public T bindAddr(InetSocketAddress bindAddr) {
this.bindAddr = bindAddr;
return (T) this;
}

public T maxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
return (T) this;
}
}

public abstract void listen() throws TTransportException;
Expand Down

0 comments on commit 66ac7b4

Please sign in to comment.