Skip to content

Commit

Permalink
Remove Netty's DNS codec fork and reuse Netty's DNS codec in DnsClien…
Browse files Browse the repository at this point in the history
…t - fixes #1845
  • Loading branch information
vietj committed Mar 1, 2017
1 parent 1d80cbf commit 8174e2f
Show file tree
Hide file tree
Showing 31 changed files with 326 additions and 2,648 deletions.
165 changes: 56 additions & 109 deletions src/main/java/io/vertx/core/dns/impl/DnsClientImpl.java
Expand Up @@ -25,6 +25,14 @@
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.dns.DatagramDnsQuery;
import io.netty.handler.codec.dns.DatagramDnsQueryEncoder;
import io.netty.handler.codec.dns.DatagramDnsResponseDecoder;
import io.netty.handler.codec.dns.DefaultDnsQuestion;
import io.netty.handler.codec.dns.DnsRecord;
import io.netty.handler.codec.dns.DnsRecordType;
import io.netty.handler.codec.dns.DnsResponse;
import io.netty.handler.codec.dns.DnsSection;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
Expand All @@ -33,16 +41,7 @@
import io.vertx.core.dns.DnsResponseCode; import io.vertx.core.dns.DnsResponseCode;
import io.vertx.core.dns.MxRecord; import io.vertx.core.dns.MxRecord;
import io.vertx.core.dns.SrvRecord; import io.vertx.core.dns.SrvRecord;
import io.vertx.core.dns.impl.netty.DnsEntry; import io.vertx.core.dns.impl.decoder.RecordDecoder;
import io.vertx.core.dns.impl.netty.DnsQuery;
import io.vertx.core.dns.impl.netty.DnsQueryEncoder;
import io.vertx.core.dns.impl.netty.DnsQuestion;
import io.vertx.core.dns.impl.netty.DnsResource;
import io.vertx.core.dns.impl.netty.DnsResponse;
import io.vertx.core.dns.impl.netty.DnsResponseDecoder;
import io.vertx.core.dns.impl.netty.decoder.RecordDecoderFactory;
import io.vertx.core.dns.impl.netty.decoder.record.MailExchangerRecord;
import io.vertx.core.dns.impl.netty.decoder.record.ServiceRecord;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.PartialPooledByteBufAllocator; import io.vertx.core.net.impl.PartialPooledByteBufAllocator;
Expand All @@ -53,7 +52,6 @@
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -87,56 +85,51 @@ public DnsClientImpl(VertxInternal vertx, int port, String host) {
@Override @Override
protected void initChannel(DatagramChannel ch) throws Exception { protected void initChannel(DatagramChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DnsQueryEncoder()); pipeline.addLast(new DatagramDnsQueryEncoder());
pipeline.addLast(new DnsResponseDecoder()); pipeline.addLast(new DatagramDnsResponseDecoder());
} }
}); });
} }


@Override @Override
public DnsClient lookup4(String name, Handler<AsyncResult<String>> handler) { public DnsClient lookup4(String name, Handler<AsyncResult<String>> handler) {
lookup(name, new HandlerAdapter<String>(handler), DnsEntry.TYPE_A); lookupSingle(name, handler, DnsRecordType.A);
return this; return this;
} }


@Override @Override
public DnsClient lookup6(String name, Handler<AsyncResult<String>> handler) { public DnsClient lookup6(String name, Handler<AsyncResult<String>> handler) {
lookup(name, new HandlerAdapter<String>(handler), DnsEntry.TYPE_AAAA); lookupSingle(name, handler, DnsRecordType.AAAA);
return this; return this;
} }


@Override @Override
public DnsClient lookup(String name, Handler<AsyncResult<String>> handler) { public DnsClient lookup(String name, Handler<AsyncResult<String>> handler) {
lookup(name, new HandlerAdapter<String>(handler), DnsEntry.TYPE_A, DnsEntry.TYPE_AAAA); lookupSingle(name, handler, DnsRecordType.A, DnsRecordType.AAAA);
return this; return this;
} }


@Override @Override
public DnsClient resolveA(String name, Handler<AsyncResult<List<String>>> handler) { public DnsClient resolveA(String name, Handler<AsyncResult<List<String>>> handler) {
lookup(name, handler, DnsEntry.TYPE_A); lookupList(name, handler, DnsRecordType.A);
return this; return this;
} }


@Override @Override
public DnsClient resolveCNAME(String name, Handler<AsyncResult<List<String> >> handler) { public DnsClient resolveCNAME(String name, Handler<AsyncResult<List<String> >> handler) {
lookup(name, handler, DnsEntry.TYPE_CNAME); lookupList(name, handler, DnsRecordType.CNAME);
return this; return this;
} }


@Override @Override
public DnsClient resolveMX(String name, Handler<AsyncResult<List<MxRecord>>> handler) { public DnsClient resolveMX(String name, Handler<AsyncResult<List<MxRecord>>> handler) {
lookup(name, new ConvertingHandler<MailExchangerRecord, MxRecord>(handler, MxRecordComparator.INSTANCE) { lookupList(name, handler, DnsRecordType.MX);
@Override
protected MxRecord convert(MailExchangerRecord entry) {
return new MxRecordImpl(entry);
}
}, DnsEntry.TYPE_MX);
return this; return this;
} }


