Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy #13836

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import static org.apache.pulsar.client.util.MathUtils.signSafeMod;

import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import com.google.common.annotations.VisibleForTesting;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
Expand All @@ -31,9 +29,6 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -45,9 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand All @@ -58,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionPool implements Closeable {
public class ConnectionPool implements AutoCloseable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;

private final Bootstrap bootstrap;
Expand Down Expand Up @@ -227,7 +220,7 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
}

/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
Expand All @@ -252,27 +245,32 @@ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolved
}

/**
* Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
* working
* Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
* address is working.
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
int port,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();

// Successfully connected to server
connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
connectToAddress(unresolvedAddresses.next(), port, sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});

return future;
}
Expand All @@ -290,20 +288,19 @@ CompletableFuture<List<InetAddress>> resolveName(String hostname) {
}

/**
* Attempt to establish a TCP connection to an already resolved single IP address
* Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
.thenCompose(channel -> channelInitializerHandler
.initSocks5IfConfig(channel))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler.initSocks5IfConfig(channel))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
}
}
Expand All @@ -312,7 +309,7 @@ public void releaseConnection(ClientCnx cnx) {
if (maxConnectionsPerHosts == 0) {
//Disable pooling
if (cnx.channel().isActive()) {
if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
log.debug("close connection due to pooling disabled.");
}
cnx.close();
Expand All @@ -321,14 +318,8 @@ public void releaseConnection(ClientCnx cnx) {
}

@Override
public void close() throws IOException {
try {
if (!eventLoopGroup.isShutdown()) {
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
}
} catch (InterruptedException e) {
log.warn("EventLoopGroup shutdown was interrupted", e);
}
public void close() throws Exception {
closeAllConnections();
dnsResolver.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.SocketAddress;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
Expand All @@ -37,11 +39,9 @@
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
Expand All @@ -63,8 +63,9 @@
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
Expand Down Expand Up @@ -108,7 +109,7 @@ enum State {
}

ConnectionPool getConnectionPool() {
return client.getCnxPool();
return connectionPool;
}

public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
Expand All @@ -125,7 +126,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.REJECTED_CONNECTIONS.inc();
return;
}
}

Expand All @@ -144,26 +144,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);

if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}

if (client != null) {
client.close();
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);

if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}

state = State.Closed;
}

@Override
Expand Down Expand Up @@ -217,7 +218,30 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

private void completeConnect() {
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
}
} else {
if (this.connectionPool == null) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
remoteAddress, state);
}
}

LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
Expand All @@ -237,17 +261,6 @@ private void completeConnect() {
}
}

private void createClientAndCompleteConnect(AuthData clientData)
throws PulsarClientException {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);

completeConnect();
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
Expand All @@ -258,7 +271,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -269,7 +282,6 @@ private void doAuthentication(AuthData clientData) throws Exception {
remoteAddress, authMethod);
}
state = State.Connecting;
return;
}

@Override
Expand Down Expand Up @@ -297,16 +309,10 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
try {
// init authn
this.clientConf = createClientConfiguration();
int protocolVersion = getProtocolVersionToAdvertise(connect);

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
this.client =
new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());

completeConnect();
completeConnect(null);
return;
}

Expand All @@ -331,7 +337,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

createClientAndCompleteConnect(clientData);
completeConnect(clientData);
return;
}

Expand All @@ -349,7 +355,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
}

Expand Down Expand Up @@ -404,19 +409,26 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupProxyHandler.handleLookup(lookup);
}

private void close() {
state = State.Closed;
ctx.close();
try {
if (client != null) {
client.close();
private synchronized void close() {
if (state != State.Closed) {
state = State.Closed;
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
directProxyHandler = null;
}
if (connectionPool != null) {
try {
connectionPool.close();
connectionPool = null;
} catch (Exception e) {
LOG.error("Error closing connection pool", e);
}
}
} catch (PulsarClientException e) {
LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
ctx.close();
}
}

ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
Expand All @@ -436,20 +448,12 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica
return clientConf;
}

private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
clientAuthMethod, protocolVersion));
return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}

private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}

long newRequestId() {
return client.newRequestId();
return requestIdGenerator.getAndIncrement();
}

public Authentication getClientAuthentication() {
Expand Down
Loading