From 5b90097c4fe4ca39638e8604d78fdcde4890142f Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 10:20:43 +0100 Subject: [PATCH 01/12] configure oauth2 connectTimeout, readTimeout and trustCertsFilePath --- .../auth/oauth2/ClientCredentialsFlow.java | 66 +++++++-- .../client/impl/auth/oauth2/ConfigUtils.java | 138 ++++++++++++++++++ .../client/impl/auth/oauth2/FlowBase.java | 7 +- .../protocol/DefaultMetadataResolver.java | 54 +++---- .../auth/oauth2/protocol/TokenClient.java | 36 +---- 5 files changed, 220 insertions(+), 81 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index ef10f1afdb63b..3084c54ed39e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -18,24 +18,25 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; +import io.netty.handler.ssl.SslContextBuilder; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.*; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.apache.pulsar.PulsarVersion; + +import javax.net.ssl.SSLException; +import java.io.*; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; import java.util.Map; -import lombok.Builder; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; /** * Implementation of OAuth 2.0 Client Credentials flow. @@ -48,6 +49,12 @@ class ClientCredentialsFlow extends FlowBase { public static final String CONFIG_PARAM_AUDIENCE = "audience"; public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; public static final String CONFIG_PARAM_SCOPE = "scope"; + public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; + public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; + public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + + private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; + private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; private static final long serialVersionUID = 1L; @@ -60,8 +67,8 @@ class ClientCredentialsFlow extends FlowBase { private boolean initialized = false; @Builder - public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) { - super(issuerUrl); + public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, AsyncHttpClient httpClient) { + super(issuerUrl, httpClient); this.audience = audience; this.privateKey = privateKey; this.scope = scope; @@ -73,7 +80,7 @@ public void initialize() throws PulsarClientException { assert this.metadata != null; URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl); + this.exchanger = new TokenClient(tokenUrl, httpClient); initialized = true; } @@ -109,6 +116,9 @@ public TokenResult authenticate() throws PulsarClientException { @Override public void close() throws Exception { + if(httpClient != null) { + httpClient.close(); + } if (exchanger != null) { exchanger.close(); } @@ -125,11 +135,35 @@ public static ClientCredentialsFlow fromParameters(Map params) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); + + int connectTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); + int readTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); + String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); + + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setCookieStore(null); + confBuilder.setUseProxyProperties(true); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout(connectTimeout); + confBuilder.setReadTimeout(readTimeout); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + if(StringUtils.isNotBlank(trustCertsFilePath)) { + try { + confBuilder.setSslContext(SslContextBuilder.forClient() + .trustManager(new File(trustCertsFilePath)) + .build()); + } catch (SSLException e) { + log.error("Could not set trustCertsFilePath", e); + } + } + AsyncHttpClient httpClient = new DefaultAsyncHttpClient(confBuilder.build()); + return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) .privateKey(privateKeyUrl) .scope(scope) + .httpClient(httpClient) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java new file mode 100644 index 0000000000000..8f550e902eb01 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +class ConfigUtils { + private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class); + + /** + * Get configured property as a string. If not configured, return null. + * @param params - the parameters + * @param configProp - the property to get + * @return a string from the conf or null, if the configuration property was not set + */ + static String getConfigValueAsString(Map params, + String configProp) throws IllegalArgumentException { + String value = params.get(configProp); + log.info("Configuration for [{}] is [{}]", configProp, value); + return value; + } + + /** + * Get configured property as a string. If not configured, return the default value. + * @param params - the parameters + * @param configProp - the property to get + * @param defaultValue - the value to use if the configuration value is not set + * @return a string from the conf or the default value + */ + static String getConfigValueAsString(Map params, String configProp, + String defaultValue) throws IllegalArgumentException { + String value = params.get(configProp); + if (value == null) { + value = defaultValue; + } + log.info("Configuration for [{}] is [{}]", configProp, value); + return value; + } + + /** + * Get configured property as a set. Split using a comma delimiter and remove any extra whitespace surrounding + * the commas. If not configured, return the empty set. + * + * @param params - the map of configuration properties + * @param configProp - the property (key) to get + * @return a set of strings from the conf + */ + static Set getConfigValueAsSet(Map params, String configProp) { + String value = params.get(configProp); + if (StringUtils.isBlank(value)) { + log.info("Configuration for [{}] is the empty set.", configProp); + return Collections.emptySet(); + } + Set set = Arrays.stream(value.trim().split("\\s*,\\s*")).collect(Collectors.toSet()); + log.info("Configuration for [{}] is [{}].", configProp, String.join(", ", set)); + return set; + } + + /** + * Get configured property as an integer. If the value is not a valid long or the + * key is not present in the conf, the default value will be used. + * + * @param params - the parameters + * @param configProp - the property (key) to get + * @param defaultValue - the value to use if the property is missing from the conf + * @return a long + */ + static int getConfigValueAsInt(Map params, String configProp, int defaultValue) { + String value = params.get(configProp); + if (StringUtils.isNotBlank(value)) { + try { + return Integer.parseInt((String) value); + } catch (NumberFormatException numberFormatException) { + log.error("Expected configuration for [{}] to be an integer, but got [{}]. Using default value: [{}]", + configProp, value, defaultValue, numberFormatException); + return defaultValue; + } + } else { + log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue); + return defaultValue; + } + } + + /** + * Get configured property as an boolean. If the key is not present in the conf, + * return the default value. If key is present the value is not a valid boolean, the result will be false. + * + * @param params - the map of configuration properties + * @param configProp - the property (key) to get + * @param defaultValue - the value to use if the property is missing from the conf + * @return a boolean + */ + static boolean getConfigValueAsBoolean(Map params, String configProp, boolean defaultValue) { + String value = params.get(configProp); + if (StringUtils.isNotBlank(value)) { + boolean result = Boolean.parseBoolean((String) value); + log.info("Configuration for [{}] is [{}]", configProp, result); + return result; + } else { + log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue); + return defaultValue; + } + } + + static File getConfigValueAsFile(Map params, String configProp) { + String value = params.get(configProp); + if (StringUtils.isNotBlank(value)) { + return new File(value); + } + return null; + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 125a880086297..e02cfef82099a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; +import org.asynchttpclient.AsyncHttpClient; /** * An abstract OAuth 2.0 authorization flow. @@ -38,11 +39,13 @@ abstract class FlowBase implements Flow { private static final long serialVersionUID = 1L; protected final URL issuerUrl; + protected final AsyncHttpClient httpClient; protected transient Metadata metadata; - protected FlowBase(URL issuerUrl) { + protected FlowBase(URL issuerUrl, AsyncHttpClient httpClient) { this.issuerUrl = issuerUrl; + this.httpClient = httpClient; } public void initialize() throws PulsarClientException { @@ -55,7 +58,7 @@ public void initialize() throws PulsarClientException { } protected MetadataResolver createMetadataResolver() { - return DefaultMetadataResolver.fromIssuerUrl(issuerUrl); + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient); } static String parseParameterString(Map params, String name) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index be636145cb24b..cea36bfa456bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -19,44 +19,32 @@ package org.apache.pulsar.client.impl.auth.oauth2.protocol; import com.fasterxml.jackson.databind.ObjectReader; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; + import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.net.URLConnection; -import java.time.Duration; -import org.apache.pulsar.common.util.ObjectMapperFactory; +import java.util.concurrent.ExecutionException; /** * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414. */ public class DefaultMetadataResolver implements MetadataResolver { - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; - private final URL metadataUrl; private final ObjectReader objectReader; - private Duration connectTimeout; - private Duration readTimeout; + private final AsyncHttpClient httpClient; - public DefaultMetadataResolver(URL metadataUrl) { + public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { this.metadataUrl = metadataUrl; this.objectReader = ObjectMapperFactory.getMapper().reader().forType(Metadata.class); - // set a default timeout to ensure that this doesn't block - this.connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS); - this.readTimeout = Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS); - } - - public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) { - this.connectTimeout = connectTimeout; - return this; - } - - public DefaultMetadataResolver withReadTimeout(Duration readTimeout) { - this.readTimeout = readTimeout; - return this; + this.httpClient = httpClient; } /** @@ -65,23 +53,21 @@ public DefaultMetadataResolver withReadTimeout(Duration readTimeout) { * @throws IOException if the metadata could not be resolved. */ public Metadata resolve() throws IOException { + try { - URLConnection c = this.metadataUrl.openConnection(); - if (connectTimeout != null) { - c.setConnectTimeout((int) connectTimeout.toMillis()); - } - if (readTimeout != null) { - c.setReadTimeout((int) readTimeout.toMillis()); - } - c.setRequestProperty("Accept", "application/json"); + Response response = httpClient.prepareGet(metadataUrl.toString()) + .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON) + .execute() + .toCompletableFuture() + .get(); Metadata metadata; - try (InputStream inputStream = c.getInputStream()) { + try (InputStream inputStream = response.getResponseBodyAsStream()) { metadata = this.objectReader.readValue(inputStream); } return metadata; - } catch (IOException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e); } } @@ -91,8 +77,8 @@ public Metadata resolve() throws IOException { * @param issuerUrl The authorization server's issuer identifier * @return a resolver */ - public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) { - return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl)); + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); } /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index f4e4c770e67fc..0b99b596566aa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.client.impl.auth.oauth2.protocol; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; + import java.io.IOException; import java.net.URL; import java.net.URLEncoder; @@ -26,44 +31,17 @@ import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.asynchttpclient.Response; /** * A client for an OAuth 2.0 token endpoint. */ public class TokenClient implements ClientCredentialsExchanger { - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; - private final URL tokenUrl; private final AsyncHttpClient httpClient; - public TokenClient(URL tokenUrl) { - this(tokenUrl, null); - } - - TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { - if (httpClient == null) { - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - AsyncHttpClientConfig config = confBuilder.build(); - this.httpClient = new DefaultAsyncHttpClient(config); - } else { - this.httpClient = httpClient; - } + public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { + this.httpClient = httpClient; this.tokenUrl = tokenUrl; } From a650d81389cf4752cb44ea492d3b48c27de866cb Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 11:41:19 +0100 Subject: [PATCH 02/12] fix checkstyle --- .../auth/oauth2/ClientCredentialsFlow.java | 140 ++++++++++-------- .../client/impl/auth/oauth2/ConfigUtils.java | 96 +----------- .../protocol/DefaultMetadataResolver.java | 60 ++++---- .../auth/oauth2/protocol/TokenClient.java | 38 ++--- 4 files changed, 132 insertions(+), 202 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 3084c54ed39e0..249a8e23dfab3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -19,24 +19,31 @@ package org.apache.pulsar.client.impl.auth.oauth2; import io.netty.handler.ssl.SslContextBuilder; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import javax.net.ssl.SSLException; import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.auth.oauth2.protocol.*; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.apache.pulsar.PulsarVersion; - -import javax.net.ssl.SSLException; -import java.io.*; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLConnection; -import java.nio.charset.StandardCharsets; -import java.util.Map; /** * Implementation of OAuth 2.0 Client Credentials flow. @@ -67,65 +74,17 @@ class ClientCredentialsFlow extends FlowBase { private boolean initialized = false; @Builder - public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, AsyncHttpClient httpClient) { + public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, + AsyncHttpClient httpClient) { super(issuerUrl, httpClient); this.audience = audience; this.privateKey = privateKey; this.scope = scope; } - @Override - public void initialize() throws PulsarClientException { - super.initialize(); - assert this.metadata != null; - - URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl, httpClient); - initialized = true; - } - - public TokenResult authenticate() throws PulsarClientException { - // read the private key from storage - KeyFile keyFile; - try { - keyFile = loadPrivateKey(this.privateKey); - } catch (IOException e) { - throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); - } - - // request an access token using client credentials - ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder() - .clientId(keyFile.getClientId()) - .clientSecret(keyFile.getClientSecret()) - .audience(this.audience) - .scope(this.scope) - .build(); - TokenResult tr; - if (!initialized) { - initialize(); - } - try { - tr = this.exchanger.exchangeClientCredentials(req); - } catch (TokenExchangeException | IOException e) { - throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " - + e.getMessage()); - } - - return tr; - } - - @Override - public void close() throws Exception { - if(httpClient != null) { - httpClient.close(); - } - if (exchanger != null) { - exchanger.close(); - } - } - /** * Constructs a {@link ClientCredentialsFlow} from configuration parameters. + * * @param params * @return */ @@ -136,8 +95,10 @@ public static ClientCredentialsFlow fromParameters(Map params) { String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); - int connectTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); - int readTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); + int connectTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); + int readTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT, + DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); @@ -147,7 +108,7 @@ public static ClientCredentialsFlow fromParameters(Map params) { confBuilder.setConnectTimeout(connectTimeout); confBuilder.setReadTimeout(readTimeout); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - if(StringUtils.isNotBlank(trustCertsFilePath)) { + if (StringUtils.isNotBlank(trustCertsFilePath)) { try { confBuilder.setSslContext(SslContextBuilder.forClient() .trustManager(new File(trustCertsFilePath)) @@ -169,6 +130,7 @@ public static ClientCredentialsFlow fromParameters(Map params) { /** * Loads the private key from the given URL. + * * @param privateKeyURL * @return * @throws IOException @@ -196,4 +158,54 @@ private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException { throw new IOException("Invalid privateKey format", e); } } + + @Override + public void initialize() throws PulsarClientException { + super.initialize(); + assert this.metadata != null; + + URL tokenUrl = this.metadata.getTokenEndpoint(); + this.exchanger = new TokenClient(tokenUrl, httpClient); + initialized = true; + } + + public TokenResult authenticate() throws PulsarClientException { + // read the private key from storage + KeyFile keyFile; + try { + keyFile = loadPrivateKey(this.privateKey); + } catch (IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); + } + + // request an access token using client credentials + ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder() + .clientId(keyFile.getClientId()) + .clientSecret(keyFile.getClientSecret()) + .audience(this.audience) + .scope(this.scope) + .build(); + TokenResult tr; + if (!initialized) { + initialize(); + } + try { + tr = this.exchanger.exchangeClientCredentials(req); + } catch (TokenExchangeException | IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " + + e.getMessage()); + } + + return tr; + } + + @Override + public void close() throws Exception { + if (httpClient != null) { + httpClient.close(); + } + if (exchanger != null) { + exchanger.close(); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java index 8f550e902eb01..c13e430c6a711 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java @@ -18,83 +18,28 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - class ConfigUtils { private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class); /** - * Get configured property as a string. If not configured, return null. - * @param params - the parameters - * @param configProp - the property to get - * @return a string from the conf or null, if the configuration property was not set - */ - static String getConfigValueAsString(Map params, - String configProp) throws IllegalArgumentException { - String value = params.get(configProp); - log.info("Configuration for [{}] is [{}]", configProp, value); - return value; - } - - /** - * Get configured property as a string. If not configured, return the default value. - * @param params - the parameters - * @param configProp - the property to get - * @param defaultValue - the value to use if the configuration value is not set - * @return a string from the conf or the default value - */ - static String getConfigValueAsString(Map params, String configProp, - String defaultValue) throws IllegalArgumentException { - String value = params.get(configProp); - if (value == null) { - value = defaultValue; - } - log.info("Configuration for [{}] is [{}]", configProp, value); - return value; - } - - /** - * Get configured property as a set. Split using a comma delimiter and remove any extra whitespace surrounding - * the commas. If not configured, return the empty set. - * - * @param params - the map of configuration properties - * @param configProp - the property (key) to get - * @return a set of strings from the conf - */ - static Set getConfigValueAsSet(Map params, String configProp) { - String value = params.get(configProp); - if (StringUtils.isBlank(value)) { - log.info("Configuration for [{}] is the empty set.", configProp); - return Collections.emptySet(); - } - Set set = Arrays.stream(value.trim().split("\\s*,\\s*")).collect(Collectors.toSet()); - log.info("Configuration for [{}] is [{}].", configProp, String.join(", ", set)); - return set; - } - - /** - * Get configured property as an integer. If the value is not a valid long or the + * Get configured property as an integer. If the value is not a valid integer or the * key is not present in the conf, the default value will be used. * - * @param params - the parameters - * @param configProp - the property (key) to get + * @param params - the parameters + * @param configProp - the property (key) to get * @param defaultValue - the value to use if the property is missing from the conf - * @return a long + * @return an integer */ static int getConfigValueAsInt(Map params, String configProp, int defaultValue) { String value = params.get(configProp); if (StringUtils.isNotBlank(value)) { try { - return Integer.parseInt((String) value); + return Integer.parseInt(value); } catch (NumberFormatException numberFormatException) { log.error("Expected configuration for [{}] to be an integer, but got [{}]. Using default value: [{}]", configProp, value, defaultValue, numberFormatException); @@ -106,33 +51,4 @@ static int getConfigValueAsInt(Map params, String configProp, in } } - /** - * Get configured property as an boolean. If the key is not present in the conf, - * return the default value. If key is present the value is not a valid boolean, the result will be false. - * - * @param params - the map of configuration properties - * @param configProp - the property (key) to get - * @param defaultValue - the value to use if the property is missing from the conf - * @return a boolean - */ - static boolean getConfigValueAsBoolean(Map params, String configProp, boolean defaultValue) { - String value = params.get(configProp); - if (StringUtils.isNotBlank(value)) { - boolean result = Boolean.parseBoolean((String) value); - log.info("Configuration for [{}] is [{}]", configProp, result); - return result; - } else { - log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue); - return defaultValue; - } - } - - static File getConfigValueAsFile(Map params, String configProp) { - String value = params.get(configProp); - if (StringUtils.isNotBlank(value)) { - return new File(value); - } - return null; - } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index cea36bfa456bb..d82f1319b5a40 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -21,16 +21,15 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; -import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.Response; - import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.util.concurrent.ExecutionException; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; /** * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414. @@ -47,8 +46,35 @@ public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { this.httpClient = httpClient; } + /** + * Gets a well-known metadata URL for the given OAuth issuer URL. + * + * @param issuerUrl The authorization server's issuer identifier + * @return a resolver + */ + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); + } + + /** + * Gets a well-known metadata URL for the given OAuth issuer URL. + * + * @param issuerUrl The authorization server's issuer identifier + * @return a URL + * @see + * OAuth Discovery: Obtaining Authorization Server Metadata + */ + public static URL getWellKnownMetadataUrl(URL issuerUrl) { + try { + return URI.create(issuerUrl.toExternalForm() + "/.well-known/openid-configuration").normalize().toURL(); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e); + } + } + /** * Resolves the authorization metadata. + * * @return metadata * @throws IOException if the metadata could not be resolved. */ @@ -68,31 +94,7 @@ public Metadata resolve() throws IOException { return metadata; } catch (IOException | InterruptedException | ExecutionException e) { - throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e); - } - } - - /** - * Gets a well-known metadata URL for the given OAuth issuer URL. - * @param issuerUrl The authorization server's issuer identifier - * @return a resolver - */ - public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { - return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); - } - - /** - * Gets a well-known metadata URL for the given OAuth issuer URL. - * @see - * OAuth Discovery: Obtaining Authorization Server Metadata - * @param issuerUrl The authorization server's issuer identifier - * @return a URL - */ - public static URL getWellKnownMetadataUrl(URL issuerUrl) { - try { - return URI.create(issuerUrl.toExternalForm() + "/.well-known/openid-configuration").normalize().toURL(); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e); + throw new IOException("Cannot obtain authorization metadata from " + metadataUrl, e); } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index 0b99b596566aa..6c47219759bb3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -18,11 +18,6 @@ */ package org.apache.pulsar.client.impl.auth.oauth2.protocol; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.Response; - import java.io.IOException; import java.net.URL; import java.net.URLEncoder; @@ -31,6 +26,10 @@ import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; /** * A client for an OAuth 2.0 token endpoint. @@ -52,6 +51,7 @@ public void close() throws Exception { /** * Constructing http request parameters. + * * @param req object with relevant request parameters * @return Generate the final request body from a map. */ @@ -75,6 +75,7 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) { /** * Performs a token exchange using client credentials. + * * @param req the client credentials request details. * @return a token result * @throws TokenExchangeException @@ -93,23 +94,22 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re .get(); switch (res.getStatusCode()) { - case 200: - return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenResult.class); - - case 400: // Bad request - case 401: // Unauthorized - throw new TokenExchangeException( - ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenError.class)); - - default: - throw new IOException( - "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); + case 200: + return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenResult.class); + + case 400: // Bad request + case 401: // Unauthorized + throw new TokenExchangeException( + ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenError.class)); + + default: + throw new IOException( + "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); } - } catch (InterruptedException | ExecutionException e1) { throw new IOException(e1); } From 77308f303f473094438cd9166703227dc04911de Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 13:59:33 +0100 Subject: [PATCH 03/12] fix for oauth2 authentication factory --- .../oauth2/AuthenticationFactoryOAuth2.java | 46 +++++++-- .../auth/oauth2/ClientCredentialsFlow.java | 93 ++++++++++++++----- 2 files changed, 105 insertions(+), 34 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index c9abb3a3c0147..6f8f39cd2cb2a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -31,9 +31,9 @@ public final class AuthenticationFactoryOAuth2 { /** * Authenticate with client credentials. * - * @param issuerUrl the issuer URL + * @param issuerUrl the issuer URL * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { @@ -43,23 +43,51 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl /** * Authenticate with client credentials. * - * @param issuerUrl the issuer URL + * @param issuerUrl the issuer URL * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. - * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited, - * case-sensitive strings. The strings are defined by the authorization server. - * If the value contains multiple space-delimited strings, their order does not matter, - * and each string adds an additional access range to the requested scope. - * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param scope An optional field. The value of the scope parameter is expressed as a list of + * space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { + return clientCredentials(issuerUrl, credentialsUrl, audience, scope, null, null, null); + } + + /** + * Authenticate with client credentials. + * + * @param issuerUrl the issuer URL + * @param credentialsUrl the credentials URL + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param scope An optional field. The value of the scope parameter is expressed as a list of + * space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * @param connectTimeout the connect timeout in milliseconds + * @param readTimeout the read timeout in milliseconds + * @param trustCertsFilePath the path to the file that contains trusted certificates + * @return an Authentication object + */ + public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, + Integer connectTimeout, Integer readTimeout, + String trustCertsFilePath) { ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .privateKey(credentialsUrl.toExternalForm()) .audience(audience) .scope(scope) + .connectTimeout(connectTimeout) + .readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath) .build(); return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 249a8e23dfab3..04e3016823318 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -75,38 +75,24 @@ class ClientCredentialsFlow extends FlowBase { @Builder public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, - AsyncHttpClient httpClient) { - super(issuerUrl, httpClient); + Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) { + super(issuerUrl, getHttpClient(connectTimeout, readTimeout, trustCertsFilePath)); this.audience = audience; this.privateKey = privateKey; this.scope = scope; } - /** - * Constructs a {@link ClientCredentialsFlow} from configuration parameters. - * - * @param params - * @return - */ - public static ClientCredentialsFlow fromParameters(Map params) { - URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); - String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); - // These are optional parameters, so we only perform a get - String scope = params.get(CONFIG_PARAM_SCOPE); - String audience = params.get(CONFIG_PARAM_AUDIENCE); - - int connectTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT, - DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); - int readTimeout = ConfigUtils.getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT, - DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); - String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); - + private static AsyncHttpClient getHttpClient(Integer connectTimeout, Integer readTimeout, + String trustCertsFilePath) { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout(connectTimeout); - confBuilder.setReadTimeout(readTimeout); + confBuilder.setConnectTimeout( + getConfigValueAsInt(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, + DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000)); + confBuilder.setReadTimeout( + getConfigValueAsInt(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000)); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); if (StringUtils.isNotBlank(trustCertsFilePath)) { try { @@ -117,17 +103,74 @@ public static ClientCredentialsFlow fromParameters(Map params) { log.error("Could not set trustCertsFilePath", e); } } - AsyncHttpClient httpClient = new DefaultAsyncHttpClient(confBuilder.build()); + return new DefaultAsyncHttpClient(confBuilder.build()); + } + + /** + * Constructs a {@link ClientCredentialsFlow} from configuration parameters. + * + * @param params + * @return + */ + public static ClientCredentialsFlow fromParameters(Map params) { + URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); + String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + // These are optional parameters, so we only perform a get + String scope = params.get(CONFIG_PARAM_SCOPE); + String audience = params.get(CONFIG_PARAM_AUDIENCE); + Integer connectTimeout = getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT); + Integer readTimeout = getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT); + String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) .privateKey(privateKeyUrl) .scope(scope) - .httpClient(httpClient) + .connectTimeout(connectTimeout) + .readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath) .build(); } + /** + * Utility method to get an integer from parameters. + * + * @param params the parameters + * @param key the key + * @return the integer value if exists, else null. + */ + private static Integer getConfigValueAsInt(Map params, String key) { + String value = params.get(key); + if (StringUtils.isNotBlank(value)) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException numberFormatException) { + log.error("Expected configuration for [{}] to be an integer, but got [{}]", + key, value, numberFormatException); + } + } + return null; + } + + /** + * Utility method to get an integer, or the default value. + * + * @param key the key + * @param value the nullable value + * @param defaultValue the default value + * @return the value if exits, else the default value. + */ + private static int getConfigValueAsInt(String key, Integer value, int defaultValue) { + if (value == null) { + log.info("Configuration for [{}] is using the default value: [{}]", key, defaultValue); + return defaultValue; + } else { + log.info("Configuration for [{}] is [{}]", key, value); + return value; + } + } + /** * Loads the private key from the given URL. * From bd6c84aa4f344c8d94d23e1c7af142d21a5efe49 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 14:15:53 +0100 Subject: [PATCH 04/12] delete old class --- .../client/impl/auth/oauth2/ConfigUtils.java | 54 ------------------- 1 file changed, 54 deletions(-) delete mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java deleted file mode 100644 index c13e430c6a711..0000000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ConfigUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.auth.oauth2; - -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class ConfigUtils { - private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class); - - /** - * Get configured property as an integer. If the value is not a valid integer or the - * key is not present in the conf, the default value will be used. - * - * @param params - the parameters - * @param configProp - the property (key) to get - * @param defaultValue - the value to use if the property is missing from the conf - * @return an integer - */ - static int getConfigValueAsInt(Map params, String configProp, int defaultValue) { - String value = params.get(configProp); - if (StringUtils.isNotBlank(value)) { - try { - return Integer.parseInt(value); - } catch (NumberFormatException numberFormatException) { - log.error("Expected configuration for [{}] to be an integer, but got [{}]. Using default value: [{}]", - configProp, value, defaultValue, numberFormatException); - return defaultValue; - } - } else { - log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue); - return defaultValue; - } - } - -} From f0a71f61aa7a567f87b68842bc8156920bae206d Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 16:51:22 +0100 Subject: [PATCH 05/12] add builder --- .../oauth2/AuthenticationFactoryOAuth2.java | 9 +++-- .../AuthenticationFactoryOAuth2Test.java | 40 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 6f8f39cd2cb2a..d8f92a38f7cc1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -20,6 +20,7 @@ import java.net.URL; import java.time.Clock; +import lombok.Builder; import org.apache.pulsar.client.api.Authentication; /** @@ -75,9 +76,10 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * @param trustCertsFilePath the path to the file that contains trusted certificates * @return an Authentication object */ - public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, - Integer connectTimeout, Integer readTimeout, - String trustCertsFilePath) { + @Builder(builderMethodName = "clientCredentialsBuilder") + private static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, + Integer connectTimeout, Integer readTimeout, + String trustCertsFilePath) { ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .privateKey(credentialsUrl.toExternalForm()) @@ -90,4 +92,5 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java new file mode 100644 index 0000000000000..0a5d61163bc7c --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java @@ -0,0 +1,40 @@ +package org.apache.pulsar.client.impl.auth.oauth2; + +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.net.URL; +import org.apache.pulsar.client.api.Authentication; +import org.testng.annotations.Test; + +public class AuthenticationFactoryOAuth2Test { + + @Test + public void testBuilder() throws IOException { + URL issuerUrl = new URL("http://localhost"); + URL credentialsUrl = new URL("http://localhost"); + String audience = "audience"; + String scope = "scope"; + Integer connectTimeout = 10001; + Integer readTimeout = 30001; + String trustCertsFilePath = null; + try (Authentication authentication = + AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl) + .credentialsUrl(credentialsUrl).audience(audience).scope(scope) + .connectTimeout(connectTimeout).readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath).build()) { + assertTrue(authentication instanceof AuthenticationOAuth2); + } + } + + @Test + public void testClientCredentials() throws IOException { + URL issuerUrl = new URL("http://localhost"); + URL credentialsUrl = new URL("http://localhost"); + String audience = "audience"; + try (Authentication authentication = + AuthenticationFactoryOAuth2.clientCredentials(issuerUrl, credentialsUrl, audience)) { + assertTrue(authentication instanceof AuthenticationOAuth2); + } + } + +} From 214ccf662766676c7dc29356fbcb07f8a2345494 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Tue, 4 Nov 2025 17:31:31 +0100 Subject: [PATCH 06/12] move httpClient to flowBase to be reused by other impl --- .../auth/oauth2/ClientCredentialsFlow.java | 86 +------------------ .../client/impl/auth/oauth2/FlowBase.java | 69 ++++++++++++++- 2 files changed, 71 insertions(+), 84 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 04e3016823318..53b1dbee23562 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; -import io.netty.handler.ssl.SslContextBuilder; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -29,21 +27,15 @@ import java.net.URLConnection; import java.nio.charset.StandardCharsets; import java.util.Map; -import javax.net.ssl.SSLException; import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest; import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; /** * Implementation of OAuth 2.0 Client Credentials flow. @@ -56,12 +48,6 @@ class ClientCredentialsFlow extends FlowBase { public static final String CONFIG_PARAM_AUDIENCE = "audience"; public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; public static final String CONFIG_PARAM_SCOPE = "scope"; - public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; - public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; - public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; - - private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; private static final long serialVersionUID = 1L; @@ -76,36 +62,12 @@ class ClientCredentialsFlow extends FlowBase { @Builder public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) { - super(issuerUrl, getHttpClient(connectTimeout, readTimeout, trustCertsFilePath)); + super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath); this.audience = audience; this.privateKey = privateKey; this.scope = scope; } - private static AsyncHttpClient getHttpClient(Integer connectTimeout, Integer readTimeout, - String trustCertsFilePath) { - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout( - getConfigValueAsInt(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, - DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000)); - confBuilder.setReadTimeout( - getConfigValueAsInt(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000)); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - if (StringUtils.isNotBlank(trustCertsFilePath)) { - try { - confBuilder.setSslContext(SslContextBuilder.forClient() - .trustManager(new File(trustCertsFilePath)) - .build()); - } catch (SSLException e) { - log.error("Could not set trustCertsFilePath", e); - } - } - return new DefaultAsyncHttpClient(confBuilder.build()); - } - /** * Constructs a {@link ClientCredentialsFlow} from configuration parameters. * @@ -118,8 +80,8 @@ public static ClientCredentialsFlow fromParameters(Map params) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); - Integer connectTimeout = getConfigValueAsInt(params, CONFIG_PARAM_CONNECT_TIMEOUT); - Integer readTimeout = getConfigValueAsInt(params, CONFIG_PARAM_READ_TIMEOUT); + Integer connectTimeout = parseParameterInt(params, CONFIG_PARAM_CONNECT_TIMEOUT); + Integer readTimeout = parseParameterInt(params, CONFIG_PARAM_READ_TIMEOUT); String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); return ClientCredentialsFlow.builder() @@ -133,44 +95,6 @@ public static ClientCredentialsFlow fromParameters(Map params) { .build(); } - /** - * Utility method to get an integer from parameters. - * - * @param params the parameters - * @param key the key - * @return the integer value if exists, else null. - */ - private static Integer getConfigValueAsInt(Map params, String key) { - String value = params.get(key); - if (StringUtils.isNotBlank(value)) { - try { - return Integer.parseInt(value); - } catch (NumberFormatException numberFormatException) { - log.error("Expected configuration for [{}] to be an integer, but got [{}]", - key, value, numberFormatException); - } - } - return null; - } - - /** - * Utility method to get an integer, or the default value. - * - * @param key the key - * @param value the nullable value - * @param defaultValue the default value - * @return the value if exits, else the default value. - */ - private static int getConfigValueAsInt(String key, Integer value, int defaultValue) { - if (value == null) { - log.info("Configuration for [{}] is using the default value: [{}]", key, defaultValue); - return defaultValue; - } else { - log.info("Configuration for [{}] is [{}]", key, value); - return value; - } - } - /** * Loads the private key from the given URL. * @@ -244,9 +168,7 @@ public TokenResult authenticate() throws PulsarClientException { @Override public void close() throws Exception { - if (httpClient != null) { - httpClient.close(); - } + super.close(); if (exchanger != null) { exchanger.close(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index e02cfef82099a..7f8335a6676db 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -18,17 +18,23 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.handler.ssl.SslContextBuilder; +import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; +import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; /** * An abstract OAuth 2.0 authorization flow. @@ -36,6 +42,13 @@ @Slf4j abstract class FlowBase implements Flow { + public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; + public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; + public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + + protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; + protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; + private static final long serialVersionUID = 1L; protected final URL issuerUrl; @@ -43,9 +56,42 @@ abstract class FlowBase implements Flow { protected transient Metadata metadata; - protected FlowBase(URL issuerUrl, AsyncHttpClient httpClient) { + protected FlowBase(URL issuerUrl, Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) { this.issuerUrl = issuerUrl; - this.httpClient = httpClient; + this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); + } + + private AsyncHttpClient defaultHttpClient(Integer readTimeout, Integer connectTimeout, String trustCertsFilePath) { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setCookieStore(null); + confBuilder.setUseProxyProperties(true); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout( + getParameterInt(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, + DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000)); + confBuilder.setReadTimeout( + getParameterInt(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000)); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + if (StringUtils.isNotBlank(trustCertsFilePath)) { + try { + confBuilder.setSslContext(SslContextBuilder.forClient() + .trustManager(new File(trustCertsFilePath)) + .build()); + } catch (SSLException e) { + log.error("Could not set " + CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e); + } + } + return new DefaultAsyncHttpClient(confBuilder.build()); + } + + private int getParameterInt(String name, Integer value, int defaultValue) { + if (value == null) { + log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); + return defaultValue; + } else { + log.info("Configuration for [{}] is: [{}]", name, value); + return value; + } } public void initialize() throws PulsarClientException { @@ -80,4 +126,23 @@ static URL parseParameterUrl(Map params, String name) { throw new IllegalArgumentException("Malformed configuration parameter: " + name); } } + + static Integer parseParameterInt(Map params, String name) { + String value = params.get(name); + if (StringUtils.isNotBlank(value)) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException numberFormatException) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name); + } + } + return null; + } + + @Override + public void close() throws Exception { + if (httpClient != null) { + httpClient.close(); + } + } } From cb265ab7c3ab824f57da8247b8b0669f10e1d100 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Wed, 5 Nov 2025 08:25:41 +0100 Subject: [PATCH 07/12] fix builder --- .../oauth2/AuthenticationFactoryOAuth2.java | 103 ++++++++++++------ 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index d8f92a38f7cc1..04a8e108a81d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -20,7 +20,6 @@ import java.net.URL; import java.time.Clock; -import lombok.Builder; import org.apache.pulsar.client.api.Authentication; /** @@ -38,7 +37,8 @@ public final class AuthenticationFactoryOAuth2 { * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { - return clientCredentials(issuerUrl, credentialsUrl, audience, null); + return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience) + .build(); } /** @@ -56,40 +56,75 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - return clientCredentials(issuerUrl, credentialsUrl, audience, scope, null, null, null); + return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience) + .scope(scope).build(); } - /** - * Authenticate with client credentials. - * - * @param issuerUrl the issuer URL - * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. - * @param scope An optional field. The value of the scope parameter is expressed as a list of - * space-delimited, - * case-sensitive strings. The strings are defined by the authorization server. - * If the value contains multiple space-delimited strings, their order does not matter, - * and each string adds an additional access range to the requested scope. - * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 - * @param connectTimeout the connect timeout in milliseconds - * @param readTimeout the read timeout in milliseconds - * @param trustCertsFilePath the path to the file that contains trusted certificates - * @return an Authentication object - */ - @Builder(builderMethodName = "clientCredentialsBuilder") - private static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope, - Integer connectTimeout, Integer readTimeout, - String trustCertsFilePath) { - ClientCredentialsFlow flow = ClientCredentialsFlow.builder() - .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) - .audience(audience) - .scope(scope) - .connectTimeout(connectTimeout) - .readTimeout(readTimeout) - .trustCertsFilePath(trustCertsFilePath) - .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + public static ClientCredentialsBuilder clientCredentialsBuilder() { + return new ClientCredentialsBuilder(); + } + + public static class ClientCredentialsBuilder { + + private URL issuerUrl; + private URL credentialsUrl; + private String audience; + private String scope; + private Integer connectTimeout; + private Integer readTimeout; + private String trustCertsFilePath; + + private ClientCredentialsBuilder() { + } + + public ClientCredentialsBuilder issuerUrl(URL issuerUrl) { + this.issuerUrl = issuerUrl; + return this; + } + + public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) { + this.credentialsUrl = credentialsUrl; + return this; + } + + public ClientCredentialsBuilder audience(String audience) { + this.audience = audience; + return this; + } + + public ClientCredentialsBuilder scope(String scope) { + this.scope = scope; + return this; + } + + public ClientCredentialsBuilder connectTimeout(Integer connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public ClientCredentialsBuilder readTimeout(Integer readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { + this.trustCertsFilePath = trustCertsFilePath; + return this; + } + + public Authentication build() { + ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + .issuerUrl(issuerUrl) + .privateKey(credentialsUrl.toExternalForm()) + .audience(audience) + .scope(scope) + .connectTimeout(connectTimeout) + .readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath) + .build(); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + } + } From bfdabb5b2314714191dc9cf6f8ded24ca855b482 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Wed, 5 Nov 2025 20:33:40 +0100 Subject: [PATCH 08/12] add licence header --- .../AuthenticationFactoryOAuth2Test.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java index 0a5d61163bc7c..7fa5b04ebda19 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.client.impl.auth.oauth2; import static org.testng.Assert.assertTrue; From edd1739f61fe75b8bb87ed4484a94fc89dbff8fa Mon Sep 17 00:00:00 2001 From: gulecroc Date: Thu, 6 Nov 2025 07:51:09 +0100 Subject: [PATCH 09/12] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 7f8335a6676db..9d6c2afafccdf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -141,8 +141,6 @@ static Integer parseParameterInt(Map params, String name) { @Override public void close() throws Exception { - if (httpClient != null) { - httpClient.close(); - } + httpClient.close(); } } From 8dfc2e5ef7d3311f72483c477bea5d19c83e94dd Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Thu, 6 Nov 2025 08:08:01 +0100 Subject: [PATCH 10/12] fix interruptedException --- .../impl/auth/oauth2/protocol/DefaultMetadataResolver.java | 3 +++ .../pulsar/client/impl/auth/oauth2/protocol/TokenClient.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index d82f1319b5a40..19d0c1acadd15 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -94,6 +94,9 @@ public Metadata resolve() throws IOException { return metadata; } catch (IOException | InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException("Cannot obtain authorization metadata from " + metadataUrl, e); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index 6c47219759bb3..cb4c2a551d01e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -111,6 +111,9 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re } catch (InterruptedException | ExecutionException e1) { + if (e1 instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException(e1); } } From ffff9187ebf8ba87abe189db4c675c2a1c6a68c3 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Thu, 6 Nov 2025 08:11:49 +0100 Subject: [PATCH 11/12] fix NPE --- .../client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 04a8e108a81d0..055c18aa569f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -115,7 +115,7 @@ public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { public Authentication build() { ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) + .privateKey(credentialsUrl == null ? null : credentialsUrl.toExternalForm()) .audience(audience) .scope(scope) .connectTimeout(connectTimeout) From 1708d104dffc01810927bde7311ff7df4c3f0f30 Mon Sep 17 00:00:00 2001 From: Guillaume Lecroc Date: Mon, 10 Nov 2025 22:52:07 +0100 Subject: [PATCH 12/12] use Duration for timeouts --- .../oauth2/AuthenticationFactoryOAuth2.java | 68 +++++++++++++++++-- .../auth/oauth2/ClientCredentialsFlow.java | 7 +- .../client/impl/auth/oauth2/FlowBase.java | 34 ++++++---- .../AuthenticationFactoryOAuth2Test.java | 5 +- 4 files changed, 89 insertions(+), 25 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 055c18aa569f2..033d5308a2a96 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -20,6 +20,7 @@ import java.net.URL; import java.time.Clock; +import java.time.Duration; import org.apache.pulsar.client.api.Authentication; /** @@ -37,8 +38,7 @@ public final class AuthenticationFactoryOAuth2 { * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { - return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience) - .build(); + return clientCredentials(issuerUrl, credentialsUrl, audience, null); } /** @@ -60,6 +60,11 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl .scope(scope).build(); } + /** + * A builder to create an authentication with client credentials. + * + * @return the builder + */ public static ClientCredentialsBuilder clientCredentialsBuilder() { return new ClientCredentialsBuilder(); } @@ -70,48 +75,99 @@ public static class ClientCredentialsBuilder { private URL credentialsUrl; private String audience; private String scope; - private Integer connectTimeout; - private Integer readTimeout; + private Duration connectTimeout; + private Duration readTimeout; private String trustCertsFilePath; private ClientCredentialsBuilder() { } + /** + * Required issuer URL. + * + * @param issuerUrl the issuer URL + * @return the builder + */ public ClientCredentialsBuilder issuerUrl(URL issuerUrl) { this.issuerUrl = issuerUrl; return this; } + /** + * Required credentials URL. + * + * @param credentialsUrl the credentials URL + * @return the builder + */ public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) { this.credentialsUrl = credentialsUrl; return this; } + /** + * Optional audience identifier used by some Identity Providers, like Auth0. + * + * @param audience the audiance + * @return the builder + */ public ClientCredentialsBuilder audience(String audience) { this.audience = audience; return this; } + /** + * Optional scope expressed as a list of space-delimited, case-sensitive strings. + * The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * + * @param scope the scope + * @return the builder + */ public ClientCredentialsBuilder scope(String scope) { this.scope = scope; return this; } - public ClientCredentialsBuilder connectTimeout(Integer connectTimeout) { + /** + * Optional HTTP connection timeout. + * + * @param connectTimeout the connect timeout + * @return the builder + */ + public ClientCredentialsBuilder connectTimeout(Duration connectTimeout) { this.connectTimeout = connectTimeout; return this; } - public ClientCredentialsBuilder readTimeout(Integer readTimeout) { + /** + * Optional HTTP read timeout. + * + * @param readTimeout the read timeout + * @return the builder + */ + public ClientCredentialsBuilder readTimeout(Duration readTimeout) { this.readTimeout = readTimeout; return this; } + /** + * Optional path to the file containing the trusted certificate(s) of the token issuer. + * + * @param trustCertsFilePath the path to the file containing the trusted certificate(s) + * @return the builder + */ public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { this.trustCertsFilePath = trustCertsFilePath; return this; } + /** + * Authenticate with client credentials. + * + * @return an Authentication object + */ public Authentication build() { ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 53b1dbee23562..7f64c0b18ac73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -26,6 +26,7 @@ import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -61,7 +62,7 @@ class ClientCredentialsFlow extends FlowBase { @Builder public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, - Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) { + Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath); this.audience = audience; this.privateKey = privateKey; @@ -80,8 +81,8 @@ public static ClientCredentialsFlow fromParameters(Map params) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); - Integer connectTimeout = parseParameterInt(params, CONFIG_PARAM_CONNECT_TIMEOUT); - Integer readTimeout = parseParameterInt(params, CONFIG_PARAM_READ_TIMEOUT); + Duration connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT); + Duration readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT); String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); return ClientCredentialsFlow.builder() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 9d6c2afafccdf..6cc9f8e41b5e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.time.Duration; +import java.time.format.DateTimeParseException; import java.util.Map; import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; @@ -46,8 +48,8 @@ abstract class FlowBase implements Flow { public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; + protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); + protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); private static final long serialVersionUID = 1L; @@ -56,21 +58,22 @@ abstract class FlowBase implements Flow { protected transient Metadata metadata; - protected FlowBase(URL issuerUrl, Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) { + protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { this.issuerUrl = issuerUrl; this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); } - private AsyncHttpClient defaultHttpClient(Integer readTimeout, Integer connectTimeout, String trustCertsFilePath) { + private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout, + String trustCertsFilePath) { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); confBuilder.setFollowRedirect(true); confBuilder.setConnectTimeout( - getParameterInt(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, - DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000)); + getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, + DEFAULT_CONNECT_TIMEOUT)); confBuilder.setReadTimeout( - getParameterInt(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000)); + getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT)); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); if (StringUtils.isNotBlank(trustCertsFilePath)) { try { @@ -84,14 +87,17 @@ private AsyncHttpClient defaultHttpClient(Integer readTimeout, Integer connectTi return new DefaultAsyncHttpClient(confBuilder.build()); } - private int getParameterInt(String name, Integer value, int defaultValue) { + private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { + Duration duration; if (value == null) { log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); - return defaultValue; + duration = defaultValue; } else { log.info("Configuration for [{}] is: [{}]", name, value); - return value; + duration = value; } + + return (int) duration.toMillis(); } public void initialize() throws PulsarClientException { @@ -127,13 +133,13 @@ static URL parseParameterUrl(Map params, String name) { } } - static Integer parseParameterInt(Map params, String name) { + static Duration parseParameterDuration(Map params, String name) { String value = params.get(name); if (StringUtils.isNotBlank(value)) { try { - return Integer.parseInt(value); - } catch (NumberFormatException numberFormatException) { - throw new IllegalArgumentException("Malformed configuration parameter: " + name); + return Duration.parse(value); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); } } return null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java index 7fa5b04ebda19..602aafa7b6c91 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertTrue; import java.io.IOException; import java.net.URL; +import java.time.Duration; import org.apache.pulsar.client.api.Authentication; import org.testng.annotations.Test; @@ -32,8 +33,8 @@ public void testBuilder() throws IOException { URL credentialsUrl = new URL("http://localhost"); String audience = "audience"; String scope = "scope"; - Integer connectTimeout = 10001; - Integer readTimeout = 30001; + Duration connectTimeout = Duration.parse("PT11S"); + Duration readTimeout = Duration.ofSeconds(31); String trustCertsFilePath = null; try (Authentication authentication = AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)