diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 8e28c87173bdc..a0ffe5d3febf7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -18,11 +18,9 @@ */ package org.apache.pulsar.client.impl; -import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture; import static org.apache.pulsar.client.util.MathUtils.signSafeMod; - +import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture; import com.google.common.annotations.VisibleForTesting; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -31,9 +29,6 @@ import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; - -import java.io.Closeable; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; @@ -45,9 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; @@ -58,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConnectionPool implements Closeable { +public class ConnectionPool implements AutoCloseable { protected final ConcurrentHashMap>> pool; private final Bootstrap bootstrap; @@ -222,7 +215,7 @@ private CompletableFuture createConnection(InetSocketAddress logicalA } /** - * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server + * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { int port; @@ -247,27 +240,32 @@ private CompletableFuture createConnection(InetSocketAddress unresolved } /** - * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is - * working + * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no + * address is working. */ - private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, int port, InetSocketAddress sniHost) { + private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, + int port, + InetSocketAddress sniHost) { CompletableFuture future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> { - if (unresolvedAddresses.hasNext()) { - // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> { - // This is already unwinding the recursive call - future.completeExceptionally(ex); + connectToAddress(unresolvedAddresses.next(), port, sniHost) + .thenAccept(future::complete) + .exceptionally(exception -> { + if (unresolvedAddresses.hasNext()) { + // Try next IP address + connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete) + .exceptionally(ex -> { + // This is already unwinding the recursive call + future.completeExceptionally(ex); + return null; + }); + } else { + // Failed to connect to any IP address + future.completeExceptionally(exception); + } return null; }); - } else { - // Failed to connect to any IP address - future.completeExceptionally(exception); - } - return null; - }); return future; } @@ -285,7 +283,7 @@ CompletableFuture> resolveName(String hostname) { } /** - * Attempt to establish a TCP connection to an already resolved single IP address + * Attempt to establish a TCP connection to an already resolved single IP address. */ private CompletableFuture connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port); @@ -303,7 +301,7 @@ public void releaseConnection(ClientCnx cnx) { if (maxConnectionsPerHosts == 0) { //Disable pooling if (cnx.channel().isActive()) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("close connection due to pooling disabled."); } cnx.close(); @@ -312,14 +310,8 @@ public void releaseConnection(ClientCnx cnx) { } @Override - public void close() throws IOException { - try { - if (!eventLoopGroup.isShutdown()) { - eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await(); - } - } catch (InterruptedException e) { - log.warn("EventLoopGroup shutdown was interrupted", e); - } + public void close() throws Exception { + closeAllConnections(); dnsResolver.close(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index f1b780726489c..ff392ca0f09e3 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -21,8 +21,9 @@ import static com.google.common.base.Preconditions.checkArgument; import java.net.SocketAddress; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; @@ -34,11 +35,9 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarChannelInitializer; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.protocol.Commands; @@ -68,8 +67,9 @@ */ public class ProxyConnection extends PulsarHandler implements FutureListener { // ConnectionPool is used by the proxy to issue lookup requests - private PulsarClientImpl client; private ConnectionPool connectionPool; + private final AtomicLong requestIdGenerator = + new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private ProxyService service; AuthenticationDataSource authenticationData; private State state; @@ -113,7 +113,7 @@ enum State { } ConnectionPool getConnectionPool() { - return client.getCnxPool(); + return connectionPool; } public ProxyConnection(ProxyService proxyService, Supplier sslHandlerSupplier) { @@ -130,7 +130,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { ctx.close(); ProxyService.rejectedConnections.inc(); - return; } } @@ -149,26 +148,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { directProxyHandler.outboundChannel.close(); + directProxyHandler = null; } - if (client != null) { - client.close(); - } service.getClientCnxs().remove(this); LOG.info("[{}] Connection closed", remoteAddress); if (connectionPool != null) { try { connectionPool.close(); + connectionPool = null; } catch (Exception e) { LOG.error("Failed to close connection pool {}", e.getMessage(), e); } } + + state = State.Closed; } @Override @@ -222,7 +222,30 @@ public void operationComplete(Future future) throws Exception { } } - private void completeConnect() { + private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + if (service.getConfiguration().isAuthenticationEnabled()) { + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + this.clientAuthData = clientData; + this.clientAuthMethod = authMethod; + } + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); + } + } else { + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}", + remoteAddress, state); + } + } + LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { @@ -242,17 +265,6 @@ private void completeConnect() { } } - private void createClientAndCompleteConnect(AuthData clientData) - throws PulsarClientException { - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } - this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise); - - completeConnect(); - } - // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { AuthData brokerData = authState.authenticate(clientData); @@ -263,7 +275,7 @@ private void doAuthentication(AuthData clientData) throws Exception { LOG.debug("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, clientAuthRole); } - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -274,7 +286,6 @@ private void doAuthentication(AuthData clientData) throws Exception { remoteAddress, authMethod); } state = State.Connecting; - return; } @Override @@ -302,16 +313,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), try { // init authn this.clientConf = createClientConfiguration(); - int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)); - this.client = - new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); - - completeConnect(); + completeConnect(null); return; } @@ -336,7 +341,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -354,7 +359,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")); close(); - return; } } @@ -409,19 +413,26 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupProxyHandler.handleLookup(lookup); } - private void close() { - state = State.Closed; - ctx.close(); - try { - if (client != null) { - client.close(); + private synchronized void close() { + if (state != State.Closed) { + state = State.Closed; + if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { + directProxyHandler.outboundChannel.close(); + directProxyHandler = null; + } + if (connectionPool != null) { + try { + connectionPool.close(); + connectionPool = null; + } catch (Exception e) { + LOG.error("Error closing connection pool", e); + } } - } catch (PulsarClientException e) { - LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage()); + ctx.close(); } } - ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException { + ClientConfigurationData createClientConfiguration() { ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); @@ -441,20 +452,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica return clientConf; } - private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData, - final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, - clientAuthMethod, protocolVersion)); - return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); - } - private static int getProtocolVersionToAdvertise(CommandConnect connect) { return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion()); } long newRequestId() { - return client.newRequestId(); + return requestIdGenerator.getAndIncrement(); } public Authentication getClientAuthentication() { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 784f300b68b8c..c0e8e761bb853 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -29,8 +29,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import lombok.Getter; @@ -80,7 +78,6 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; private final Authentication proxyClientAuthentication; - private final Timer timer; private String serviceUrl; private String serviceUrlTls; private ConfigurationMetadataCacheService configurationCacheService; @@ -142,7 +139,6 @@ public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws Exception { checkNotNull(proxyConfig); this.proxyConfig = proxyConfig; - this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = Maps.newConcurrentMap(); @@ -345,9 +341,6 @@ public void close() throws IOException { for (EventLoopGroup group : extensionsWorkerGroups) { group.shutdownGracefully(); } - if (timer != null) { - timer.stop(); - } } public String getServiceUrl() { @@ -362,10 +355,6 @@ public ProxyConfiguration getConfiguration() { return proxyConfig; } - public Timer getTimer() { - return timer; - } - public AuthenticationService getAuthenticationService() { return authenticationService; }