Skip to content
This repository has been archived by the owner on Oct 15, 2019. It is now read-only.

Commit

Permalink
Move max frame size off of NiftyClient and into connectSync/connectAs…
Browse files Browse the repository at this point in the history
…ync parameters

This makes it easier to integrate into the swift client configuration settings, which are per client type. The expected max size for a frame will be different depending on the type of service.

Also changing default max frame size to 16 MB instead of 1 MB, as several of our services return more than 1 MB on a regular basis.
  • Loading branch information
andrewcox committed Feb 8, 2013
1 parent 8d5fda8 commit 7e3be79
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 27 deletions.
Expand Up @@ -34,66 +34,52 @@
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.google.common.util.concurrent.MoreExecutors.getExitingExecutorService;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class NiftyClient implements Closeable
{
// 1MB default
private static final int DEFAULT_MAX_FRAME_SIZE = 1048576;
public static final Duration DEFAULT_CONNECT_TIMEOUT = new Duration(2, TimeUnit.SECONDS);
public static final Duration DEFAULT_READ_TIMEOUT = new Duration(2, TimeUnit.SECONDS);
private static final Duration DEFAULT_WRITE_TIMEOUT = new Duration(2, TimeUnit.SECONDS);
private static final int DEFAULT_MAX_FRAME_SIZE = 16777216;

private final NettyClientConfigBuilder configBuilder;
private final ExecutorService bossExecutor;
private final ExecutorService workerExecutor;
private final int maxFrameSize;
private final NioClientSocketChannelFactory channelFactory;
private final InetSocketAddress defaultSocksProxyAddress;
private final ChannelGroup allChannels = new DefaultChannelGroup();
private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();

/**
* Creates a new NiftyClient with defaults : frame size 1MB, 30 secs
* connect and read timeout and cachedThreadPool for bossExecutor and workerExecutor.
* Creates a new NiftyClient with defaults: cachedThreadPool for bossExecutor and workerExecutor
*/
public NiftyClient()
{
this(DEFAULT_MAX_FRAME_SIZE);
}

public NiftyClient(int maxFrameSize)
{
this(new NettyClientConfigBuilder(),
getExitingExecutorService((ThreadPoolExecutor) newCachedThreadPool()),
getExitingExecutorService((ThreadPoolExecutor) newCachedThreadPool()),
maxFrameSize,
null);
newCachedThreadPool(),
newCachedThreadPool(),
null);
}

public NiftyClient(NettyClientConfigBuilder configBuilder,
ExecutorService boss,
ExecutorService worker,
int maxFrameSize)
ExecutorService worker)
{
this(configBuilder, boss, worker, maxFrameSize, null);
this(configBuilder, boss, worker, null);
}

public NiftyClient(
NettyClientConfigBuilder configBuilder,
ExecutorService boss,
ExecutorService worker,
int maxFrameSize,
@Nullable InetSocketAddress defaultSocksProxyAddress)
{
this.configBuilder = configBuilder;
this.bossExecutor = boss;
this.workerExecutor = worker;
this.maxFrameSize = maxFrameSize;
this.defaultSocksProxyAddress = defaultSocksProxyAddress;
this.channelFactory = new NioClientSocketChannelFactory(boss, worker);
}
Expand All @@ -105,19 +91,22 @@ public <T extends NiftyClientChannel> ListenableFuture<T> connectAsync(
DEFAULT_CONNECT_TIMEOUT,
DEFAULT_READ_TIMEOUT,
DEFAULT_WRITE_TIMEOUT,
DEFAULT_MAX_FRAME_SIZE,
defaultSocksProxyAddress);
}

public <T extends NiftyClientChannel> ListenableFuture<T> connectAsync(
NiftyClientConnector<T> clientChannelConnector,
Duration connectTimeout,
Duration receiveTimeout,
Duration sendTimeout)
Duration sendTimeout,
int maxFrameSize)
{
return connectAsync(clientChannelConnector,
connectTimeout,
receiveTimeout,
sendTimeout,
maxFrameSize,
defaultSocksProxyAddress);
}

Expand All @@ -126,6 +115,7 @@ public <T extends NiftyClientChannel> ListenableFuture<T> connectAsync(
Duration connectTimeout,
Duration receiveTimeout,
Duration sendTimeout,
int maxFrameSize,
@Nullable InetSocketAddress socksProxyAddress)
{
ClientBootstrap bootstrap = createClientBootstrap(socksProxyAddress);
Expand Down Expand Up @@ -160,24 +150,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
public TNiftyClientTransport connectSync(InetSocketAddress addr)
throws TTransportException, InterruptedException
{
return connectSync(addr, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, DEFAULT_WRITE_TIMEOUT);
return connectSync(addr, DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, DEFAULT_WRITE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE);
}

public TNiftyClientTransport connectSync(
InetSocketAddress addr,
Duration connectTimeout,
Duration receiveTimeout,
Duration sendTimeout)
Duration sendTimeout,
int maxFrameSize)
throws TTransportException, InterruptedException
{
return connectSync(addr, connectTimeout, receiveTimeout, sendTimeout, defaultSocksProxyAddress);
return connectSync(addr, connectTimeout, receiveTimeout, sendTimeout, maxFrameSize, defaultSocksProxyAddress);
}

public TNiftyClientTransport connectSync(
InetSocketAddress addr,
Duration connectTimeout,
Duration receiveTimeout,
Duration sendTimeout,
int maxFrameSize,
@Nullable InetSocketAddress socksProxyAddress)
throws TTransportException, InterruptedException
{
Expand Down
Expand Up @@ -39,6 +39,7 @@ public class TestNiftyClientTimeout
private static final Duration TEST_CONNECT_TIMEOUT = new Duration(500, TimeUnit.MILLISECONDS);
private static final Duration TEST_READ_TIMEOUT = new Duration(500, TimeUnit.MILLISECONDS);
private static final Duration TEST_WRITE_TIMEOUT = new Duration(500, TimeUnit.MILLISECONDS);
private static final int TEST_MAX_FRAME_SIZE = 16777216;

@BeforeTest(alwaysRun = true)
public void init() {
Expand Down Expand Up @@ -67,7 +68,8 @@ public void testSyncConnectTimeout() throws ConnectException, IOException
client.connectSync(new InetSocketAddress(port),
TEST_CONNECT_TIMEOUT,
TEST_READ_TIMEOUT,
TEST_WRITE_TIMEOUT);
TEST_WRITE_TIMEOUT,
TEST_MAX_FRAME_SIZE);
}
catch (Throwable throwable) {
if (isTimeoutException(throwable)) {
Expand Down Expand Up @@ -96,7 +98,8 @@ public void testAsyncConnectTimeout() throws IOException
client.connectAsync(new FramedClientConnector(new InetSocketAddress(port)),
TEST_CONNECT_TIMEOUT,
TEST_READ_TIMEOUT,
TEST_WRITE_TIMEOUT);
TEST_WRITE_TIMEOUT,
TEST_MAX_FRAME_SIZE);
// Wait while NiftyClient attempts to connect the channel
future.get();
}
Expand Down

0 comments on commit 7e3be79

Please sign in to comment.