Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void initialize() throws PulsarClientException {
assert this.metadata != null;

URL tokenUrl = this.metadata.getTokenEndpoint();
this.exchanger = new TokenClient(tokenUrl, httpClient);
this.exchanger = new TokenClient(tokenUrl, getHttpClient());
initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,22 @@ abstract class FlowBase implements Flow {
private static final long serialVersionUID = 1L;

protected final URL issuerUrl;
protected final AsyncHttpClient httpClient;
private final Duration connectTimeout;
private final Duration readTimeout;
private final String trustCertsFilePath;
protected final String wellKnownMetadataPath;

protected transient Metadata metadata;
private transient AsyncHttpClient httpClient;

protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath,
String wellKnownMetadataPath) {
this.issuerUrl = issuerUrl;
this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath);
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.trustCertsFilePath = trustCertsFilePath;
this.wellKnownMetadataPath = wellKnownMetadataPath;
getHttpClient();
}

private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout,
Expand All @@ -91,6 +97,13 @@ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connect
return new DefaultAsyncHttpClient(confBuilder.build());
}

protected synchronized AsyncHttpClient getHttpClient() {
if (httpClient == null) {
httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath);
}
return httpClient;
}

private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) {
Duration duration;
if (value == null) {
Expand Down Expand Up @@ -118,7 +131,7 @@ public void initialize() throws PulsarClientException {
}

protected MetadataResolver createMetadataResolver() {
return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, wellKnownMetadataPath);
return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, getHttpClient(), wellKnownMetadataPath);
}

static String parseParameterString(Map<String, String> params, String name) {
Expand Down Expand Up @@ -155,6 +168,8 @@ static Duration parseParameterDuration(Map<String, String> params, String name)

@Override
public void close() throws Exception {
httpClient.close();
if (httpClient != null) {
httpClient.close();
}
}
}
Loading