@Override @Override
public DnsClient resolveTXT(String name, Handler<AsyncResult<List<String>>> handler) { public DnsClient resolveTXT(String name, Handler<AsyncResult<List<String>>> handler) {
lookup(name, new Handler<AsyncResult>() { lookupList(name, new Handler<AsyncResult<List<String>>>() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void handle(AsyncResult event) { public void handle(AsyncResult event) {
Expand All @@ -151,36 +144,31 @@ public void handle(AsyncResult event) {
handler.handle(Future.succeededFuture(txts)); handler.handle(Future.succeededFuture(txts));
} }
} }
}, DnsEntry.TYPE_TXT); }, DnsRecordType.TXT);
return this; return this;
} }


@Override @Override
public DnsClient resolvePTR(String name, Handler<AsyncResult<String>> handler) { public DnsClient resolvePTR(String name, Handler<AsyncResult<String>> handler) {
lookup(name, new HandlerAdapter<String>(handler), DnsEntry.TYPE_PTR); lookupSingle(name, handler, DnsRecordType.PTR);
return this; return this;
} }


@Override @Override
public DnsClient resolveAAAA(String name, Handler<AsyncResult<List<String>>> handler) { public DnsClient resolveAAAA(String name, Handler<AsyncResult<List<String>>> handler) {
lookup(name, handler, DnsEntry.TYPE_AAAA); lookupList(name, handler, DnsRecordType.AAAA);
return this; return this;
} }


@Override @Override
public DnsClient resolveNS(String name, Handler<AsyncResult<List<String>>> handler) { public DnsClient resolveNS(String name, Handler<AsyncResult<List<String>>> handler) {
lookup(name, handler, DnsEntry.TYPE_NS); lookupList(name, handler, DnsRecordType.NS);
return this; return this;
} }


@Override @Override
public DnsClient resolveSRV(String name, Handler<AsyncResult<List<SrvRecord>>> handler) { public DnsClient resolveSRV(String name, Handler<AsyncResult<List<SrvRecord>>> handler) {
lookup(name, new ConvertingHandler<ServiceRecord, SrvRecord>(handler, SrvRecordComparator.INSTANCE) { lookupList(name, handler, DnsRecordType.SRV);
@Override
protected SrvRecord convert(ServiceRecord entry) {
return new SrcRecordImpl(entry);
}
}, DnsEntry.TYPE_SRV);
return this; return this;
} }


Expand Down Expand Up @@ -213,14 +201,7 @@ public DnsClient reverseLookup(String address, Handler<AsyncResult<String>> hand
} }
reverseName.append(".in-addr.arpa"); reverseName.append(".in-addr.arpa");


return resolvePTR(reverseName.toString(), ar -> { return resolvePTR(reverseName.toString(), handler);
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
String result = ar.result();
handler.handle(Future.succeededFuture(result));
}
});
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
// Should never happen as we work with ip addresses as input // Should never happen as we work with ip addresses as input
// anyway just in case notify the handler // anyway just in case notify the handler
Expand All @@ -229,52 +210,60 @@ public DnsClient reverseLookup(String address, Handler<AsyncResult<String>> hand
return this; return this;
} }


