Skip to content

Commit

Permalink
Do lookups on worker thread to avoid blocking event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored and vietj committed Apr 8, 2016
1 parent 0a620d2 commit 50f353f
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 92 deletions.
46 changes: 37 additions & 9 deletions src/main/java/io/vertx/core/datagram/impl/DatagramSocketImpl.java
Expand Up @@ -41,11 +41,7 @@
import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.NetworkMetrics; import io.vertx.core.spi.metrics.NetworkMetrics;


import java.net.InetAddress; import java.net.*;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Objects; import java.util.Objects;


/** /**
Expand Down Expand Up @@ -184,14 +180,30 @@ public synchronized DatagramSocket exceptionHandler(Handler<Throwable> handler)


private DatagramSocket listen(SocketAddress local, Handler<AsyncResult<DatagramSocket>> handler) { private DatagramSocket listen(SocketAddress local, Handler<AsyncResult<DatagramSocket>> handler) {
Objects.requireNonNull(handler, "no null handler accepted"); Objects.requireNonNull(handler, "no null handler accepted");
/*
<<<<<<< HEAD
InetSocketAddress is = new InetSocketAddress(local.host(), local.port()); InetSocketAddress is = new InetSocketAddress(local.host(), local.port());
ChannelFuture future = channel().bind(is); ChannelFuture future = channel().bind(is);
addListener(future, ar -> { addListener(future, ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
((DatagramSocketMetrics) metrics).listening(local); ((DatagramSocketMetrics) metrics).listening(local);
=======
*/
vertx.resolveAsync(local.host(), res -> {
if (res.succeeded()) {
ChannelFuture future = channel().bind(new InetSocketAddress(res.result(), local.port()));
addListener(future, ar -> {
if (ar.succeeded()) {
((DatagramSocketMetrics) metrics).listening(localAddress());
}
handler.handle(ar);
});
} else {
handler.handle(Future.failedFuture(res.cause()));
//>>>>>>> a2a6fd9... Do lookups on worker thread to avoid blocking event loop
} }
handler.handle(ar);
}); });

return this; return this;
} }


