Skip to content

Commit

Permalink
Client connections on GlobalEventExecutor thread that are not propaga…
Browse files Browse the repository at this point in the history
…ted to the application - fixes #2670
  • Loading branch information
vietj committed Oct 16, 2018
1 parent dfb0b06 commit 1d9debd
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 22 deletions.
Expand Up @@ -204,7 +204,7 @@ private void doConnect(
}
};

channelProvider.connect(client.getVertx(), bootstrap, options.getProxyOptions(), SocketAddress.inetSocketAddress(port, host), channelInitializer, channelHandler);
channelProvider.connect(context, bootstrap, options.getProxyOptions(), SocketAddress.inetSocketAddress(port, host), channelInitializer, channelHandler);
}

private void applyConnectionOptions(Bootstrap bootstrap) {
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -91,13 +91,19 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
}

static VertxImpl vertx(VertxOptions options) {
VertxImpl vertx = new VertxImpl(options);
VertxImpl vertx = new VertxImpl(options, Transport.foo(options.getPreferNativeTransport()));
vertx.init();
return vertx;
}

static VertxImpl vertx(VertxOptions options, Transport transport) {
VertxImpl vertx = new VertxImpl(options, transport);
vertx.init();
return vertx;
}

static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
VertxImpl vertx = new VertxImpl(options);
VertxImpl vertx = new VertxImpl(options, Transport.foo(options.getPreferNativeTransport()));
vertx.joinCluster(options, resultHandler);
}

Expand Down Expand Up @@ -130,21 +136,11 @@ static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> res
private final CloseHooks closeHooks;
private final Transport transport;

private VertxImpl(VertxOptions options) {
private VertxImpl(VertxOptions options, Transport transport) {
// Sanity check
if (Vertx.currentContext() != null) {
log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
}
if (options.getPreferNativeTransport()) {
Transport nativeTransport = Transport.nativeTransport();
if (nativeTransport != null && nativeTransport.isAvailable()) {
transport = nativeTransport;
} else {
transport = Transport.JDK;
}
} else {
transport = Transport.JDK;
}
closeHooks = new CloseHooks(log);
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
Expand All @@ -169,6 +165,7 @@ private VertxImpl(VertxOptions options) {
defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();
defaultWorkerMaxExecTimeUnit = options.getMaxWorkerExecuteTimeUnit();

this.transport = transport;
this.fileResolver = new FileResolver(options.getFileSystemOptions());
this.addressResolverOptions = options.getAddressResolverOptions();
this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
Expand Down
27 changes: 22 additions & 5 deletions src/main/java/io/vertx/core/net/impl/ChannelProvider.java
Expand Up @@ -15,9 +15,12 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.SocketAddress;
Expand All @@ -37,8 +40,22 @@ public class ChannelProvider {
protected ChannelProvider() {
}

public void connect(VertxInternal vertx, Bootstrap bootstrap, ProxyOptions options, SocketAddress remoteAddress,
Handler<Channel> channelInitializer, Handler<AsyncResult<Channel>> channelHandler) {
public final void connect(ContextInternal context, Bootstrap bootstrap, ProxyOptions options, SocketAddress remoteAddress,
Handler<Channel> channelInitializer, Handler<AsyncResult<Channel>> channelHandler) {
doConnect(context, bootstrap, options, remoteAddress, channelInitializer, res -> {
if (Context.isOnEventLoopThread()) {
channelHandler.handle(res);
} else {
// We are on the GlobalEventExecutor
context.nettyEventLoop().execute(() -> channelHandler.handle(res));
}
});
}


public void doConnect(ContextInternal context, Bootstrap bootstrap, ProxyOptions options, SocketAddress remoteAddress,
Handler<Channel> channelInitializer, Handler<AsyncResult<Channel>> channelHandler) {
VertxInternal vertx = context.owner();
bootstrap.resolver(vertx.nettyAddressResolverGroup());
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
Expand All @@ -49,9 +66,9 @@ protected void initChannel(Channel channel) throws Exception {
ChannelFuture fut = bootstrap.connect(vertx.transport().convert(remoteAddress, false));
fut.addListener(res -> {
if (res.isSuccess()) {
channelHandler.handle(Future.succeededFuture(fut.channel()));
channelHandler.handle(io.vertx.core.Future.succeededFuture(fut.channel()));
} else {
channelHandler.handle(Future.failedFuture(res.cause()));
channelHandler.handle(io.vertx.core.Future.failedFuture(res.cause()));
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -229,7 +229,7 @@ protected void doConnect(SocketAddress remoteAddress, String serverName, Handler
}
};

channelProvider.connect(vertx, bootstrap, options.getProxyOptions(), remoteAddress, channelInitializer, channelHandler);
channelProvider.connect(context, bootstrap, options.getProxyOptions(), remoteAddress, channelInitializer, channelHandler);
}

private void connected(ContextInternal context, Channel ch, Handler<AsyncResult<NetSocket>> connectHandler, SocketAddress remoteAddress) {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
Expand All @@ -48,13 +49,14 @@ private ProxyChannelProvider() {
}

@Override
public void connect(VertxInternal vertx,
public void doConnect(ContextInternal context,
Bootstrap bootstrap,
ProxyOptions options,
SocketAddress remoteAddress,
Handler<Channel> channelInitializer,
Handler<AsyncResult<Channel>> channelHandler) {

final VertxInternal vertx = context.owner();
final String proxyHost = options.getHost();
final int proxyPort = options.getPort();
final String proxyUsername = options.getUsername();
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/io/vertx/core/net/impl/transport/Transport.java
Expand Up @@ -39,6 +39,19 @@
*/
public class Transport {

public static Transport foo(boolean preferNative) {
if (preferNative) {
Transport nativeTransport = Transport.nativeTransport();
if (nativeTransport != null && nativeTransport.isAvailable()) {
return nativeTransport;
} else {
return Transport.JDK;
}
} else {
return Transport.JDK;
}
}

/**
* The JDK transport, always there.
*/
Expand Down Expand Up @@ -72,7 +85,7 @@ public static Transport nativeTransport() {
return transport;
}

Transport() {
protected Transport() {
}

/**
Expand Down
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2014 Red Hat, Inc. and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.test.core.AsyncTestBase;
import org.junit.After;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class GlobalEventExecutorNotificationTest extends AsyncTestBase {

private Vertx vertx;

@After
public void after() throws Exception {
if (vertx != null) {
CountDownLatch latch = new CountDownLatch(1);
vertx.close(v -> latch.countDown());
awaitLatch(latch);
}
}

@Test
public void testConnectErrorNotifiesOnEventLoop() {
testConnectErrorNotifiesOnEventLoop(new NetClientOptions());
}

@Test
public void testConnectToProxyErrorNotifiesOnEventLoop() {
testConnectErrorNotifiesOnEventLoop(new NetClientOptions()
.setProxyOptions(new ProxyOptions()
.setPort(1234)
.setType(ProxyType.SOCKS5)
.setHost("localhost")));
}

private void testConnectErrorNotifiesOnEventLoop(NetClientOptions options) {
RuntimeException cause = new RuntimeException();
vertx = VertxImpl.vertx(new VertxOptions(), new Transport() {
@Override
public ChannelFactory<? extends Channel> channelFactory(boolean domain) {
return new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
throw cause;
}
};
}
});

vertx.createNetServer().connectHandler(so -> {
fail();
}).listen(1234, "localhost", onSuccess(v -> {
vertx.createNetClient(options).connect(1234, "localhost", onFailure(err -> {
assertSame(err, cause);
testComplete();
}));
}));
await();
}
}

0 comments on commit 1d9debd

Please sign in to comment.