diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index fe7beb47ed21f..f3c85ca5cdd62 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -136,7 +136,7 @@ public void initialize() throws PulsarClientException { assert this.metadata != null; URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl, httpClient); + this.exchanger = new TokenClient(tokenUrl, getHttpClient()); initialized = true; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 9649d17903148..63248e8605108 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -55,16 +55,22 @@ abstract class FlowBase implements Flow { private static final long serialVersionUID = 1L; protected final URL issuerUrl; - protected final AsyncHttpClient httpClient; + private final Duration connectTimeout; + private final Duration readTimeout; + private final String trustCertsFilePath; protected final String wellKnownMetadataPath; protected transient Metadata metadata; + private transient AsyncHttpClient httpClient; protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath, String wellKnownMetadataPath) { this.issuerUrl = issuerUrl; - this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); + this.connectTimeout = connectTimeout; + this.readTimeout = readTimeout; + this.trustCertsFilePath = trustCertsFilePath; this.wellKnownMetadataPath = wellKnownMetadataPath; + getHttpClient(); } private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout, @@ -91,6 +97,13 @@ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connect return new DefaultAsyncHttpClient(confBuilder.build()); } + protected synchronized AsyncHttpClient getHttpClient() { + if (httpClient == null) { + httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); + } + return httpClient; + } + private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { Duration duration; if (value == null) { @@ -118,7 +131,7 @@ public void initialize() throws PulsarClientException { } protected MetadataResolver createMetadataResolver() { - return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, wellKnownMetadataPath); + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, getHttpClient(), wellKnownMetadataPath); } static String parseParameterString(Map params, String name) { @@ -155,6 +168,8 @@ static Duration parseParameterDuration(Map params, String name) @Override public void close() throws Exception { - httpClient.close(); + if (httpClient != null) { + httpClient.close(); + } } }