Skip to content

Commit

Permalink
[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (apa…
Browse files Browse the repository at this point in the history
…che#13836)

(cherry picked from commit 324aa1b)
(cherry picked from commit 25e6b65)
  • Loading branch information
lhotari committed Feb 9, 2022
1 parent 820b069 commit 3a02e0a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import static org.apache.pulsar.client.util.MathUtils.signSafeMod;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import com.google.common.annotations.VisibleForTesting;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
Expand All @@ -31,9 +29,6 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -45,9 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand All @@ -58,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPool implements Closeable {
public class ConnectionPool implements AutoCloseable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;

private final Bootstrap bootstrap;
Expand Down Expand Up @@ -222,7 +215,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
}

/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
Expand All @@ -247,27 +240,32 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
}

/**
* Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
* working
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
int port,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
connectToAddress(unresolvedAddresses.next(), port, sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});

return future;
}
Expand All @@ -285,7 +283,7 @@ CompletableFuture<List<InetAddress>> resolveName(String hostname) {
}

/**
* Attempt to establish a TCP connection to an already resolved single IP address
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
Expand All @@ -303,7 +301,7 @@ public void releaseConnection(ClientCnx cnx) {
if (maxConnectionsPerHosts == 0) {
//Disable pooling
if (cnx.channel().isActive()) {
if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
log.debug("close connection due to pooling disabled.");
}
cnx.close();
Expand All @@ -312,14 +310,8 @@ public void releaseConnection(ClientCnx cnx) {
}

@Override
public void close() throws IOException {
try {
if (!eventLoopGroup.isShutdown()) {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
}
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}
public void close() throws Exception {
closeAllConnections();
dnsResolver.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.net.SocketAddress;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
Expand All @@ -34,11 +35,9 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -68,8 +67,9 @@
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
Expand Down Expand Up @@ -113,7 +113,7 @@ enum State {
}

ConnectionPool getConnectionPool() {
return client.getCnxPool();
return connectionPool;
}

public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
Expand All @@ -130,7 +130,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.rejectedConnections.inc();
return;
}
}

Expand All @@ -149,26 +148,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}

if (client != null) {
client.close();
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);

if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}

state = State.Closed;
}

@Override
Expand Down Expand Up @@ -222,7 +222,30 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

private void completeConnect() {
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
}
} else {
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
remoteAddress, state);
}
}

LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
Expand All @@ -242,17 +265,6 @@ private void completeConnect() {
}
}

private void createClientAndCompleteConnect(AuthData clientData)
throws PulsarClientException {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);

completeConnect();
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
Expand All @@ -263,7 +275,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -274,7 +286,6 @@ private void doAuthentication(AuthData clientData) throws Exception {
remoteAddress, authMethod);
}
state = State.Connecting;
return;
}

@Override
Expand Down Expand Up @@ -302,16 +313,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
try {
// init authn
this.clientConf = createClientConfiguration();
int protocolVersion = getProtocolVersionToAdvertise(connect);

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
this.client =
new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());

completeConnect();
completeConnect(null);
return;
}

Expand All @@ -336,7 +341,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -354,7 +359,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
}

Expand Down Expand Up @@ -409,19 +413,26 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupProxyHandler.handleLookup(lookup);
}

private void close() {
state = State.Closed;
ctx.close();
try {
if (client != null) {
client.close();
private synchronized void close() {
if (state != State.Closed) {
state = State.Closed;
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}
if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Error closing connection pool", e);
}
}
} catch (PulsarClientException e) {
LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
ctx.close();
}
}

ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
Expand All @@ -441,20 +452,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica
return clientConf;
}

private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersion));
return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}

private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}

long newRequestId() {
return client.newRequestId();
return requestIdGenerator.getAndIncrement();
}

public Authentication getClientAuthentication() {
Expand Down
Loading

0 comments on commit 3a02e0a

Please sign in to comment.