From 6f2f6382f1b235e0933000bafaa919a84b410ab8 Mon Sep 17 00:00:00 2001 From: Henning Schmiedehausen Date: Wed, 20 Nov 2013 15:12:07 -0800 Subject: [PATCH] Update the swift code to use the split timeouts for receive and read. Add a read timeout similar to the nifty code. There are now two timeouts: the receive timeout which is the overall time that a client is willing to wait for a response and the read timeout which is the maximum time that the client will wait for the server to make progress. And a couple of fixes to ensure that null values for any of the timeouts will not cause a NPE somewhere down the line. --- CHANGES.md | 11 +++ NEWS.md | 24 +++++ .../facebook/swift/service/ThriftClient.java | 9 ++ .../swift/service/ThriftClientConfig.java | 17 +++- .../swift/service/ThriftClientManager.java | 91 +++++++++++++------ .../swift/service/TestThriftClientConfig.java | 9 +- .../swift/service/async/AsyncClientTest.java | 2 - .../swift/service/async/AsyncService.java | 3 - .../swift/service/async/AsyncTestBase.java | 2 + .../TestThriftClientAndServerModules.java | 3 + .../service/protocol/TestClientProtocols.java | 1 + 11 files changed, 136 insertions(+), 36 deletions(-) create mode 100644 CHANGES.md create mode 100644 NEWS.md diff --git a/CHANGES.md b/CHANGES.md new file mode 100644 index 00000000..b796dcb7 --- /dev/null +++ b/CHANGES.md @@ -0,0 +1,11 @@ +Changes +======= + +* 0.11.0 + +- Add a config option (thrift.client.receive-timeout) to ThriftClientConfig +- Change default value for thrift.client.read-timeout from 1 minute to 10 seconds +- Set default value for thrift.client.receive-timeout to 1 minute +- Add constructors to ThriftClientManager that also take a receiveTimeout. + + diff --git a/NEWS.md b/NEWS.md new file mode 100644 index 00000000..8ea33c0f --- /dev/null +++ b/NEWS.md @@ -0,0 +1,24 @@ +News +==== + +* 0.11.0 + +Add support for Nifty 0.11.0 receiveTimeout. + +The nifty code now supports two timeouts: + +- receiveTimeout now reflects the amount of time that a client is + willing to wait for the server complete a response. + +- readTimeout is the amount of a time that can pass without the server + sending any data. + +Before 0.11.0, readTimeout had the semantics of receiveTimeout. + +The default value for readTimeout changed from 1 minute to 10 seconds. + +If a client configuration sets thrift.client.read-timeout, this setting +must be changed to be thrift.client.receive-timeout. + +See the Nifty CHANGES.md file for more details. + diff --git a/swift-service/src/main/java/com/facebook/swift/service/ThriftClient.java b/swift-service/src/main/java/com/facebook/swift/service/ThriftClient.java index 0e1390d2..a6452a7d 100644 --- a/swift-service/src/main/java/com/facebook/swift/service/ThriftClient.java +++ b/swift-service/src/main/java/com/facebook/swift/service/ThriftClient.java @@ -33,6 +33,7 @@ public class ThriftClient private final Class clientType; private final String clientName; private final Duration connectTimeout; + private final Duration receiveTimeout; private final Duration readTimeout; private final Duration writeTimeout; @@ -72,6 +73,7 @@ public ThriftClient( this.clientName = clientName; this.eventHandlers = eventHandlers; connectTimeout = clientConfig.getConnectTimeout(); + receiveTimeout = clientConfig.getReceiveTimeout(); readTimeout = clientConfig.getReadTimeout(); writeTimeout = clientConfig.getWriteTimeout(); socksProxy = clientConfig.getSocksProxy(); @@ -96,6 +98,12 @@ public String getConnectTimeout() return connectTimeout.toString(); } + @Managed + public String getReceiveTimeout() + { + return receiveTimeout.toString(); + } + @Managed public String getReadTimeout() { @@ -134,6 +142,7 @@ public ListenableFuture open(NiftyClientConnector globalEventHandlers; public ThriftClientManager() @@ -102,9 +105,9 @@ public ThriftClientManager(ThriftCodecManager codecManager) @Inject public ThriftClientManager(ThriftCodecManager codecManager, NiftyClient niftyClient, Set globalEventHandlers) { - this.codecManager = codecManager; - this.niftyClient = niftyClient; - this.globalEventHandlers = globalEventHandlers; + this.codecManager = checkNotNull(codecManager, "codecManager is null"); + this.niftyClient = checkNotNull(niftyClient, "niftyClient is null"); + this.globalEventHandlers = checkNotNull(globalEventHandlers, "globalEventHandlers is null"); } public ListenableFuture createClient( @@ -115,6 +118,7 @@ public ListenableFuture createClient( connector, type, DEFAULT_CONNECT_TIMEOUT, + DEFAULT_RECEIVE_TIMEOUT, DEFAULT_READ_TIMEOUT, DEFAULT_WRITE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, @@ -123,39 +127,72 @@ public ListenableFuture createClient( null); } + /** + * @deprecated Use {@link ThriftClientManager#createClient(NiftyClientConnector, Class, Duration, Duration, Duration, Duration, int, String, List, HostAndPort)}. + */ + @Deprecated + public ListenableFuture createClient( + final NiftyClientConnector connector, + final Class type, + @Nullable final Duration connectTimeout, + @Nullable final Duration readTimeout, + @Nullable final Duration writeTimeout, + final int maxFrameSize, + @Nullable final String clientName, + final List eventHandlers, + @Nullable HostAndPort socksProxy) + { + return createClient( + connector, + type, + connectTimeout, + readTimeout, + readTimeout, + writeTimeout, + maxFrameSize, + clientName, + eventHandlers, + socksProxy); + } + public ListenableFuture createClient( final NiftyClientConnector connector, final Class type, - final Duration connectTimeout, - final Duration readTimeout, - final Duration writeTimeout, + @Nullable final Duration connectTimeout, + @Nullable final Duration receiveTimeout, + @Nullable final Duration readTimeout, + @Nullable final Duration writeTimeout, final int maxFrameSize, - final String clientName, + @Nullable final String clientName, final List eventHandlers, - HostAndPort socksProxy) + @Nullable HostAndPort socksProxy) { - InetSocketAddress socksProxyAddress = this.toSocksProxyAddress(socksProxy); + checkNotNull(connector, "connector is null"); + checkNotNull(type, "type is null"); + checkNotNull(eventHandlers, "eventHandlers is null"); + + InetSocketAddress socksProxyAddress = toSocksProxyAddress(socksProxy); final ListenableFuture connectFuture = niftyClient.connectAsync( connector, connectTimeout, + receiveTimeout, readTimeout, writeTimeout, maxFrameSize, socksProxyAddress); + ListenableFuture clientFuture = Futures.transform(connectFuture, new Function() { @Nullable @Override - public T apply(@Nullable C channel) + public T apply(@NotNull C channel) { - try { - if (readTimeout.toMillis() > 0) { - channel.setReceiveTimeout(readTimeout); - } - if (writeTimeout.toMillis() > 0) { - channel.setSendTimeout(writeTimeout); - } + channel.setReceiveTimeout(receiveTimeout); + channel.setReadTimeout(readTimeout); + channel.setSendTimeout(writeTimeout); - String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName; + String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName; + + try { return createClient(channel, type, name, eventHandlers); } catch (Throwable t) { @@ -182,6 +219,11 @@ public T createClient(NiftyClientChannel channel, Class type, List T createClient(NiftyClientChannel channel, Class type, String name, List eventHandlers) { + checkNotNull(channel, "channel is null"); + checkNotNull(type, "type is null"); + checkNotNull(name, "name is null"); + checkNotNull(eventHandlers, "eventHandlers is null"); + ThriftClientMetadata clientMetadata = clientMetadataCache.getUnchecked(new TypeAndName(type, name)); String clientDescription = clientMetadata.getName() + " " + channel.toString(); @@ -197,12 +239,7 @@ public T createClient(NiftyClientChannel channel, Class type, String name )); } - private InetSocketAddress toInetSocketAddress(HostAndPort hostAndPort) - { - return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort()); - } - - private InetSocketAddress toSocksProxyAddress(HostAndPort socksProxy) + private static InetSocketAddress toSocksProxyAddress(HostAndPort socksProxy) { if (socksProxy == null) { return null; diff --git a/swift-service/src/test/java/com/facebook/swift/service/TestThriftClientConfig.java b/swift-service/src/test/java/com/facebook/swift/service/TestThriftClientConfig.java index 0407f724..be2b7c5b 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/TestThriftClientConfig.java +++ b/swift-service/src/test/java/com/facebook/swift/service/TestThriftClientConfig.java @@ -39,7 +39,8 @@ public void testDefaults() { ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(ThriftClientConfig.class) .setConnectTimeout(Duration.valueOf("500ms")) - .setReadTimeout(Duration.valueOf("1m")) + .setReceiveTimeout(Duration.valueOf("1m")) + .setReadTimeout(Duration.valueOf("10s")) .setWriteTimeout(Duration.valueOf("1m")) .setSocksProxy(null) .setMaxFrameSize(16777216)); @@ -50,7 +51,8 @@ public void testExplicitPropertyMappings() { Map properties = new ImmutableMap.Builder() .put("thrift.client.connect-timeout", "10s") - .put("thrift.client.read-timeout", "1s") + .put("thrift.client.receive-timeout", "1d") + .put("thrift.client.read-timeout", "10h") .put("thrift.client.write-timeout", "1s") .put("thrift.client.socks-proxy", "localhost:8080") .put("thrift.client.max-frame-size", "200") @@ -58,7 +60,8 @@ public void testExplicitPropertyMappings() ThriftClientConfig expected = new ThriftClientConfig() .setConnectTimeout(Duration.valueOf("10s")) - .setReadTimeout(Duration.valueOf("1s")) + .setReceiveTimeout(Duration.valueOf("1d")) + .setReadTimeout(Duration.valueOf("10h")) .setWriteTimeout(Duration.valueOf("1s")) .setSocksProxy(HostAndPort.fromParts("localhost", 8080)) .setMaxFrameSize(200); diff --git a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncClientTest.java b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncClientTest.java index 5397a7c1..b30e7663 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncClientTest.java +++ b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncClientTest.java @@ -18,14 +18,12 @@ import com.facebook.swift.codec.ThriftCodecManager; import com.facebook.swift.service.ThriftClientManager; import com.facebook.swift.service.ThriftServer; -import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; -import io.airlift.units.Duration; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.jboss.netty.handler.timeout.ReadTimeoutException; diff --git a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncService.java b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncService.java index 4e9e2405..ccffd2cb 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncService.java +++ b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncService.java @@ -18,9 +18,7 @@ import com.facebook.swift.codec.ThriftCodecManager; import com.facebook.swift.service.ThriftClientManager; import com.facebook.swift.service.ThriftServer; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.thrift.TException; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.Slf4JLoggerFactory; import org.testng.annotations.AfterMethod; @@ -29,7 +27,6 @@ import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.collect.Lists.newArrayList; diff --git a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncTestBase.java b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncTestBase.java index 65883789..23b41482 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/async/AsyncTestBase.java +++ b/swift-service/src/test/java/com/facebook/swift/service/async/AsyncTestBase.java @@ -66,6 +66,7 @@ protected ListenableFuture createClient(Class clientClass, int serverP { HostAndPort address = HostAndPort.fromParts("localhost", serverPort); ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(new Duration(1, TimeUnit.SECONDS)) + .setReceiveTimeout(new Duration(10, TimeUnit.SECONDS)) .setReadTimeout(new Duration(1, TimeUnit.SECONDS)) .setWriteTimeout(new Duration(1, TimeUnit.SECONDS)); FramedClientConnector connector = new FramedClientConnector(address) { @@ -89,6 +90,7 @@ protected ListenableFuture createHttpClient(Class clientClass, int ser throws TTransportException, InterruptedException, ExecutionException { ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(new Duration(1, TimeUnit.SECONDS)) + .setReceiveTimeout(new Duration(10, TimeUnit.SECONDS)) .setReadTimeout(new Duration(1, TimeUnit.SECONDS)) .setWriteTimeout(new Duration(1, TimeUnit.SECONDS)); HttpClientConnector connector = diff --git a/swift-service/src/test/java/com/facebook/swift/service/guice/TestThriftClientAndServerModules.java b/swift-service/src/test/java/com/facebook/swift/service/guice/TestThriftClientAndServerModules.java index bf593013..6c04026c 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/guice/TestThriftClientAndServerModules.java +++ b/swift-service/src/test/java/com/facebook/swift/service/guice/TestThriftClientAndServerModules.java @@ -262,6 +262,7 @@ public void testThriftClientWithConfiguration() ImmutableMap configMap = new ImmutableMap.Builder() .put("scribe.thrift.client.connect-timeout", TEST_MEDIUM_TIMEOUT) + .put("scribe.thrift.client.receive-timeout", TEST_MEDIUM_TIMEOUT) .put("scribe.thrift.client.read-timeout", TEST_SHORT_TIMEOUT) .put("scribe.thrift.client.max-frame-size", Integer.toString(SIXTEEN_MB_IN_BYTES)) .put("PumaReadService.thrift.client.write-timeout", TEST_LONG_TIMEOUT) @@ -284,6 +285,7 @@ public void configure(Binder binder) ThriftClient scribeClient = injector.getInstance(Key.get(new TypeLiteral>() {})); assertEquals(Duration.valueOf(scribeClient.getConnectTimeout()), Duration.valueOf(TEST_MEDIUM_TIMEOUT)); + assertEquals(Duration.valueOf(scribeClient.getReceiveTimeout()), Duration.valueOf(TEST_MEDIUM_TIMEOUT)); assertEquals(Duration.valueOf(scribeClient.getReadTimeout()), Duration.valueOf(TEST_SHORT_TIMEOUT)); assertEquals(Duration.valueOf(scribeClient.getWriteTimeout()), ThriftClientConfig.DEFAULT_WRITE_TIMEOUT); assertEquals(scribeClient.getMaxFrameSize(), SIXTEEN_MB_IN_BYTES); @@ -292,6 +294,7 @@ public void configure(Binder binder) ThriftClient pumaClient = injector.getInstance(Key.get(new TypeLiteral>() {})); assertEquals(Duration.valueOf(pumaClient.getConnectTimeout()), ThriftClientConfig.DEFAULT_CONNECT_TIMEOUT); assertEquals(Duration.valueOf(pumaClient.getReadTimeout()), ThriftClientConfig.DEFAULT_READ_TIMEOUT); + assertEquals(Duration.valueOf(pumaClient.getReceiveTimeout()), ThriftClientConfig.DEFAULT_RECEIVE_TIMEOUT); assertEquals(Duration.valueOf(pumaClient.getWriteTimeout()), Duration.valueOf(TEST_LONG_TIMEOUT)); assertEquals(pumaClient.getMaxFrameSize(), ThriftClientConfig.DEFAULT_MAX_FRAME_SIZE); assertEquals(HostAndPort.fromString(pumaClient.getSocksProxy()), proxy); diff --git a/swift-service/src/test/java/com/facebook/swift/service/protocol/TestClientProtocols.java b/swift-service/src/test/java/com/facebook/swift/service/protocol/TestClientProtocols.java index 8ab8e44c..a0d1f9d1 100644 --- a/swift-service/src/test/java/com/facebook/swift/service/protocol/TestClientProtocols.java +++ b/swift-service/src/test/java/com/facebook/swift/service/protocol/TestClientProtocols.java @@ -85,6 +85,7 @@ private Scribe createScribeClient( throws ExecutionException, InterruptedException, TException { ThriftClientConfig config = new ThriftClientConfig().setConnectTimeout(Duration.valueOf("1s")) + .setReceiveTimeout(Duration.valueOf("10s")) .setReadTimeout(Duration.valueOf("1s")) .setWriteTimeout(Duration.valueOf("1s")); ThriftClient thriftClient = new ThriftClient<>(manager, Scribe.class, config, "ScribeClient");