Skip to content
Permalink
Browse files

Ensure `TcpConnector#connect` does not emit error after success (#967)

__Motivation__

`TcpConnector#connect` eagerly terminates the `Single` when the channel is registered. It also terminates the `Single` with failure if connect fails.
This violates the contract of the `Subscriber` that there should only be a single termination signal.

__Modification__

As we have to complete the returned `Single` with the channel as soon as the channel is registered to ensure we do not miss channel events and we also have to propagate connect failures, we can not abide by the `Single<Channel>` contract. Instead of returning `Single<Channel>` now returning `Single<Connection>` and do the translation of `Channel` -> `Connection` using a supplied `Function`.

__Result__

`TcpConnector#connect` abides by the `Subscriber` contract.
  • Loading branch information
NiteshKant committed Mar 17, 2020
1 parent e5f4c56 commit c4d421fc6f4339c3fc3ac2da75da5962f5df9d0d
@@ -62,8 +62,8 @@
final ReadOnlyTcpClientConfig roTcpClientConfig = config.tcpConfig();
// We disable auto read by default so we can handle stuff in the ConnectionFilter before we accept any content.
// In case ALPN negotiates h2, h2 connection MUST enable auto read for its Channel.
return TcpConnector.connect(null, resolvedAddress, roTcpClientConfig, false, executionContext)
.flatMap(this::createConnection);
return TcpConnector.connect(null, resolvedAddress, roTcpClientConfig, false,
executionContext, this::createConnection);
}

private Single<FilterableStreamingHttpConnection> createConnection(final Channel channel) {
@@ -55,8 +55,8 @@
// This state is read only, so safe to keep a copy across Subscribers
final ReadOnlyTcpClientConfig roTcpClientConfig = config.tcpConfig();
// Auto read is required for h2
return TcpConnector.connect(null, resolvedAddress, roTcpClientConfig, true, executionContext)
.flatMap(channel -> H2ClientParentConnectionContext.initChannel(channel,
return TcpConnector.connect(null, resolvedAddress, roTcpClientConfig, true,
executionContext, channel -> H2ClientParentConnectionContext.initChannel(channel,
executionContext.bufferAllocator(), executionContext.executor(),
config.h2Config(), reqRespFactory, roTcpClientConfig.flushStrategy(),
roTcpClientConfig.idleTimeoutMs(), executionContext.executionStrategy(),
@@ -42,8 +42,8 @@ private StreamingConnectionFactory() {
final HttpExecutionContext executionContext, final ResolvedAddress resolvedAddress,
final ReadOnlyHttpClientConfig roConfig) {
// We disable auto read so we can handle stuff in the ConnectionFilter before we accept any content.
return TcpConnector.connect(null, resolvedAddress, roConfig.tcpConfig(), false, executionContext)
.flatMap(channel -> createConnection(channel, executionContext, roConfig,
return TcpConnector.connect(null, resolvedAddress, roConfig.tcpConfig(), false,
executionContext, channel -> createConnection(channel, executionContext, roConfig,
new TcpClientChannelInitializer(roConfig.tcpConfig(), roConfig.hasProxy())));
}

@@ -426,28 +426,33 @@ public void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exc
assert cConfig.h1Config() != null;

NettyConnection<Object, Object> conn = resources.prepend(
TcpConnector.connect(null, serverHostAndPort(serverContext), cConfig.tcpConfig(), false, CEC)
.flatMap(channel -> {
CloseHandler closeHandler = spy(forPipelinedRequestResponse(true, channel.config()));
closeHandlerRef.compareAndSet(null, closeHandler);
return DefaultNettyConnection.initChannel(channel, CEC.bufferAllocator(), CEC.executor(),
new TerminalPredicate<>(o -> o instanceof HttpHeaders), closeHandler, defaultFlushStrategy(),
null, new TcpClientChannelInitializer(cConfig.tcpConfig())
.andThen(new HttpClientChannelInitializer(getByteBufAllocator(CEC.bufferAllocator()),
cConfig.h1Config(), closeHandler))
.andThen(channel2 -> channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
// Propagate the user event in the pipeline before triggering the test
// condition.
ctx.fireUserEventTriggered(evt);
if (evt instanceof ChannelInputShutdownReadComplete) {
serverCloseTrigger.onComplete();
}
}
})), defaultStrategy(), HTTP_1_1);
}
).toFuture().get());
TcpConnector.connect(null, serverHostAndPort(serverContext), cConfig.tcpConfig(), false,
CEC, channel -> {
CloseHandler closeHandler = spy(forPipelinedRequestResponse(true, channel.config()));
closeHandlerRef.compareAndSet(null, closeHandler);
return DefaultNettyConnection.initChannel(channel, CEC.bufferAllocator(),
CEC.executor(),
new TerminalPredicate<>(o -> o instanceof HttpHeaders), closeHandler,
defaultFlushStrategy(), null,
new TcpClientChannelInitializer(cConfig.tcpConfig())
.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(CEC.bufferAllocator()),
cConfig.h1Config(), closeHandler))
.andThen(channel2 -> channel2.pipeline()
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx,
Object evt) {
// Propagate the user event in the pipeline before
// triggering the test condition.
ctx.fireUserEventTriggered(evt);
if (evt instanceof ChannelInputShutdownReadComplete) {
serverCloseTrigger.onComplete();
}
}
})), defaultStrategy(), HTTP_1_1);
}
).toFuture().get());

// The server needs to wait to close the conneciton until after the client has established the connection.
serverChannelLatch.await();

0 comments on commit c4d421f

Please sign in to comment.
You can’t perform that action at this time.