Skip to content

Commit

Permalink
AddressResolver should not bind locally - fixes #1479
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jun 28, 2016
1 parent 4ddf1a2 commit f086b1b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 37 deletions.
4 changes: 2 additions & 2 deletions src/main/asciidoc/java/override/dependencies.adoc
Expand Up @@ -8,13 +8,13 @@ project descriptor to access the Vert.x Core API:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.3.0</version>
<version>3.3.1-SNAPSHOT</version>
</dependency>
----

* Gradle (in your `build.gradle` file):
[source,groovy,subs="+attributes"]
----
compile io.vertx:vertx-core:3.3.0
compile io.vertx:vertx-core:3.3.1-SNAPSHOT
----
42 changes: 23 additions & 19 deletions src/main/java/io/vertx/core/dns/impl/fix/DnsNameResolver.java
Expand Up @@ -17,12 +17,14 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
Expand Down Expand Up @@ -68,8 +70,6 @@ public class DnsNameResolver extends InetNameResolver {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(DnsNameResolver.class);

static final InetSocketAddress ANY_LOCAL_ADDR = new InetSocketAddress(0);

static final InternetProtocolFamily[] DEFAULT_RESOLVE_ADDRESS_TYPES = new InternetProtocolFamily[2];

static {
Expand All @@ -89,7 +89,7 @@ public class DnsNameResolver extends InetNameResolver {
private static final DatagramDnsQueryEncoder ENCODER = new DatagramDnsQueryEncoder();

final DnsServerAddresses nameServerAddresses;
final ChannelFuture bindFuture;
final Future<Channel> channelFuture;
final DatagramChannel ch;

/**
Expand Down Expand Up @@ -126,7 +126,6 @@ protected DnsServerAddressStream initialValue() throws Exception {
*
* @param eventLoop the {@link EventLoop} which will perform the communication with the DNS servers
* @param channelFactory the {@link ChannelFactory} that will create a {@link DatagramChannel}
* @param localAddress the local address of the {@link DatagramChannel}
* @param nameServerAddresses the addresses of the DNS server. For each DNS query, a new stream is created from
* this to determine which DNS server should be contacted for the next retry in case
* of failure.
Expand All @@ -145,7 +144,6 @@ protected DnsServerAddressStream initialValue() throws Exception {
public DnsNameResolver(
EventLoop eventLoop,
ChannelFactory<? extends DatagramChannel> channelFactory,
InetSocketAddress localAddress,
DnsServerAddresses nameServerAddresses,
DnsCache resolveCache,
long queryTimeoutMillis,
Expand All @@ -161,7 +159,6 @@ public DnsNameResolver(

super(eventLoop);
checkNotNull(channelFactory, "channelFactory");
checkNotNull(localAddress, "localAddress");
this.nameServerAddresses = checkNotNull(nameServerAddresses, "nameServerAddresses");
this.queryTimeoutMillis = checkPositive(queryTimeoutMillis, "queryTimeoutMillis");
this.resolvedAddressTypes = checkNonEmpty(resolvedAddressTypes, "resolvedAddressTypes");
Expand All @@ -175,34 +172,28 @@ public DnsNameResolver(
this.searchDomains = checkNotNull(searchDomains, "searchDomains");
this.ndots = checkPositive(ndots, "ndots");

bindFuture = newChannel(channelFactory, localAddress);
ch = (DatagramChannel) bindFuture.channel();
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));
}

private ChannelFuture newChannel(
ChannelFactory<? extends DatagramChannel> channelFactory, InetSocketAddress localAddress) {

Bootstrap b = new Bootstrap();
b.group(executor());
b.channelFactory(channelFactory);
final DnsResponseHandler responseHandler = new DnsResponseHandler();
b.option(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
final DnsResponseHandler responseHandler = new DnsResponseHandler(executor().<Channel>newPromise());
b.handler(new ChannelInitializer<DatagramChannel>() {
@Override
protected void initChannel(DatagramChannel ch) throws Exception {
ch.pipeline().addLast(DECODER, ENCODER, responseHandler);
}
});

ChannelFuture bindFuture = b.bind(localAddress);
bindFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
channelFuture = responseHandler.channelActivePromise;
ch = (DatagramChannel) b.register().channel();
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(maxPayloadSize));

ch.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
resolveCache.clear();
}
});

return bindFuture;
}

/**
Expand Down Expand Up @@ -663,6 +654,13 @@ private static Promise<AddressedEnvelope<DnsResponse, InetSocketAddress>> cast(P
}

private final class DnsResponseHandler extends ChannelInboundHandlerAdapter {

private final Promise<Channel> channelActivePromise;

DnsResponseHandler(Promise<Channel> channelActivePromise) {
this.channelActivePromise = channelActivePromise;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Expand All @@ -685,6 +683,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channelActivePromise.setSuccess(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("{} Unexpected exception: ", ch, cause);
Expand Down
Expand Up @@ -61,7 +61,6 @@ public final class DnsNameResolverBuilder {

private final EventLoop eventLoop;
private ChannelFactory<? extends DatagramChannel> channelFactory;
private InetSocketAddress localAddress = DnsNameResolver.ANY_LOCAL_ADDR;
private DnsServerAddresses nameServerAddresses = DnsServerAddresses.defaultAddresses();
private DnsCache resolveCache;
private Integer minTtl;
Expand Down Expand Up @@ -117,17 +116,6 @@ public DnsNameResolverBuilder channelType(Class<? extends DatagramChannel> chann
return channelFactory(new ReflectiveChannelFactory<DatagramChannel>(channelType));
}

/**
* Sets the local address of the {@link DatagramChannel}
*
* @param localAddress the local address
* @return {@code this}
*/
public DnsNameResolverBuilder localAddress(InetSocketAddress localAddress) {
this.localAddress = localAddress;
return this;
}

/**
* Sets the addresses of the DNS server.
*
Expand Down Expand Up @@ -376,7 +364,6 @@ public DnsNameResolver build(EventLoop eventLoop) {
return new DnsNameResolver(
eventLoop,
channelFactory,
localAddress,
nameServerAddresses,
cache,
queryTimeoutMillis,
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/vertx/core/dns/impl/fix/DnsQueryContext.java
Expand Up @@ -17,6 +17,7 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.dns.DatagramDnsQuery;
Expand All @@ -28,6 +29,8 @@
import io.netty.handler.codec.dns.DnsResponse;
import io.netty.handler.codec.dns.DnsSection;
import io.netty.resolver.dns.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.OneTimeTask;
Expand Down Expand Up @@ -109,12 +112,12 @@ void query() {
}

private void sendQuery(final DnsQuery query) {
if (parent.bindFuture.isDone()) {
if (parent.channelFuture.isDone()) {
writeQuery(query);
} else {
parent.bindFuture.addListener(new ChannelFutureListener() {
parent.channelFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(Future<? super Channel> future) throws Exception {
if (future.isSuccess()) {
writeQuery(query);
} else {
Expand Down

0 comments on commit f086b1b

Please sign in to comment.