Expand All @@ -217,16 +229,32 @@ public DatagramSocket resume() {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public DatagramSocket send(Buffer packet, int port, String host, Handler<AsyncResult<DatagramSocket>> handler) { public DatagramSocket send(Buffer packet, int port, String host, Handler<AsyncResult<DatagramSocket>> handler) {
Objects.requireNonNull(packet, "no null packet accepted");
Objects.requireNonNull(host, "no null host accepted"); Objects.requireNonNull(host, "no null host accepted");
ChannelFuture future = channel().writeAndFlush(new DatagramPacket(packet.getByteBuf(), new InetSocketAddress(host, port))); InetSocketAddress addr = InetSocketAddress.createUnresolved(host, port);
addListener(future, handler); if (addr.isUnresolved()) {
vertx.resolveAsync(host, res -> {
if (res.succeeded()) {
doSend(packet, new InetSocketAddress(res.result(), port), handler);
} else {
handler.handle(Future.failedFuture(res.cause()));
}
});
} else {
// If it's immediately resolved it means it was just an IP address so no need to async resolve
doSend(packet, addr, handler);
}
if (metrics.isEnabled()) { if (metrics.isEnabled()) {
metrics.bytesWritten(null, new SocketAddressImpl(port, host), packet.length()); metrics.bytesWritten(null, new SocketAddressImpl(port, host), packet.length());
} }

return this; return this;
} }


private void doSend(Buffer packet, InetSocketAddress addr, Handler<AsyncResult<DatagramSocket>> handler) {
ChannelFuture future = channel().writeAndFlush(new DatagramPacket(packet.getByteBuf(), addr));
addListener(future, handler);
}

@Override @Override
public PacketWritestream sender(int port, String host) { public PacketWritestream sender(int port, String host) {
Arguments.requireInRange(port, 0, 65535, "port p must be in range 0 <= p <= 65535"); Arguments.requireInRange(port, 0, 65535, "port p must be in range 0 <= p <= 65535");
Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright 2014 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.VertxInternal;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class AsyncResolveBindConnectHelper<T> implements Handler<AsyncResult<T>> {

private List<Handler<AsyncResult<T>>> handlers = new ArrayList<>();
private boolean complete;
private AsyncResult<T> result;

public synchronized void addListener(Handler<AsyncResult<T>> handler) {
if (complete) {
handler.handle(result);
} else {
handlers.add(handler);
}
}

@Override
public synchronized void handle(AsyncResult<T> res) {
if (!complete) {
for (Handler<AsyncResult<T>> handler: handlers) {
handler.handle(res);
}
complete = true;
result = res;
} else {
throw new IllegalStateException("Already complete!");
}
}

private static void checkPort(int port) {
if (port < 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
}

public static AsyncResolveBindConnectHelper<ChannelFuture> doBind(VertxInternal vertx, int port, String host,
ServerBootstrap bootstrap) {
return doBindConnect(vertx, port, host, bootstrap::bind);
}

public static AsyncResolveBindConnectHelper<ChannelFuture> doConnect(VertxInternal vertx, int port, String host,
Bootstrap bootstrap) {
return doBindConnect(vertx, port, host, bootstrap::connect);
}

private static AsyncResolveBindConnectHelper<ChannelFuture> doBindConnect(VertxInternal vertx, int port, String host,
Function<InetSocketAddress,
ChannelFuture> cfProducer) {
checkPort(port);
AsyncResolveBindConnectHelper<ChannelFuture> asyncResolveBindConnectHelper = new AsyncResolveBindConnectHelper<>();
vertx.resolveAsync(host, res -> {
if (res.succeeded()) {
// At this point the name is an IP address so there will be no resolve hit
ChannelFuture future = cfProducer.apply(new InetSocketAddress(res.result(), port));
future.addListener(f -> {
if (f.isSuccess()) {
asyncResolveBindConnectHelper.handle(Future.succeededFuture(future));
} else {
asyncResolveBindConnectHelper.handle(Future.failedFuture(f.cause()));
}
});
} else {
asyncResolveBindConnectHelper.handle(Future.failedFuture(res.cause()));
}
});
return asyncResolveBindConnectHelper;
}

}
10 changes: 5 additions & 5 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -330,10 +330,10 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
} }
}); });
applyConnectionOptions(options, bootstrap); applyConnectionOptions(options, bootstrap);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener((ChannelFuture channelFuture) -> { future.addListener(res -> {
Channel ch = channelFuture.channel(); if (res.succeeded()) {
if (channelFuture.isSuccess()) { Channel ch = res.result().channel();
if (!options.isUseAlpn()) { if (!options.isUseAlpn()) {
if (options.isSsl()) { if (options.isSsl()) {
// TCP connected, so now we must do the SSL handshake // TCP connected, so now we must do the SSL handshake
Expand All @@ -359,7 +359,7 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
} }
} }
} else { } else {
connectionFailed(context, ch, waiter::handleFailure, channelFuture.cause()); connectionFailed(context, null, waiter::handleFailure, res.cause());
} }
}); });
} }
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -55,7 +55,6 @@
import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.ReadStream;


