Skip to content
This repository has been archived by the owner on Oct 15, 2019. It is now read-only.

Commit

Permalink
Update the swift code to use the split timeouts for receive and read.
Browse files Browse the repository at this point in the history
Add a read timeout similar to the nifty code. There are now two
timeouts: the receive timeout which is the overall time that a client
is willing to wait for a response and the read timeout which is the
maximum time that the client will wait for the server to make
progress.

And a couple of fixes to ensure that null values for any of the
timeouts will not cause a NPE somewhere down the line.
  • Loading branch information
hgschmie committed Nov 22, 2013
1 parent b789f8d commit 6f2f638
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 36 deletions.
11 changes: 11 additions & 0 deletions CHANGES.md
@@ -0,0 +1,11 @@
Changes
=======

* 0.11.0

- Add a config option (thrift.client.receive-timeout) to ThriftClientConfig
- Change default value for thrift.client.read-timeout from 1 minute to 10 seconds
- Set default value for thrift.client.receive-timeout to 1 minute
- Add constructors to ThriftClientManager that also take a receiveTimeout.


24 changes: 24 additions & 0 deletions NEWS.md
@@ -0,0 +1,24 @@
News
====

* 0.11.0

Add support for Nifty 0.11.0 receiveTimeout.

The nifty code now supports two timeouts:

- receiveTimeout now reflects the amount of time that a client is
willing to wait for the server complete a response.

- readTimeout is the amount of a time that can pass without the server
sending any data.

Before 0.11.0, readTimeout had the semantics of receiveTimeout.

The default value for readTimeout changed from 1 minute to 10 seconds.

If a client configuration sets thrift.client.read-timeout, this setting
must be changed to be thrift.client.receive-timeout.

See the Nifty CHANGES.md file for more details.

Expand Up @@ -33,6 +33,7 @@ public class ThriftClient<T>
private final Class<T> clientType;
private final String clientName;
private final Duration connectTimeout;
private final Duration receiveTimeout;
private final Duration readTimeout;
private final Duration writeTimeout;

Expand Down Expand Up @@ -72,6 +73,7 @@ public ThriftClient(
this.clientName = clientName;
this.eventHandlers = eventHandlers;
connectTimeout = clientConfig.getConnectTimeout();
receiveTimeout = clientConfig.getReceiveTimeout();
readTimeout = clientConfig.getReadTimeout();
writeTimeout = clientConfig.getWriteTimeout();
socksProxy = clientConfig.getSocksProxy();
Expand All @@ -96,6 +98,12 @@ public String getConnectTimeout()
return connectTimeout.toString();
}

@Managed
public String getReceiveTimeout()
{
return receiveTimeout.toString();
}

@Managed
public String getReadTimeout()
{
Expand Down Expand Up @@ -134,6 +142,7 @@ public ListenableFuture<T> open(NiftyClientConnector<? extends NiftyClientChanne
connector,
clientType,
connectTimeout,
receiveTimeout,
readTimeout,
writeTimeout,
maxFrameSize,
Expand Down
Expand Up @@ -27,13 +27,15 @@
public class ThriftClientConfig
{
public static final Duration DEFAULT_CONNECT_TIMEOUT = new Duration(500, TimeUnit.MILLISECONDS);
public static final Duration DEFAULT_READ_TIMEOUT = new Duration(1, TimeUnit.MINUTES);
public static final Duration DEFAULT_RECEIVE_TIMEOUT = new Duration(1, TimeUnit.MINUTES);
public static final Duration DEFAULT_READ_TIMEOUT = new Duration(10, TimeUnit.SECONDS);
public static final Duration DEFAULT_WRITE_TIMEOUT = new Duration(1, TimeUnit.MINUTES);
// Default max frame size of 16 MB
public static final int DEFAULT_MAX_FRAME_SIZE = 16777216;

private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private Duration connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private Duration receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
private Duration readTimeout = DEFAULT_READ_TIMEOUT;
private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT;
private HostAndPort socksProxy;
Expand All @@ -51,6 +53,19 @@ public ThriftClientConfig setConnectTimeout(Duration connectTimeout)
return this;
}

@MinDuration("1ms")
public Duration getReceiveTimeout()
{
return receiveTimeout;
}

@Config("thrift.client.receive-timeout")
public ThriftClientConfig setReceiveTimeout(Duration receiveTimeout)
{
this.receiveTimeout = receiveTimeout;
return this;
}

@MinDuration("1ms")
public Duration getReadTimeout()
{
Expand Down
Expand Up @@ -43,8 +43,6 @@
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TTransportException;
import org.jboss.netty.channel.Channel;

Expand All @@ -62,12 +60,16 @@
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.Immutable;
import javax.validation.constraints.NotNull;

import static com.facebook.nifty.duplex.TTransportPair.fromSeparateTransports;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_CONNECT_TIMEOUT;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_MAX_FRAME_SIZE;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_READ_TIMEOUT;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_RECEIVE_TIMEOUT;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_WRITE_TIMEOUT;
import static com.facebook.swift.service.ThriftClientConfig.DEFAULT_MAX_FRAME_SIZE;
import static com.google.common.base.Preconditions.checkNotNull;

import static org.apache.thrift.TApplicationException.UNKNOWN_METHOD;

public class ThriftClientManager implements Closeable
Expand All @@ -87,6 +89,7 @@ public ThriftClientMetadata load(TypeAndName typeAndName)
return new ThriftClientMetadata(typeAndName.getType(), typeAndName.getName(), codecManager);
}
});