private <T> void lookupSingle(String name, Handler<AsyncResult<T>> handler, DnsRecordType... types) {
this.<T>lookupList(name, ar -> handler.handle(ar.map(result -> result.isEmpty() ? null : result.get(0))), types);
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void lookup(String name, Handler handler, int... types) { private <T> void lookupList(String name, Handler<AsyncResult<List<T>>> handler, DnsRecordType... types) {
Future result = Future.future(); Future<List<T>> result = Future.future();
result.setHandler(handler); result.setHandler(handler);
lookup(name, result, types); lookup(name, result, types);
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void lookup(String name, Future result, int... types) { private <T> void lookup(String name, Future<List<T>> result, DnsRecordType... types) {
Objects.requireNonNull(name, "no null name accepted"); Objects.requireNonNull(name, "no null name accepted");
bootstrap.connect(dnsServer).addListener(new RetryChannelFutureListener(result) { bootstrap.connect(dnsServer).addListener(new RetryChannelFutureListener(result) {
@Override @Override
public void onSuccess(ChannelFuture future) throws Exception { public void onSuccess(ChannelFuture future) throws Exception {
DnsQuery query = new DnsQuery(ThreadLocalRandom.current().nextInt()); DatagramDnsQuery query = new DatagramDnsQuery(null, dnsServer, ThreadLocalRandom.current().nextInt());
for (int type: types) { for (DnsRecordType type: types) {
query.addQuestion(new DnsQuestion(name, type)); query.addRecord(DnsSection.QUESTION, new DefaultDnsQuestion(name, type, DnsRecord.CLASS_IN));
} }
future.channel().writeAndFlush(query).addListener(new RetryChannelFutureListener(result) { future.channel().writeAndFlush(query).addListener(new RetryChannelFutureListener(result) {
@Override @Override
public void onSuccess(ChannelFuture future) throws Exception { public void onSuccess(ChannelFuture future) throws Exception {
future.channel().pipeline().addLast(new SimpleChannelInboundHandler<DnsResponse>() { future.channel().pipeline().addLast(new SimpleChannelInboundHandler<DnsResponse>() {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, DnsResponse msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, DnsResponse msg) throws Exception {
DnsResponseCode code = DnsResponseCode.valueOf(msg.getHeader().getResponseCode()); DnsResponseCode code = DnsResponseCode.valueOf(msg.code().intValue());


if (code == DnsResponseCode.NOERROR) { if (code == DnsResponseCode.NOERROR) {
List<DnsResource> resources = msg.getAnswers();
List<Object> records = new ArrayList<>(resources.size()); int count = msg.count(DnsSection.ANSWER);
for (DnsResource resource : msg.getAnswers()) {
Object record = RecordDecoderFactory.getFactory().decode(resource.type(), msg, resource); List<T> records = new ArrayList<>(count);
if (record instanceof InetAddress) { for (int idx = 0;idx < count;idx++) {
record = ((InetAddress)record).getHostAddress(); DnsRecord a = msg.recordAt(DnsSection.ANSWER);
} T record = RecordDecoder.decode(a);
records.add(record); records.add(record);
} }


if (records.size() > 0 && (records.get(0) instanceof MxRecordImpl || records.get(0) instanceof SrvRecordImpl)) {
Collections.sort((List) records);
}

setResult(result, records); setResult(result, records);
} else { } else {
setResult(result, new DnsException(code)); setFailure(result, new DnsException(code));
} }
ctx.close(); ctx.close();
} }


@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
setResult(result, cause); setFailure(result, cause);
ctx.close(); ctx.close();
} }
}); });
Expand All @@ -285,7 +274,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void setResult(Future r, Object result) { private <T> void setResult(Future<List<T>> r, List<T> result) {
if (r.isComplete()) { if (r.isComplete()) {
return; return;
} }
Expand All @@ -298,56 +287,14 @@ private void setResult(Future r, Object result) {
}); });
} }


private static class HandlerAdapter<T> implements Handler<AsyncResult<List<T>>> { @SuppressWarnings("unchecked")
private final Handler handler; private <T> void setFailure(Future<List<T>> r, Throwable err) {

if (r.isComplete()) {
HandlerAdapter(Handler handler) { return;
this.handler = handler;
}

@SuppressWarnings("unchecked")
@Override
public void handle(AsyncResult<List<T>> event) {
if (event.failed()) {
handler.handle(event);
} else {
List<T> result = event.result();
if (result.isEmpty()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.succeededFuture(result.get(0)));
}
}
}
}

protected abstract class ConvertingHandler<F, T> implements Handler<AsyncResult<List<F>>> {
private final Handler handler;
private final Comparator comparator;

ConvertingHandler(Handler<AsyncResult<List<T>>> handler, Comparator comparator) {
this.handler = handler;
this.comparator = comparator;
}

@SuppressWarnings("unchecked")
@Override
public void handle(AsyncResult<List<F>> event) {
if (event.failed()) {
handler.handle(event);
} else {
List records = (List) event.result();
for (int i = 0; i < records.size(); i++) {
F record = (F) records.get(i);
records.set(i, convert(record));
}

Collections.sort(records, comparator);
handler.handle(Future.succeededFuture(records));
}
} }

actualCtx.executeFromIO(() -> {
protected abstract T convert(F entry); r.fail(err);
});
} }


private abstract class RetryChannelFutureListener implements ChannelFutureListener { private abstract class RetryChannelFutureListener implements ChannelFutureListener {
Expand Down
33 changes: 0 additions & 33 deletions src/main/java/io/vertx/core/dns/impl/MxRecordComparator.java

This file was deleted.

17 changes: 9 additions & 8 deletions src/main/java/io/vertx/core/dns/impl/MxRecordImpl.java
Expand Up @@ -16,27 +16,28 @@
package io.vertx.core.dns.impl; package io.vertx.core.dns.impl;


import io.vertx.core.dns.MxRecord; import io.vertx.core.dns.MxRecord;
import io.vertx.core.dns.impl.netty.decoder.record.MailExchangerRecord;



/** /**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a> * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/ */
final class MxRecordImpl implements MxRecord, Comparable<MxRecord> { public final class MxRecordImpl implements MxRecord, Comparable<MxRecord> {
private final MailExchangerRecord record;
private final int priority;
private final String name;


MxRecordImpl(MailExchangerRecord record) { public MxRecordImpl(int priority, String name) {
this.record = record; this.priority = priority;
this.name = name;
} }


@Override @Override
public int priority() { public int priority() {
return record.priority(); return priority;
} }


@Override @Override
public String name() { public String name() {
return record.name(); return name;
} }


@Override @Override
Expand Down

0 comments on commit 8174e2f

Please sign in to comment.