Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Java 11's HttpClient within RestClient [HZ-2460] #25654

Merged
merged 19 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ private static boolean iamRoleAttached() {

static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
.withTimeoutSeconds(1)
.withRetries(1)
.get()
.getBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ static String currentTimestamp(Clock clock) {

static RestClient createRestClient(String url, AwsConfig awsConfig) {
return RestClient.create(url)
.withConnectTimeoutSeconds(awsConfig.getConnectionTimeoutSeconds())
.withReadTimeoutSeconds(awsConfig.getReadTimeoutSeconds())
.withPreferredTimeoutSeconds(awsConfig.getConnectionTimeoutSeconds(), awsConfig.getReadTimeoutSeconds())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the AWS config, It probably makes sense to have both - the timeout and the connectionTimeout (using HttpClient.Builder.connectTimeout(Duration)). If we want to support just one, we should improve the AWS config too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks @kwart - I had originally dropped separate values since they seemed to be the same in all areas of the codebase, but this doesn't account for customizable values such as an AWS config. I've restored functionality for both timeouts in this commit 👍 6ea8e09

.withRetries(awsConfig.getConnectionRetries());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ private static boolean azureInstanceMetadataAvailable() {

static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
.withTimeoutSeconds(1)
.withRetries(1)
.withHeader("Metadata", "True")
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ private static boolean serviceAccountAttached() {

static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
.withTimeoutSeconds(1)
.withRetries(1)
.withHeader("Metadata-Flavor", "Google")
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.hazelcast.internal.util.HostnameUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.ThreadFactoryImpl;
import com.hazelcast.kubernetes.KubernetesConfig.ExposeExternallyMode;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
Expand All @@ -46,8 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.hazelcast.instance.impl.ClusterTopologyIntentTracker.UNKNOWN;
import static java.util.Arrays.asList;
Expand All @@ -68,15 +65,17 @@ class KubernetesClient {
private static final int HTTP_FORBIDDEN = 403;

private static final int CONNECTION_TIMEOUT_SECONDS = 10;
private static final int READ_TIMEOUT_SECONDS = 10;

private static final List<String> NON_RETRYABLE_KEYWORDS = asList(
"\"reason\":\"Forbidden\"",
"\"reason\":\"NotFound\"",
"Failure in generating SSLSocketFactory");
"Failure in generating SSLSocketFactory",
"REST call interrupted");

private static final int STS_MONITOR_SHUTDOWN_AWAIT_TIMEOUT_MS = 1000;

@Nullable
final StsMonitorThread stsMonitorThread;
private final String stsName;
private final String namespace;
private final String kubernetesMaster;
Expand All @@ -87,8 +86,6 @@ class KubernetesClient {
private final boolean useNodeNameAsExternalAddress;
private final String servicePerPodLabelName;
private final String servicePerPodLabelValue;
@Nullable
private final StsMonitorThread stsMonitorThread;

private final KubernetesTokenProvider tokenProvider;

Expand Down Expand Up @@ -175,12 +172,12 @@ public void start() {
}

public void destroy() {
// It's important we interrupt the StsMonitorThread first, as the ClusterTopologyIntentTracker
// It's important we shut down the StsMonitorThread first, as the ClusterTopologyIntentTracker
// receives messages from this thread, and we want to let it process all available messages
// before the intent tracker is shutdown
if (stsMonitorThread != null) {
LOGGER.info("Interrupting StatefulSet monitor thread");
stsMonitorThread.interrupt();
LOGGER.info("Shutting down StatefulSet monitor thread");
stsMonitorThread.shutdown();
}

if (clusterTopologyIntentTracker != null) {
Expand Down Expand Up @@ -597,11 +594,10 @@ private static List<Endpoint> createEndpoints(List<Endpoint> endpoints, Map<Stri
*/
private JsonObject callGet(final String urlString) {
return RetryUtils.retry(() -> Json
.parse(RestClient.create(urlString)
.parse((caCertificate == null ? RestClient.create(urlString)
: RestClient.createWithSSL(urlString, caCertificate))
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()))
.withCaCertificates(caCertificate)
.withConnectTimeoutSeconds(CONNECTION_TIMEOUT_SECONDS)
.withReadTimeoutSeconds(READ_TIMEOUT_SECONDS)
.withTimeoutSeconds(CONNECTION_TIMEOUT_SECONDS)
.get()
.getBody())
.asObject(), retries, NON_RETRYABLE_KEYWORDS);
Expand Down Expand Up @@ -761,6 +757,8 @@ final class StsMonitorThread extends Thread {

// used only for tests
volatile boolean running = true;
volatile boolean finished;
volatile boolean shuttingDown;

String latestResourceVersion;
RuntimeContext latestRuntimeContext;
Expand All @@ -770,19 +768,11 @@ final class StsMonitorThread extends Thread {
private final String stsUrlString;
private final BackoffIdleStrategy backoffIdleStrategy;

// We offload reading to a separate Thread to allow us to interrupt the reading operation
// when this Thread needs to be shutdown, avoiding the need to terminate this thread in a
// non-graceful manner, which could lead to uncompleted message handling - without using a
// separate thread, BufferedReader#readLine() blocks until data is received and does not
// handle Thread#interrupt()
private final ExecutorService readExecutor;

StsMonitorThread() {
super("hz-k8s-sts-monitor");
stsUrlString = formatStsListUrl();
backoffIdleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS,
MILLISECONDS.toNanos(MIN_PARK_PERIOD_MILLIS), SECONDS.toNanos(MAX_PARK_PERIOD_SECONDS));
readExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("hz-k8s-sts-monitor-reader"));
}

/**
Expand All @@ -798,7 +788,7 @@ public void run() {
String message;

while (running) {
if (Thread.interrupted()) {
if (shuttingDown) {
break;
}
try {
Expand All @@ -809,6 +799,10 @@ public void run() {
updateTracker(previous, latestRuntimeContext);
watchResponse = sendWatchRequest();
} catch (RestClientException e) {
// interrupts during shutdown will trigger a RestClientException
if (shuttingDown) {
break;
}
handleFailure(e);
// always retry after a RestClientException
continue;
Expand All @@ -820,14 +814,34 @@ public void run() {
onMessage(message);
}
} catch (IOException e) {
LOGGER.info("Exception while watching for StatefulSet changes", e);
try {
watchResponse.disconnect();
} catch (Exception t) {
LOGGER.fine("Exception while closing connection after an IOException", t);
// If we're shutting down, the watchResponse is already disconnected, and
// the IOException can be disregarded; otherwise continue with logging
if (!shuttingDown) {
LOGGER.info("Exception while watching for StatefulSet changes", e);

try {
watchResponse.disconnect();
} catch (Exception t) {
LOGGER.fine("Exception while closing connection after an IOException", t);
}
}
}
}
finished = true;
}

public void shutdown() {
this.shuttingDown = true;
try {
if (watchResponse != null) {
watchResponse.disconnect();
}
} catch (IOException e) {
LOGGER.fine("Exception while closing connection during shutdown", e);
}
// Interrupt thread as we may be in the process of making watch requests
// or other calls that need to be interrupted for us to shut down promptly
stsMonitorThread.interrupt();
}

private void handleFailure(RestClientException e) {
Expand Down Expand Up @@ -871,11 +885,11 @@ void readInitialStsList() {
* from Kubernetes API server.
*/
@Nonnull
RestClient.WatchResponse sendWatchRequest() {
RestClient restClient = RestClient.create(stsUrlString)
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()))
.withCaCertificates(caCertificate);
return restClient.watch(latestResourceVersion, readExecutor);
RestClient.WatchResponse sendWatchRequest() throws RestClientException {
RestClient restClient = (caCertificate == null ? RestClient.create(stsUrlString)
: RestClient.createWithSSL(stsUrlString, caCertificate))
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()));
return restClient.watch(latestResourceVersion);
}

@Nullable
Expand Down