private final Set<ThriftClientEventHandler> globalEventHandlers;

public ThriftClientManager()
Expand All @@ -102,9 +105,9 @@ public ThriftClientManager(ThriftCodecManager codecManager)
@Inject
public ThriftClientManager(ThriftCodecManager codecManager, NiftyClient niftyClient, Set<ThriftClientEventHandler> globalEventHandlers)
{
this.codecManager = codecManager;
this.niftyClient = niftyClient;
this.globalEventHandlers = globalEventHandlers;
this.codecManager = checkNotNull(codecManager, "codecManager is null");
this.niftyClient = checkNotNull(niftyClient, "niftyClient is null");
this.globalEventHandlers = checkNotNull(globalEventHandlers, "globalEventHandlers is null");
}

public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
Expand All @@ -115,6 +118,7 @@ public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
connector,
type,
DEFAULT_CONNECT_TIMEOUT,
DEFAULT_RECEIVE_TIMEOUT,
DEFAULT_READ_TIMEOUT,
DEFAULT_WRITE_TIMEOUT,
DEFAULT_MAX_FRAME_SIZE,
Expand All @@ -123,39 +127,72 @@ public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
null);
}

/**
* @deprecated Use {@link ThriftClientManager#createClient(NiftyClientConnector, Class, Duration, Duration, Duration, Duration, int, String, List, HostAndPort)}.
*/
@Deprecated
public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
final NiftyClientConnector<C> connector,
final Class<T> type,
@Nullable final Duration connectTimeout,
@Nullable final Duration readTimeout,
@Nullable final Duration writeTimeout,
final int maxFrameSize,
@Nullable final String clientName,
final List<? extends ThriftClientEventHandler> eventHandlers,
@Nullable HostAndPort socksProxy)
{
return createClient(
connector,
type,
connectTimeout,
readTimeout,
readTimeout,
writeTimeout,
maxFrameSize,
clientName,
eventHandlers,
socksProxy);
}

