Skip to content

Commit

Permalink
Pluggable I/O for SimpleResolver
Browse files Browse the repository at this point in the history
Closes #253
  • Loading branch information
chrisruffalo authored and ibauersachs committed Nov 11, 2023
1 parent 4c51bf2 commit 5da2770
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 113 deletions.
46 changes: 46 additions & 0 deletions src/main/java/org/xbill/DNS/DefaultIoClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// SPDX-License-Identifier: BSD-3-Clause
package org.xbill.DNS;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.xbill.DNS.io.TcpIoClient;
import org.xbill.DNS.io.UdpIoClient;

/**
* An implementation of the IO clients that use the internal NIO-based clients.
*
* @see NioUdpClient
* @see NioTcpClient
* @since 3.6
*/
public class DefaultIoClient implements TcpIoClient, UdpIoClient {
private final TcpIoClient tcpIoClient;
private final UdpIoClient udpIoClient;

public DefaultIoClient() {
tcpIoClient = new NioTcpClient();
udpIoClient = new NioUdpClient();
}

@Override
public CompletableFuture<byte[]> sendAndReceiveTcp(
InetSocketAddress local,
InetSocketAddress remote,
Message query,
byte[] data,
Duration timeout) {
return tcpIoClient.sendAndReceiveTcp(local, remote, query, data, timeout);
}

@Override
public CompletableFuture<byte[]> sendAndReceiveUdp(
InetSocketAddress local,
InetSocketAddress remote,
Message query,
byte[] data,
int max,
Duration timeout) {
return udpIoClient.sendAndReceiveUdp(local, remote, query, data, max, timeout);
}
}
3 changes: 2 additions & 1 deletion src/main/java/org/xbill/DNS/Lookup.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private static List<Name> convertSearchPathDomainList(List<Name> domains) {
}

/**
* Sets a custom logger that will be used to log the sent and received packets.
* Sets a custom logger that will be used to log the sent and received packets. This is only
* applicable to the default I/O implementations.
*
* @param logger The logger
*/
Expand Down
30 changes: 15 additions & 15 deletions src/main/java/org/xbill/DNS/NioTcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.xbill.DNS.io.TcpIoClient;