import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
Expand Down Expand Up @@ -101,7 +100,7 @@ public class HttpServerImpl implements HttpServer, Closeable, MetricsProvider {


private ChannelGroup serverChannelGroup; private ChannelGroup serverChannelGroup;
private volatile boolean listening; private volatile boolean listening;
private ChannelFuture bindFuture; private AsyncResolveBindConnectHelper<ChannelFuture> bindFuture;
private ServerID id; private ServerID id;
private HttpServerImpl actualServer; private HttpServerImpl actualServer;
private volatile int actualPort; private volatile int actualPort;
Expand Down Expand Up @@ -250,17 +249,17 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr


addHandlers(this, listenContext); addHandlers(this, listenContext);
try { try {
bindFuture = bootstrap.bind(new InetSocketAddress(InetAddress.getByName(host), port)); bindFuture = AsyncResolveBindConnectHelper.doBind(vertx, port, host, bootstrap);
Channel serverChannel = bindFuture.channel(); bindFuture.addListener(res -> {
serverChannelGroup.add(serverChannel); if (res.failed()) {
bindFuture.addListener(channelFuture -> { vertx.sharedHttpServers().remove(id);
if (!channelFuture.isSuccess()) { } else {
vertx.sharedHttpServers().remove(id); Channel serverChannel = res.result().channel();
} else { HttpServerImpl.this.actualPort = ((InetSocketAddress)serverChannel.localAddress()).getPort();
HttpServerImpl.this.actualPort = ((InetSocketAddress)bindFuture.channel().localAddress()).getPort(); serverChannelGroup.add(serverChannel);
metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options); metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);
} }
}); });
} catch (final Throwable t) { } catch (final Throwable t) {
// Make sure we send the exception back through the handler (if any) // Make sure we send the exception back through the handler (if any)
if (listenHandler != null) { if (listenHandler != null) {
Expand All @@ -284,14 +283,14 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr
actualServer.bindFuture.addListener(future -> { actualServer.bindFuture.addListener(future -> {
if (listenHandler != null) { if (listenHandler != null) {
final AsyncResult<HttpServer> res; final AsyncResult<HttpServer> res;
if (future.isSuccess()) { if (future.succeeded()) {
res = Future.succeededFuture(HttpServerImpl.this); res = Future.succeededFuture(HttpServerImpl.this);
} else { } else {
res = Future.failedFuture(future.cause()); res = Future.failedFuture(future.cause());
listening = false; listening = false;
} }
listenContext.runOnContext((v) -> listenHandler.handle(res)); listenContext.runOnContext((v) -> listenHandler.handle(res));
} else if (!future.isSuccess()) { } else if (future.failed()) {
listening = false; listening = false;
// No handler - log so user can see failure // No handler - log so user can see failure
log.error(future.cause()); log.error(future.cause());
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -61,6 +61,8 @@
import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.core.spi.metrics.VertxMetrics;


import java.io.File; import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -658,6 +660,19 @@ public File resolveFile(String fileName) {
return fileResolver.resolveFile(fileName); return fileResolver.resolveFile(fileName);
} }


@Override
public void resolveAsync(String host, Handler<AsyncResult<String>> resultHandler) {
// For now just do a blocking resolve
// When Netty 4.1 is released we can use async DNS resolution
executeBlockingInternal(() -> {
try {
return InetAddress.getByName(host).getHostAddress();
} catch (UnknownHostException e) {
throw new VertxException(e);
}
}, resultHandler);
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) { private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
fileResolver.close(res -> { fileResolver.close(res -> {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Expand Up @@ -90,4 +90,6 @@ public interface VertxInternal extends Vertx {


ClusterManager getClusterManager(); ClusterManager getClusterManager();


void resolveAsync(String host, Handler<AsyncResult<String>> resultHandler);

} }
30 changes: 14 additions & 16 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Expand Up @@ -17,12 +17,7 @@
package io.vertx.core.net.impl; package io.vertx.core.net.impl;


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
Expand All @@ -31,18 +26,18 @@
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.Closeable; import io.vertx.core.Closeable;
import io.vertx.core.http.impl.AsyncResolveBindConnectHelper;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.TCPMetrics;


import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -177,11 +172,12 @@ protected void initChannel(Channel ch) throws Exception {
}); });


applyConnectionOptions(bootstrap); applyConnectionOptions(bootstrap);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener((ChannelFuture channelFuture) -> { future.addListener(res -> {
Channel ch = channelFuture.channel();


if (channelFuture.isSuccess()) { if (res.succeeded()) {

Channel ch = res.result().channel();


if (sslHelper.isSSL()) { if (sslHelper.isSSL()) {
// TCP connected, so now we must do the SSL handshake // TCP connected, so now we must do the SSL handshake
Expand Down Expand Up @@ -210,7 +206,7 @@ protected void initChannel(Channel ch) throws Exception {
); );
}); });
} else { } else {
failed(context, ch, channelFuture.cause(), connectHandler); failed(context, null, res.cause(), connectHandler);
} }
} }
}); });
Expand All @@ -228,7 +224,9 @@ private void connected(ContextImpl context, Channel ch, Handler<AsyncResult<NetS
} }


private void failed(ContextImpl context, Channel ch, Throwable t, Handler<AsyncResult<NetSocket>> connectHandler) { private void failed(ContextImpl context, Channel ch, Throwable t, Handler<AsyncResult<NetSocket>> connectHandler) {
ch.close(); if (ch != null) {
ch.close();
}
context.executeFromIO(() -> doFailed(connectHandler, t)); context.executeFromIO(() -> doFailed(connectHandler, t));
} }


Expand Down

0 comments on commit 50f353f

Please sign in to comment.