public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
final NiftyClientConnector<C> connector,
final Class<T> type,
final Duration connectTimeout,
final Duration readTimeout,
final Duration writeTimeout,
@Nullable final Duration connectTimeout,
@Nullable final Duration receiveTimeout,
@Nullable final Duration readTimeout,
@Nullable final Duration writeTimeout,
final int maxFrameSize,
final String clientName,
@Nullable final String clientName,
final List<? extends ThriftClientEventHandler> eventHandlers,
HostAndPort socksProxy)
@Nullable HostAndPort socksProxy)
{
InetSocketAddress socksProxyAddress = this.toSocksProxyAddress(socksProxy);
checkNotNull(connector, "connector is null");
checkNotNull(type, "type is null");
checkNotNull(eventHandlers, "eventHandlers is null");

InetSocketAddress socksProxyAddress = toSocksProxyAddress(socksProxy);
final ListenableFuture<C> connectFuture = niftyClient.connectAsync(
connector,
connectTimeout,
receiveTimeout,
readTimeout,
writeTimeout,
maxFrameSize,
socksProxyAddress);

ListenableFuture<T> clientFuture = Futures.transform(connectFuture, new Function<C, T>() {
@Nullable
@Override
public T apply(@Nullable C channel)
public T apply(@NotNull C channel)
{
try {
if (readTimeout.toMillis() > 0) {
channel.setReceiveTimeout(readTimeout);
}
if (writeTimeout.toMillis() > 0) {
channel.setSendTimeout(writeTimeout);
}
channel.setReceiveTimeout(receiveTimeout);
channel.setReadTimeout(readTimeout);
channel.setSendTimeout(writeTimeout);

String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;
String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;

try {
return createClient(channel, type, name, eventHandlers);
}
catch (Throwable t) {
Expand All @@ -182,6 +219,11 @@ public <T> T createClient(NiftyClientChannel channel, Class<T> type, List<? exte

public <T> T createClient(NiftyClientChannel channel, Class<T> type, String name, List<? extends ThriftClientEventHandler> eventHandlers)
{
checkNotNull(channel, "channel is null");
checkNotNull(type, "type is null");
checkNotNull(name, "name is null");
checkNotNull(eventHandlers, "eventHandlers is null");

ThriftClientMetadata clientMetadata = clientMetadataCache.getUnchecked(new TypeAndName(type, name));

String clientDescription = clientMetadata.getName() + " " + channel.toString();
Expand All @@ -197,12 +239,7 @@ public <T> T createClient(NiftyClientChannel channel, Class<T> type, String name
));
}

private InetSocketAddress toInetSocketAddress(HostAndPort hostAndPort)
{
return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort());
}

private InetSocketAddress toSocksProxyAddress(HostAndPort socksProxy)
private static InetSocketAddress toSocksProxyAddress(HostAndPort socksProxy)
{
if (socksProxy == null) {
return null;
Expand Down
Expand Up @@ -39,7 +39,8 @@ public void testDefaults()
{
ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(ThriftClientConfig.class)
.setConnectTimeout(Duration.valueOf("500ms"))
.setReadTimeout(Duration.valueOf("1m"))
.setReceiveTimeout(Duration.valueOf("1m"))
.setReadTimeout(Duration.valueOf("10s"))
.setWriteTimeout(Duration.valueOf("1m"))
.setSocksProxy(null)
.setMaxFrameSize(16777216));
Expand All @@ -50,15 +51,17 @@ public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("thrift.client.connect-timeout", "10s")
.put("thrift.client.read-timeout", "1s")
.put("thrift.client.receive-timeout", "1d")
.put("thrift.client.read-timeout", "10h")
.put("thrift.client.write-timeout", "1s")
.put("thrift.client.socks-proxy", "localhost:8080")
.put("thrift.client.max-frame-size", "200")
.build();

ThriftClientConfig expected = new ThriftClientConfig()
.setConnectTimeout(Duration.valueOf("10s"))
.setReadTimeout(Duration.valueOf("1s"))
.setReceiveTimeout(Duration.valueOf("1d"))
.setReadTimeout(Duration.valueOf("10h"))
.setWriteTimeout(Duration.valueOf("1s"))
.setSocksProxy(HostAndPort.fromParts("localhost", 8080))
.setMaxFrameSize(200);
Expand Down
Expand Up @@ -18,14 +18,12 @@
import com.facebook.swift.codec.ThriftCodecManager;
import com.facebook.swift.service.ThriftClientManager;
import com.facebook.swift.service.ThriftServer;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.units.Duration;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
Expand Down
Expand Up @@ -18,9 +18,7 @@
import com.facebook.swift.codec.ThriftCodecManager;
import com.facebook.swift.service.ThriftClientManager;
import com.facebook.swift.service.ThriftServer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.thrift.TException;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -29,7 +27,6 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.Lists.newArrayList;
Expand Down
Expand Up @@ -66,6 +66,7 @@ protected <T> ListenableFuture<T> createClient(Class<T> clientClass, int serverP
{
HostAndPort address = HostAndPort.fromParts("localhost", serverPort);
ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(new Duration(1, TimeUnit.SECONDS))
.setReceiveTimeout(new Duration(10, TimeUnit.SECONDS))
.setReadTimeout(new Duration(1, TimeUnit.SECONDS))
.setWriteTimeout(new Duration(1, TimeUnit.SECONDS));
FramedClientConnector connector = new FramedClientConnector(address) {
Expand All @@ -89,6 +90,7 @@ protected <T> ListenableFuture<T> createHttpClient(Class<T> clientClass, int ser
throws TTransportException, InterruptedException, ExecutionException
{
ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(new Duration(1, TimeUnit.SECONDS))
.setReceiveTimeout(new Duration(10, TimeUnit.SECONDS))
.setReadTimeout(new Duration(1, TimeUnit.SECONDS))
.setWriteTimeout(new Duration(1, TimeUnit.SECONDS));
HttpClientConnector connector =
Expand Down
Expand Up @@ -262,6 +262,7 @@ public void testThriftClientWithConfiguration()

ImmutableMap<String, String> configMap = new ImmutableMap.Builder<String, String>()
.put("scribe.thrift.client.connect-timeout", TEST_MEDIUM_TIMEOUT)
.put("scribe.thrift.client.receive-timeout", TEST_MEDIUM_TIMEOUT)
.put("scribe.thrift.client.read-timeout", TEST_SHORT_TIMEOUT)
.put("scribe.thrift.client.max-frame-size", Integer.toString(SIXTEEN_MB_IN_BYTES))
.put("PumaReadService.thrift.client.write-timeout", TEST_LONG_TIMEOUT)
Expand All @@ -284,6 +285,7 @@ public void configure(Binder binder)

ThriftClient<Scribe> scribeClient = injector.getInstance(Key.get(new TypeLiteral<ThriftClient<Scribe>>() {}));
assertEquals(Duration.valueOf(scribeClient.getConnectTimeout()), Duration.valueOf(TEST_MEDIUM_TIMEOUT));
assertEquals(Duration.valueOf(scribeClient.getReceiveTimeout()), Duration.valueOf(TEST_MEDIUM_TIMEOUT));
assertEquals(Duration.valueOf(scribeClient.getReadTimeout()), Duration.valueOf(TEST_SHORT_TIMEOUT));
assertEquals(Duration.valueOf(scribeClient.getWriteTimeout()), ThriftClientConfig.DEFAULT_WRITE_TIMEOUT);
assertEquals(scribeClient.getMaxFrameSize(), SIXTEEN_MB_IN_BYTES);
Expand All @@ -292,6 +294,7 @@ public void configure(Binder binder)
ThriftClient<PumaReadService> pumaClient = injector.getInstance(Key.get(new TypeLiteral<ThriftClient<PumaReadService>>() {}));
assertEquals(Duration.valueOf(pumaClient.getConnectTimeout()), ThriftClientConfig.DEFAULT_CONNECT_TIMEOUT);
assertEquals(Duration.valueOf(pumaClient.getReadTimeout()), ThriftClientConfig.DEFAULT_READ_TIMEOUT);
assertEquals(Duration.valueOf(pumaClient.getReceiveTimeout()), ThriftClientConfig.DEFAULT_RECEIVE_TIMEOUT);
assertEquals(Duration.valueOf(pumaClient.getWriteTimeout()), Duration.valueOf(TEST_LONG_TIMEOUT));
assertEquals(pumaClient.getMaxFrameSize(), ThriftClientConfig.DEFAULT_MAX_FRAME_SIZE);
assertEquals(HostAndPort.fromString(pumaClient.getSocksProxy()), proxy);
Expand Down
Expand Up @@ -85,6 +85,7 @@ private Scribe createScribeClient(
throws ExecutionException, InterruptedException, TException
{
ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(Duration.valueOf("1s"))
.setReceiveTimeout(Duration.valueOf("10s"))
.setReadTimeout(Duration.valueOf("1s"))
.setWriteTimeout(Duration.valueOf("1s"));
ThriftClient<Scribe> thriftClient = new ThriftClient<>(manager, Scribe.class, config, "ScribeClient");
Expand Down

0 comments on commit 6f2f638

Please sign in to comment.