Skip to content

Commit

Permalink
[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lhotari committed Jan 19, 2022
1 parent f09e3fa commit 324aa1b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import static org.apache.pulsar.client.util.MathUtils.signSafeMod;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import com.google.common.annotations.VisibleForTesting;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
Expand All @@ -31,9 +29,6 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -45,9 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand All @@ -58,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPool implements Closeable {
public class ConnectionPool implements AutoCloseable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;

private final Bootstrap bootstrap;
Expand Down Expand Up @@ -227,7 +220,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
}

/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
Expand All @@ -252,27 +245,32 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
}

/**
* Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
* working
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
int port,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
connectToAddress(unresolvedAddresses.next(), port, sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});

return future;
}
Expand All @@ -290,20 +288,19 @@ CompletableFuture<List<InetAddress>> resolveName(String hostname) {
}

/**
* Attempt to establish a TCP connection to an already resolved single IP address
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
.thenCompose(channel -> channelInitializerHandler
.initSocks5IfConfig(channel))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler.initSocks5IfConfig(channel))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
}
}
Expand All @@ -312,7 +309,7 @@ public void releaseConnection(ClientCnx cnx) {
if (maxConnectionsPerHosts == 0) {
//Disable pooling
if (cnx.channel().isActive()) {
if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
log.debug("close connection due to pooling disabled.");
}
cnx.close();
Expand All @@ -321,14 +318,8 @@ public void releaseConnection(ClientCnx cnx) {
}

@Override
public void close() throws IOException {
try {
if (!eventLoopGroup.isShutdown()) {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
}
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}
public void close() throws Exception {
closeAllConnections();
dnsResolver.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.SocketAddress;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
Expand All @@ -37,11 +39,9 @@
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
Expand All @@ -63,8 +63,9 @@
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
Expand Down Expand Up @@ -108,7 +109,7 @@ enum State {
}

ConnectionPool getConnectionPool() {
return client.getCnxPool();
return connectionPool;
}

public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
Expand All @@ -125,7 +126,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.REJECTED_CONNECTIONS.inc();
return;
}
}

Expand All @@ -144,26 +144,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}

if (client != null) {
client.close();
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);

if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}

state = State.Closed;
}

@Override
Expand Down Expand Up @@ -217,7 +218,30 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

private void completeConnect() {
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
}
} else {
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
remoteAddress, state);
}
}

LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
Expand All @@ -237,17 +261,6 @@ private void completeConnect() {
}
}

private void createClientAndCompleteConnect(AuthData clientData)
throws PulsarClientException {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);

completeConnect();
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
Expand All @@ -258,7 +271,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -269,7 +282,6 @@ private void doAuthentication(AuthData clientData) throws Exception {
remoteAddress, authMethod);
}
state = State.Connecting;
return;
}

@Override
Expand Down Expand Up @@ -297,16 +309,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
try {
// init authn
this.clientConf = createClientConfiguration();
int protocolVersion = getProtocolVersionToAdvertise(connect);

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
this.client =
new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());

completeConnect();
completeConnect(null);
return;
}

Expand All @@ -331,7 +337,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -349,7 +355,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
}

Expand Down Expand Up @@ -404,19 +409,26 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupProxyHandler.handleLookup(lookup);
}

private void close() {
state = State.Closed;
ctx.close();
try {
if (client != null) {
client.close();
private synchronized void close() {
if (state != State.Closed) {
state = State.Closed;
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}
if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Error closing connection pool", e);
}
}
} catch (PulsarClientException e) {
LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
ctx.close();
}
}

ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
Expand All @@ -436,20 +448,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica
return clientConf;
}

private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersion));
return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}

private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}

long newRequestId() {
return client.newRequestId();
return requestIdGenerator.getAndIncrement();
}

public Authentication getClientAuthentication() {
Expand Down
Loading

0 comments on commit 324aa1b

Please sign in to comment.