From c0c949d734c9b67c8a718d1a92fc7aea1fb5b6da Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 19 Jan 2022 13:13:46 +0200 Subject: [PATCH] [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy --- .../pulsar/client/impl/ConnectionPool.java | 67 +++++------ .../pulsar/proxy/server/ProxyConnection.java | 104 +++++++++--------- .../pulsar/proxy/server/ProxyService.java | 12 -- 3 files changed, 83 insertions(+), 100 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 66792f4a5645e..06b5d24e8b48a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -18,11 +18,9 @@ */ package org.apache.pulsar.client.impl; -import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture; import static org.apache.pulsar.client.util.MathUtils.signSafeMod; - +import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture; import com.google.common.annotations.VisibleForTesting; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelException; @@ -31,9 +29,6 @@ import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; - -import java.io.Closeable; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; @@ -45,9 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; @@ -58,7 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConnectionPool implements Closeable { +public class ConnectionPool implements AutoCloseable { protected final ConcurrentHashMap>> pool; private final Bootstrap bootstrap; @@ -227,7 +220,7 @@ private CompletableFuture createConnection(InetSocketAddress logicalA } /** - * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server + * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { int port; @@ -252,27 +245,32 @@ private CompletableFuture createConnection(InetSocketAddress unresolved } /** - * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is - * working + * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no + * address is working. */ - private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, int port, InetSocketAddress sniHost) { + private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, + int port, + InetSocketAddress sniHost) { CompletableFuture future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> { - if (unresolvedAddresses.hasNext()) { - // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> { - // This is already unwinding the recursive call - future.completeExceptionally(ex); + connectToAddress(unresolvedAddresses.next(), port, sniHost) + .thenAccept(future::complete) + .exceptionally(exception -> { + if (unresolvedAddresses.hasNext()) { + // Try next IP address + connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete) + .exceptionally(ex -> { + // This is already unwinding the recursive call + future.completeExceptionally(ex); + return null; + }); + } else { + // Failed to connect to any IP address + future.completeExceptionally(exception); + } return null; }); - } else { - // Failed to connect to any IP address - future.completeExceptionally(exception); - } - return null; - }); return future; } @@ -290,7 +288,7 @@ CompletableFuture> resolveName(String hostname) { } /** - * Attempt to establish a TCP connection to an already resolved single IP address + * Attempt to establish a TCP connection to an already resolved single IP address. */ private CompletableFuture connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port); @@ -298,12 +296,11 @@ private CompletableFuture connectToAddress(InetAddress ipAddress, int p return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler .initTls(channel, sniHost != null ? sniHost : remoteAddress)) - .thenCompose(channel -> channelInitializerHandler - .initSocks5IfConfig(channel)) + .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); } else { return toCompletableFuture(bootstrap.register()) - .thenCompose(channel -> channelInitializerHandler.initSocks5IfConfig(channel)) + .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); } } @@ -312,7 +309,7 @@ public void releaseConnection(ClientCnx cnx) { if (maxConnectionsPerHosts == 0) { //Disable pooling if (cnx.channel().isActive()) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("close connection due to pooling disabled."); } cnx.close(); @@ -321,14 +318,8 @@ public void releaseConnection(ClientCnx cnx) { } @Override - public void close() throws IOException { - try { - if (!eventLoopGroup.isShutdown()) { - eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await(); - } - } catch (InterruptedException e) { - log.warn("EventLoopGroup shutdown was interrupted", e); - } + public void close() throws Exception { + closeAllConnections(); dnsResolver.close(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index a6dba1c8c20b6..491a227ecbd7a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -27,7 +27,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import java.net.SocketAddress; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; @@ -37,11 +39,9 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarChannelInitializer; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthResponse; @@ -63,8 +63,9 @@ */ public class ProxyConnection extends PulsarHandler implements FutureListener { // ConnectionPool is used by the proxy to issue lookup requests - private PulsarClientImpl client; private ConnectionPool connectionPool; + private final AtomicLong requestIdGenerator = + new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private ProxyService service; AuthenticationDataSource authenticationData; private State state; @@ -108,7 +109,7 @@ enum State { } ConnectionPool getConnectionPool() { - return client.getCnxPool(); + return connectionPool; } public ProxyConnection(ProxyService proxyService, Supplier sslHandlerSupplier) { @@ -125,7 +126,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { ctx.close(); ProxyService.REJECTED_CONNECTIONS.inc(); - return; } } @@ -144,26 +144,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { directProxyHandler.outboundChannel.close(); + directProxyHandler = null; } - if (client != null) { - client.close(); - } service.getClientCnxs().remove(this); LOG.info("[{}] Connection closed", remoteAddress); if (connectionPool != null) { try { connectionPool.close(); + connectionPool = null; } catch (Exception e) { LOG.error("Failed to close connection pool {}", e.getMessage(), e); } } + + state = State.Closed; } @Override @@ -217,7 +218,30 @@ public void operationComplete(Future future) throws Exception { } } - private void completeConnect() { + private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + if (service.getConfiguration().isAuthenticationEnabled()) { + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + this.clientAuthData = clientData; + this.clientAuthMethod = authMethod; + } + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); + } + } else { + if (this.connectionPool == null) { + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}", + remoteAddress, state); + } + } + LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { @@ -237,17 +261,6 @@ private void completeConnect() { } } - private void createClientAndCompleteConnect(AuthData clientData) - throws PulsarClientException { - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } - this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise); - - completeConnect(); - } - // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { AuthData brokerData = authState.authenticate(clientData); @@ -258,7 +271,7 @@ private void doAuthentication(AuthData clientData) throws Exception { LOG.debug("[{}] Client successfully authenticated with {} role {}", remoteAddress, authMethod, clientAuthRole); } - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -269,7 +282,6 @@ private void doAuthentication(AuthData clientData) throws Exception { remoteAddress, authMethod); } state = State.Connecting; - return; } @Override @@ -297,16 +309,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), try { // init authn this.clientConf = createClientConfiguration(); - int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)); - this.client = - new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); - - completeConnect(); + completeConnect(null); return; } @@ -331,7 +337,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - createClientAndCompleteConnect(clientData); + completeConnect(clientData); return; } @@ -349,7 +355,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")); close(); - return; } } @@ -404,19 +409,26 @@ protected void handleLookup(CommandLookupTopic lookup) { lookupProxyHandler.handleLookup(lookup); } - private void close() { - state = State.Closed; - ctx.close(); - try { - if (client != null) { - client.close(); + private synchronized void close() { + if (state != State.Closed) { + state = State.Closed; + if (directProxyHandler != null && directProxyHandler.outboundChannel != null) { + directProxyHandler.outboundChannel.close(); + directProxyHandler = null; + } + if (connectionPool != null) { + try { + connectionPool.close(); + connectionPool = null; + } catch (Exception e) { + LOG.error("Error closing connection pool", e); + } } - } catch (PulsarClientException e) { - LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage()); + ctx.close(); } } - ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException { + ClientConfigurationData createClientConfiguration() { ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); @@ -436,20 +448,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica return clientConf; } - private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData, - final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, - clientAuthMethod, protocolVersion)); - return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); - } - private static int getProtocolVersionToAdvertise(CommandConnect connect) { return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion()); } long newRequestId() { - return client.newRequestId(); + return requestIdGenerator.getAndIncrement(); } public Authentication getClientAuthentication() { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 2af7ebf41d951..271c85f1daa89 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -28,8 +28,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; @@ -75,7 +73,6 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; private final Authentication proxyClientAuthentication; - private final Timer timer; private String serviceUrl; private String serviceUrlTls; private final AuthenticationService authenticationService; @@ -137,8 +134,6 @@ public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws Exception { requireNonNull(proxyConfig); this.proxyConfig = proxyConfig; - this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", - Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = new ConcurrentHashMap<>(); @@ -342,9 +337,6 @@ public void close() throws IOException { for (EventLoopGroup group : extensionsWorkerGroups) { group.shutdownGracefully(); } - if (timer != null) { - timer.stop(); - } } public String getServiceUrl() { @@ -359,10 +351,6 @@ public ProxyConfiguration getConfiguration() { return proxyConfig; } - public Timer getTimer() { - return timer; - } - public AuthenticationService getAuthenticationService() { return authenticationService; }