Skip to content

Commit

Permalink
Fix Pulsar Proxy to re-use authentication instance (#12245)
Browse files Browse the repository at this point in the history
* Fix Pulsar Proxy to re-use authentication instance

Currently, the Pulsar Proxy creates a new PulsarClientImpl with a new
instance of the client authentication plugin.

For certain client auth implementations, this can cause issues. For
example, if a client plugin needs to generate a token and then cache and
re-use it (which is very common with typical Pulsar client usage) this
pattern breaks, because the client auth plugin is tied to the
lifecycle of the connection and not the more "singleton" usage of the
Pulsar client.

Arguably, we should instead figure out how to re-use the entire Pulsar
client, but that likely has more complexity, instead this "quick fix"
will get one of the most obvious cases solved.

* add test for ensuring all same auth instance

* Simplify ProxyAuthTest

It isn't clear why this test was doing timeouts... it doesn't really
seem to be testing anything as the auth token was not being refreshed
and it appears the test was expected to pass (it almost looks like they
were expected to fail?)

This allows us to make this test faster and more reliable, as the
timeouts don't really seem to be adding anything
  • Loading branch information
addisonj authored and eolivelli committed Oct 8, 2021
1 parent 9293adf commit 0daae95
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
Expand Down Expand Up @@ -303,7 +302,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
try {
// init authn
this.clientConf = createClientConfiguration();
this.clientAuthentication = clientConf.getAuthentication();
int protocolVersion = getProtocolVersionToAdvertise(connect);

// authn not enabled, complete
Expand Down Expand Up @@ -427,10 +425,7 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters()));
}
clientConf.setAuthentication(this.getClientAuthentication());
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
Expand Down Expand Up @@ -463,7 +458,7 @@ long newRequestId() {
}

public Authentication getClientAuthentication() {
return clientAuthentication;
return service.getProxyClientAuthenticationPlugin();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
Expand All @@ -69,6 +72,7 @@
public class ProxyService implements Closeable {

private final ProxyConfiguration proxyConfig;
private final Authentication proxyClientAuthentication;
private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
Expand Down Expand Up @@ -163,6 +167,12 @@ public ProxyService(ProxyConfiguration proxyConfig,
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
}

public void start() throws Exception {
Expand Down Expand Up @@ -367,5 +377,9 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS
proxyConfig.getZookeeperSessionTimeoutMs());
}

public Authentication getProxyClientAuthenticationPlugin() {
return this.proxyClientAuthentication;
}

private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -144,9 +145,11 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}

JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
log.info("Have log of {}", element);
long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
throw new AuthenticationException("Authentication data has been expired");
}
return element.get("entityType").getAsString();
Expand Down Expand Up @@ -198,10 +201,10 @@ void testAuthentication() throws Exception {
String namespaceName = "my-property/my-ns";
String topicName = "persistent://my-property/my-ns/my-topic1";
String subscriptionName = "my-subscriber-name";
// expires after 6 seconds
String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000);
// expires after 3 seconds
String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000);
// expires after 60 seconds
String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
// expires after 60 seconds
String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
Expand Down Expand Up @@ -230,16 +233,18 @@ void testAuthentication() throws Exception {
proxyService.start();
final String proxyServiceUrl = proxyService.getServiceUrl();

// Step 3: Pass correct client params
// Step 3: Pass correct client params and use multiple connections
@Cleanup
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 4 seconds - wait for proxy auth params to expire
Thread.sleep(4 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 3 seconds - wait for client auth parans to expire
Thread.sleep(3 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();

// Step 4: Ensure that all client contexts share the same auth provider
Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients");
proxyService.getClientCnxs().stream().forEach((cnx) -> {
Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
});
}

private void updateAdminClient() throws PulsarClientException {
Expand Down

0 comments on commit 0daae95

Please sign in to comment.