diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 06b5d24e8b48a..edb2a983f25b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -36,6 +36,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -47,6 +48,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,13 +64,20 @@ public class ConnectionPool implements AutoCloseable { private final boolean isSniProxy; protected final DnsNameResolver dnsResolver; + private final boolean shouldCloseDnsResolver; public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup)); } public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, - Supplier clientCnxSupplier) throws PulsarClientException { + Supplier clientCnxSupplier) throws PulsarClientException { + this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty()); + } + + public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + Supplier clientCnxSupplier, Optional dnsNameResolver) + throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); @@ -91,6 +100,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou log.error("Failed to create channel initializer"); throw new PulsarClientException(e); } + + this.shouldCloseDnsResolver = !dnsNameResolver.isPresent(); + this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup)); + } + + private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next()) .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)); if (conf.getDnsLookupBindAddress() != null) { @@ -98,7 +113,8 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou conf.getDnsLookupBindPort()); dnsNameResolverBuilder.localAddress(addr); } - this.dnsResolver = dnsNameResolverBuilder.build(); + DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); + return dnsNameResolverBuilder.build(); } private static final Random random = new Random(); @@ -320,7 +336,9 @@ public void releaseConnection(ClientCnx cnx) { @Override public void close() throws Exception { closeAllConnections(); - dnsResolver.close(); + if (shouldCloseDnsResolver) { + dnsResolver.close(); + } } private void cleanupConnection(InetSocketAddress address, int connectionKey, diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 3b0fba07aaa04..643432392a095 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -72,6 +72,11 @@ netty-handler + + io.netty + netty-resolver-dns + + io.netty netty-transport-native-epoll diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java new file mode 100644 index 0000000000000..8b06dbf36eca3 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.netty; + +import io.netty.resolver.dns.DnsNameResolverBuilder; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DnsResolverUtil { + private static final int MIN_TTL = 0; + private static final int TTL; + private static final int NEGATIVE_TTL; + + // default TTL value when JDK setting is "forever" (-1) + private static final int DEFAULT_TTL = 60; + + // default negative TTL value when JDK setting is "forever" (-1) + private static final int DEFAULT_NEGATIVE_TTL = 10; + + static { + int ttl = DEFAULT_TTL; + int negativeTtl = DEFAULT_NEGATIVE_TTL; + try { + // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting + // effective JDK settings for DNS caching + Class inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy"); + Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get"); + ttl = (Integer) getTTLMethod.invoke(null); + Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative"); + negativeTtl = (Integer) getNegativeTTLMethod.invoke(null); + } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException + | IllegalAccessException e) { + log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e); + } + TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL); + NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL); + } + + private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) { + return ttl < 0 ? defaultTtl : ttl; + } + + private DnsResolverUtil() { + // utility class with static methods, prevent instantiation + } + + /** + * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings. + * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds. + * + * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings + */ + public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) { + dnsNameResolverBuilder.ttl(MIN_TTL, TTL); + dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 58203eee51c28..99f8f04ea84fb 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -25,9 +25,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.ssl.SslHandler; +import io.netty.resolver.dns.DnsNameResolver; import java.net.SocketAddress; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -71,6 +73,7 @@ public class ProxyConnection extends PulsarHandler { private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private final ProxyService service; + private final DnsNameResolver dnsNameResolver; AuthenticationDataSource authenticationData; private State state; private final Supplier sslHandlerSupplier; @@ -119,9 +122,11 @@ ConnectionPool getConnectionPool() { return connectionPool; } - public ProxyConnection(ProxyService proxyService, Supplier sslHandlerSupplier) { + public ProxyConnection(ProxyService proxyService, Supplier sslHandlerSupplier, + DnsNameResolver dnsNameResolver) { super(30, TimeUnit.SECONDS); this.service = proxyService; + this.dnsNameResolver = dnsNameResolver; this.state = State.Init; this.sslHandlerSupplier = sslHandlerSupplier; this.brokerProxyValidator = service.getBrokerProxyValidator(); @@ -229,27 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce } private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + Supplier clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { if (service.getConfiguration().isForwardAuthorizationCredentials()) { this.clientAuthData = clientData; this.clientAuthMethod = authMethod; } - if (this.connectionPool == null) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, - clientAuthMethod, protocolVersionToAdvertise)); - } else { - LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", - remoteAddress, state, clientAuthRole); - } + clientCnxSupplier = + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersionToAdvertise); } else { - if (this.connectionPool == null) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise)); - } else { - LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}", - remoteAddress, state); - } + clientCnxSupplier = + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); + } + + if (this.connectionPool == null) { + this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), + clientCnxSupplier, Optional.of(dnsNameResolver)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); } LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java deleted file mode 100644 index 4dcb09570c667..0000000000000 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.proxy.server; - -import io.netty.channel.EventLoopGroup; -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.ClientCnx; -import org.apache.pulsar.client.impl.ConnectionPool; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ProxyConnectionPool extends ConnectionPool { - public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup, - Supplier clientCnxSupplier) throws PulsarClientException { - super(clientConfig, eventLoopGroup, clientCnxSupplier); - } - - @Override - public void close() throws IOException { - log.info("Closing ProxyConnectionPool."); - pool.forEach((address, clientCnxPool) -> { - if (clientCnxPool != null) { - clientCnxPool.forEach((identifier, clientCnx) -> { - if (clientCnx != null && clientCnx.isDone()) { - try { - clientCnx.get().close(); - } catch (InterruptedException | ExecutionException e) { - log.error("Unable to close get client connection future.", e); - } - } - }); - } - }); - dnsResolver.close(); - } - - private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class); -} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 6a830657423c1..10b99aeaff1c3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -63,6 +63,7 @@ import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -163,6 +164,8 @@ public ProxyService(ProxyConfiguration proxyConfig, DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next()) .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup)); + DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); + dnsNameResolver = dnsNameResolverBuilder.build(); brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(), diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index e5f15b66d31cc..1a588b481fc56 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -173,7 +173,7 @@ public SslHandler get() { } ch.pipeline().addLast("handler", - new ProxyConnection(proxyService, sslHandlerSupplier)); + new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver())); } }