@Slf4j
@UtilityClass
final class NioTcpClient extends NioClient {
private static final Queue<ChannelState> registrationQueue = new ConcurrentLinkedQueue<>();
private static final Map<ChannelKey, ChannelState> channelMap = new ConcurrentHashMap<>();
final class NioTcpClient extends NioClient implements TcpIoClient {
private final Queue<ChannelState> registrationQueue = new ConcurrentLinkedQueue<>();
private final Map<ChannelKey, ChannelState> channelMap = new ConcurrentHashMap<>();

static {
setRegistrationsTask(NioTcpClient::processPendingRegistrations, true);
setTimeoutTask(NioTcpClient::checkTransactionTimeouts, true);
setCloseTask(NioTcpClient::closeTcp, true);
NioTcpClient() {
setRegistrationsTask(this::processPendingRegistrations, true);
setTimeoutTask(this::checkTransactionTimeouts, true);
setCloseTask(this::closeTcp, true);
}

private static void processPendingRegistrations() {
private void processPendingRegistrations() {
while (!registrationQueue.isEmpty()) {
ChannelState state = registrationQueue.remove();
try {
Expand All @@ -49,7 +48,7 @@ private static void processPendingRegistrations() {
}
}

private static void checkTransactionTimeouts() {
private void checkTransactionTimeouts() {
for (ChannelState state : channelMap.values()) {
for (Iterator<Transaction> it = state.pendingTransactions.iterator(); it.hasNext(); ) {
Transaction t = it.next();
Expand All @@ -61,7 +60,7 @@ private static void checkTransactionTimeouts() {
}
}

private static void closeTcp() {
private void closeTcp() {
registrationQueue.clear();
EOFException closing = new EOFException("Client is closing");
channelMap.forEach((key, state) -> state.handleTransactionException(closing));
Expand Down Expand Up @@ -112,8 +111,8 @@ void send() throws IOException {
}

@RequiredArgsConstructor
private static class ChannelState implements KeyProcessor {
final SocketChannel channel;
private class ChannelState implements KeyProcessor {
private final SocketChannel channel;
final Queue<Transaction> pendingTransactions = new ConcurrentLinkedQueue<>();
ByteBuffer responseLengthData = ByteBuffer.allocate(2);
ByteBuffer responseData = ByteBuffer.allocate(Message.MAXLENGTH);
Expand Down Expand Up @@ -259,7 +258,8 @@ private static class ChannelKey {
final InetSocketAddress remote;
}

static CompletableFuture<byte[]> sendrecv(
@Override
public CompletableFuture<byte[]> sendAndReceiveTcp(
InetSocketAddress local,
InetSocketAddress remote,
Message query,
Expand Down
42 changes: 21 additions & 21 deletions src/main/java/org/xbill/DNS/NioUdpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.RequiredArgsConstructor;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.xbill.DNS.io.UdpIoClient;

@Slf4j
@UtilityClass
final class NioUdpClient extends NioClient {
private static final int EPHEMERAL_START;
private static final int EPHEMERAL_RANGE;
final class NioUdpClient extends NioClient implements UdpIoClient {
private final int ephemeralStart;
private final int ephemeralRange;

private static final SecureRandom prng;
private static final Queue<Transaction> registrationQueue = new ConcurrentLinkedQueue<>();
private static final Queue<Transaction> pendingTransactions = new ConcurrentLinkedQueue<>();
private final SecureRandom prng;
private final Queue<Transaction> registrationQueue = new ConcurrentLinkedQueue<>();
private final Queue<Transaction> pendingTransactions = new ConcurrentLinkedQueue<>();

static {
NioUdpClient() {
// https://tools.ietf.org/html/rfc6335#section-6
int ephemeralStartDefault = 49152;
int ephemeralEndDefault = 65535;
Expand All @@ -41,21 +40,21 @@ final class NioUdpClient extends NioClient {
ephemeralEndDefault = 60999;
}

EPHEMERAL_START = Integer.getInteger("dnsjava.udp.ephemeral.start", ephemeralStartDefault);
ephemeralStart = Integer.getInteger("dnsjava.udp.ephemeral.start", ephemeralStartDefault);
int ephemeralEnd = Integer.getInteger("dnsjava.udp.ephemeral.end", ephemeralEndDefault);
EPHEMERAL_RANGE = ephemeralEnd - EPHEMERAL_START;
ephemeralRange = ephemeralEnd - ephemeralStart;

if (Boolean.getBoolean("dnsjava.udp.ephemeral.use_ephemeral_port")) {
prng = null;
} else {
prng = new SecureRandom();
}
setRegistrationsTask(NioUdpClient::processPendingRegistrations, false);
setTimeoutTask(NioUdpClient::checkTransactionTimeouts, false);
setCloseTask(NioUdpClient::closeUdp, false);
setRegistrationsTask(this::processPendingRegistrations, false);
setTimeoutTask(this::checkTransactionTimeouts, false);
setCloseTask(this::closeUdp, false);
}

private static void processPendingRegistrations() {
private void processPendingRegistrations() {
while (!registrationQueue.isEmpty()) {
Transaction t = registrationQueue.remove();
try {
Expand All @@ -68,7 +67,7 @@ private static void processPendingRegistrations() {
}
}

private static void checkTransactionTimeouts() {
private void checkTransactionTimeouts() {
for (Iterator<Transaction> it = pendingTransactions.iterator(); it.hasNext(); ) {
Transaction t = it.next();
if (t.endTime - System.nanoTime() < 0) {
Expand All @@ -79,7 +78,7 @@ private static void checkTransactionTimeouts() {
}

@RequiredArgsConstructor
private static class Transaction implements KeyProcessor {
private class Transaction implements KeyProcessor {
private final int id;
private final byte[] data;
private final int max;
Expand Down Expand Up @@ -159,7 +158,8 @@ private void silentCloseChannel() {
}
}

static CompletableFuture<byte[]> sendrecv(
@Override
public CompletableFuture<byte[]> sendAndReceiveUdp(
InetSocketAddress local,
InetSocketAddress remote,
Message query,
Expand All @@ -182,12 +182,12 @@ static CompletableFuture<byte[]> sendrecv(
InetSocketAddress addr = null;
if (local == null) {
if (prng != null) {
addr = new InetSocketAddress(prng.nextInt(EPHEMERAL_RANGE) + EPHEMERAL_START);
addr = new InetSocketAddress(prng.nextInt(ephemeralRange) + ephemeralStart);
}
} else {
int port = local.getPort();
if (port == 0 && prng != null) {
port = prng.nextInt(EPHEMERAL_RANGE) + EPHEMERAL_START;
port = prng.nextInt(ephemeralRange) + ephemeralStart;
}

addr = new InetSocketAddress(local.getAddress(), port);
Expand Down Expand Up @@ -225,7 +225,7 @@ static CompletableFuture<byte[]> sendrecv(
return f;
}

private static void closeUdp() {
private void closeUdp() {
registrationQueue.clear();
EOFException closing = new EOFException("Client is closing");
pendingTransactions.forEach(t -> t.completeExceptionally(closing));
Expand Down
21 changes: 19 additions & 2 deletions src/main/java/org/xbill/DNS/SimpleResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.xbill.DNS.io.DefaultIoClientFactory;
import org.xbill.DNS.io.IoClientFactory;

/**
* An implementation of Resolver that sends one query to one server. SimpleResolver handles TCP
Expand Down Expand Up @@ -44,6 +48,13 @@ public class SimpleResolver implements Resolver {

private static final short DEFAULT_UDPSIZE = 512;

/**
* Gets or sets the factory that creates clients for sending messages to the wire.
*
* @since 3.6
*/
@Getter @Setter private IoClientFactory ioClientFactory = new DefaultIoClientFactory();

private static InetSocketAddress defaultResolver =
new InetSocketAddress(InetAddress.getLoopbackAddress(), DEFAULT_PORT);

Expand Down Expand Up @@ -368,9 +379,15 @@ CompletableFuture<Message> sendAsync(Message query, boolean forceTcp, Executor e

CompletableFuture<byte[]> result;
if (tcp) {
result = NioTcpClient.sendrecv(localAddress, address, query, out, timeoutValue);
result =
ioClientFactory
.createOrGetTcpClient()
.sendAndReceiveTcp(localAddress, address, query, out, timeoutValue);
} else {
result = NioUdpClient.sendrecv(localAddress, address, query, out, udpSize, timeoutValue);
result =
ioClientFactory
.createOrGetUdpClient()
.sendAndReceiveUdp(localAddress, address, query, out, udpSize, timeoutValue);
}

return result.thenComposeAsync(
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/xbill/DNS/io/DefaultIoClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: BSD-3-Clause
package org.xbill.DNS.io;

import org.xbill.DNS.DefaultIoClient;
import org.xbill.DNS.SimpleResolver;

/**
* Serves as a default implementation that is used by the {@link SimpleResolver}, unless otherwise
* configured. This preserves the default behavior (to use the built-in NIO clients) while allowing
* flexibility at the point of use.
*
* @since 3.6
*/
public class DefaultIoClientFactory implements IoClientFactory {
/**
* Shared instance because it only serves as a bridge to the static NIO classes and does not need
* to be different per class.
*/
private static final DefaultIoClient RESOLVER_CLIENT = new DefaultIoClient();

@Override
public TcpIoClient createOrGetTcpClient() {
return RESOLVER_CLIENT;
}

@Override
public UdpIoClient createOrGetUdpClient() {
return RESOLVER_CLIENT;
}
}
30 changes: 30 additions & 0 deletions src/main/java/org/xbill/DNS/io/IoClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: BSD-3-Clause
package org.xbill.DNS.io;

import org.xbill.DNS.SimpleResolver;

/**
* Interface for creating the TCP/UDP factories necessary for the {@link SimpleResolver}.
*
* @since 3.6
*/
public interface IoClientFactory {
/**
* Create or return a cached/reused instance of the TCP resolver that should be used to send DNS
* data over the wire to the remote target. <br>
* It is the responsibility of this method to manage pooling or connection reuse. This method is
* called right before the connection is made every time the {@link SimpleResolver} is called. The
* implementer of this method should be aware and choose how to pool or reuse connections.
*
* @return an instance of the tcp resolver client
*/
TcpIoClient createOrGetTcpClient();

/**
* Create or return a cached/reused instance of the UDP resolver that should be used to send DNS
* data over the wire to the remote target.
*
* @return an instance of the udp resolver client
*/
UdpIoClient createOrGetUdpClient();
}
35 changes: 35 additions & 0 deletions src/main/java/org/xbill/DNS/io/TcpIoClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: BSD-3-Clause
package org.xbill.DNS.io;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.xbill.DNS.Message;
import org.xbill.DNS.Resolver;

/**
* Serves as an interface from a {@link Resolver} to the underlying mechanism for sending bytes over
* the wire as a TCP message.
*
* @since 3.6
*/
public interface TcpIoClient {
/**
* Sends a query to a remote server and returns the answer.
*
* @param local Address from which the connection is coming. may be {@code null} and the
* implementation must decide on the local address.
* @param remote Address that the connection should send the data to.
* @param query DNS message representation of the outbound query.
* @param data Raw byte representation of the outbound query.
* @param timeout Duration before the connection will time out and be closed.
* @return A {@link CompletableFuture} that will be completed with the byte value of the response.
* @since 3.6
*/
CompletableFuture<byte[]> sendAndReceiveTcp(
InetSocketAddress local,
InetSocketAddress remote,
Message query,
byte[] data,
Duration timeout);
}

0 comments on commit 5da2770

Please sign in to comment.