From 94fd1f5d087932373c0fd2a107897873d529bd37 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 9 Feb 2022 20:05:31 +0200 Subject: [PATCH] [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836) (cherry picked from commit 324aa1bf14d89a93b66ed3613a16c118cf9d4c0f) (cherry picked from commit 25e6b65e1e4bc20a1d1fc3c875f73982854954da) --- .../pulsar/client/impl/ConnectionPool.java | 64 +++++----- .../pulsar/proxy/server/ProxyConnection.java | 113 ++++++++++-------- .../pulsar/proxy/server/ProxyService.java | 11 -- 3 files changed, 92 insertions(+), 96 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 869c642914576..92b4a3cda70af 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -19,9 +19,7 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture; - import com.google.common.annotations.VisibleForTesting; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -30,9 +28,6 @@ import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; - -import java.io.Closeable; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; @@ -44,9 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; @@ -57,7 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConnectionPool implements Closeable { +public class ConnectionPool implements AutoCloseable { protected final ConcurrentHashMap>> pool; private final Bootstrap bootstrap; @@ -223,7 +216,7 @@ private CompletableFuture createConnection(InetSocketAddress logicalA } /** - * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server + * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { int port; @@ -248,31 +241,32 @@ private CompletableFuture createConnection(InetSocketAddress unresolved } /** - * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is - * working + * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no + * address is working. */ - private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, int port, InetSocketAddress sniHost) { + private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, + int port, + InetSocketAddress sniHost) { CompletableFuture future = new CompletableFuture<>(); - connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(channel -> { - // Successfully connected to server - future.complete(channel); - }).exceptionally(exception -> { - if (unresolvedAddresses.hasNext()) { - // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(channel -> { - future.complete(channel); - }).exceptionally(ex -> { - // This is already unwinding the recursive call - future.completeExceptionally(ex); + // Successfully connected to server + connectToAddress(unresolvedAddresses.next(), port, sniHost) + .thenAccept(future::complete) + .exceptionally(exception -> { + if (unresolvedAddresses.hasNext()) { + // Try next IP address + connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete) + .exceptionally(ex -> { + // This is already unwinding the recursive call + future.completeExceptionally(ex); + return null; + }); + } else { + // Failed to connect to any IP address + future.completeExceptionally(exception); + } return null; }); - } else { - // Failed to connect to any IP address - future.completeExceptionally(exception); - } - return null; - }); return future; } @@ -291,7 +285,7 @@ CompletableFuture> resolveName(String hostname) { } /** - * Attempt to establish a TCP connection to an already resolved single IP address + * Attempt to establish a TCP connection to an already resolved single IP address. */ private CompletableFuture connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port); @@ -309,7 +303,7 @@ public void releaseConnection(ClientCnx cnx) { if (maxConnectionsPerHosts == 0) { //Disable pooling if (cnx.channel().isActive()) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("close connection due to pooling disabled."); } cnx.close(); @@ -318,12 +312,8 @@ public void releaseConnection(ClientCnx cnx) { } @Override - public void close() throws IOException { - try { - eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await(); - } catch (InterruptedException e) { - log.warn("EventLoopGroup shutdown was interrupted", e); - } + public void close() throws Exception { + closeAllConnections(); dnsResolver.close(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index a483ee39a8c7e..c559120180e01 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -21,8 +21,9 @@ import static com.google.common.base.Preconditions.checkArgument; import java.net.SocketAddress; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; @@ -34,11 +35,9 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarChannelInitializer; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.protocol.Commands; @@ -68,7 +67,9 @@ */ public class ProxyConnection extends PulsarHandler implements FutureListener { // ConnectionPool is used by the proxy to issue lookup requests - private PulsarClientImpl client; + private ConnectionPool connectionPool; + private final AtomicLong requestIdGenerator = + new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private ProxyService service; private Authentication clientAuthentication; AuthenticationDataSource authenticationData; @@ -111,7 +112,7 @@ enum State { } ConnectionPool getConnectionPool() { - return client.getCnxPool(); + return connectionPool; } public ProxyConnection(ProxyService proxyService, Supplier sslHandlerSupplier) { @@ -128,7 +129,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { ctx.close(); ProxyService.rejectedConnections.inc(); - return; } } @@ -147,18 +147,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { directProxyHandler.outboundChannel.close(); + directProxyHandler = null; } - if (client != null) { - client.close(); - } service.getClientCnxs().remove(this); LOG.info("[{}] Connection closed", remoteAddress); + + if (connectionPool != null) { + try { + connectionPool.close(); + connectionPool = null; + } catch (Exception e) { + LOG.error("Failed to close connection pool {}", e.getMessage(), e); + } + } + + state = State.Closed; } @Override @@ -212,7 +221,30 @@ public void operationComplete(Future future) throws Exception { } } - private void completeConnect() { + private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + if (service.getConfiguration().isAuthenticationEnabled()) { + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + this.clientAuthData = clientData; + this.clientAuthMethod = authMethod; + } + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); + } + } else { + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}", + remoteAddress, state); + } + } + LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { @@ -232,17 +264,6 @@ private void completeConnect() { } } - private void createClientAndCompleteConnect(AuthData clientData) - throws PulsarClientException { - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } - this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise); - - completeConnect(); - } - // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { AuthData brokerData = authState.authenticate(clientData); @@ -253,7 +274,7 @@ private void doAuthentication(AuthData clientData) throws Exception { LOG.debug("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, clientAuthRole); } - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -264,7 +285,6 @@ private void doAuthentication(AuthData clientData) throws Exception { remoteAddress, authMethod); } state = State.Connecting; - return; } @Override @@ -293,15 +313,10 @@ protected void handleConnect(CommandConnect connect) { // init authn this.clientConf = createClientConfiguration(); this.clientAuthentication = clientConf.getAuthentication(); - int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer()); - - completeConnect(); + completeConnect(null); return; } @@ -326,7 +341,7 @@ protected void handleConnect(CommandConnect connect) { .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -344,7 +359,6 @@ protected void handleConnect(CommandConnect connect) { LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")); close(); - return; } } @@ -399,17 +413,27 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupProxyHandler.handleLookup(lookup); } - private void close() { - state = State.Closed; - ctx.close(); - try { - client.close(); - } catch (PulsarClientException e) { - LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage()); + private synchronized void close() { + if (state != State.Closed) { + state = State.Closed; + if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { + directProxyHandler.outboundChannel.close(); + directProxyHandler = null; + } + if (connectionPool != null) { + try { + connectionPool.close(); + connectionPool = null; + } catch (Exception e) { + LOG.error("Error closing connection pool", e); + } + } + ctx.close(); } } - ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException { + ClientConfigurationData createClientConfiguration() + throws PulsarClientException.UnsupportedAuthenticationException { ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); @@ -432,19 +456,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica return clientConf; } - private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData, - final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { - return new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, - service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer()); - } - private static int getProtocolVersionToAdvertise(CommandConnect connect) { return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion()); } long newRequestId() { - return client.newRequestId(); + return requestIdGenerator.getAndIncrement(); } public Authentication getClientAuthentication() { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index c2e157c06ea41..58ba3cfbc72b5 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -27,8 +27,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import lombok.Getter; @@ -69,7 +67,6 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; - private final Timer timer; private String serviceUrl; private String serviceUrlTls; private ConfigurationCacheService configurationCacheService; @@ -127,7 +124,6 @@ public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws IOException { checkNotNull(proxyConfig); this.proxyConfig = proxyConfig; - this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = Maps.newConcurrentMap(); @@ -247,9 +243,6 @@ public void close() throws IOException { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); - if (timer != null) { - timer.stop(); - } } public String getServiceUrl() { @@ -264,10 +257,6 @@ public ProxyConfiguration getConfiguration() { return proxyConfig; } - public Timer getTimer() { - return timer; - } - public AuthenticationService getAuthenticationService() { return authenticationService; }