Skip to content

Commit

Permalink
Implement Java 11's HttpClient within RestClient [HZ-2460] (hazelcast…
Browse files Browse the repository at this point in the history
…#25654)

During the investigation of - and solution generation for -
hazelcast#24613, it was noted that
Java 11 provides the new `HttpClient` that features interruptible
connections - the original solution was kept in place for 5.3, but with
5.4 we have now dropped JDK 8 support, so we can introduce this JDK 11
feature 🎉

This PR does exactly that, introducing `HttpClient` within the
`RestClient` implementation we maintain as a Proof of Concept - this
allows connections to be interrupted, as demonstrated in the
`KubernetesClient` implementation, which is where the majority of focus
for this PoC has been. Changes are generally fairly minimal outside of
the `RestClient` itself and `KubernetesClient` implementation, with the
main areas requiring changes being those that used
`#withCaCertificates()` (now replaced by a separate constructor for
clients using SSL), and timeout changes (connection/read timeouts are
now unified).

This PR now also includes a regression test for the original issue that
was encountered, ensuring the `HttpClient` solution still resolves the
issue.
  • Loading branch information
JamesHazelcast authored and Fly-Style committed Oct 31, 2023
1 parent 76e0ada commit 3610ffd
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,8 @@ private static boolean iamRoleAttached() {
}

static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
return !RestClient.create(url, 1)
.withRequestTimeoutSeconds(1)
.withRetries(1)
.get()
.getBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ static String currentTimestamp(Clock clock) {
}

static RestClient createRestClient(String url, AwsConfig awsConfig) {
return RestClient.create(url)
.withConnectTimeoutSeconds(awsConfig.getConnectionTimeoutSeconds())
.withReadTimeoutSeconds(awsConfig.getReadTimeoutSeconds())
.withRetries(awsConfig.getConnectionRetries());
return RestClient.create(url, awsConfig.getConnectionTimeoutSeconds())
.withRequestTimeoutSeconds(awsConfig.getReadTimeoutSeconds())
.withRetries(awsConfig.getConnectionRetries());
}

static String canonicalQueryString(Map<String, String> attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ private static boolean azureInstanceMetadataAvailable() {


static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
return !RestClient.create(url, 1)
.withRequestTimeoutSeconds(1)
.withRetries(1)
.withHeader("Metadata", "True")
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ private static boolean serviceAccountAttached() {
}

static boolean isEndpointAvailable(String url) {
return !RestClient.create(url)
.withConnectTimeoutSeconds(1)
.withReadTimeoutSeconds(1)
return !RestClient.create(url, 1)
.withRequestTimeoutSeconds(1)
.withRetries(1)
.withHeader("Metadata-Flavor", "Google")
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.hazelcast.internal.util.HostnameUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.ThreadFactoryImpl;
import com.hazelcast.kubernetes.KubernetesConfig.ExposeExternallyMode;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
Expand All @@ -46,8 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.hazelcast.instance.impl.ClusterTopologyIntentTracker.UNKNOWN;
import static java.util.Arrays.asList;
Expand All @@ -73,10 +70,13 @@ class KubernetesClient {
private static final List<String> NON_RETRYABLE_KEYWORDS = asList(
"\"reason\":\"Forbidden\"",
"\"reason\":\"NotFound\"",
"Failure in generating SSLSocketFactory");
"Failure in generating SSLSocketFactory",
"REST call interrupted");

private static final int STS_MONITOR_SHUTDOWN_AWAIT_TIMEOUT_MS = 1000;

@Nullable
final StsMonitorThread stsMonitorThread;
private final String stsName;
private final String namespace;
private final String kubernetesMaster;
Expand All @@ -87,8 +87,6 @@ class KubernetesClient {
private final boolean useNodeNameAsExternalAddress;
private final String servicePerPodLabelName;
private final String servicePerPodLabelValue;
@Nullable
private final StsMonitorThread stsMonitorThread;

private final KubernetesTokenProvider tokenProvider;

Expand Down Expand Up @@ -175,12 +173,12 @@ public void start() {
}

public void destroy() {
// It's important we interrupt the StsMonitorThread first, as the ClusterTopologyIntentTracker
// It's important we shut down the StsMonitorThread first, as the ClusterTopologyIntentTracker
// receives messages from this thread, and we want to let it process all available messages
// before the intent tracker is shutdown
if (stsMonitorThread != null) {
LOGGER.info("Interrupting StatefulSet monitor thread");
stsMonitorThread.interrupt();
LOGGER.info("Shutting down StatefulSet monitor thread");
stsMonitorThread.shutdown();
}

if (clusterTopologyIntentTracker != null) {
Expand Down Expand Up @@ -597,11 +595,10 @@ private static List<Endpoint> createEndpoints(List<Endpoint> endpoints, Map<Stri
*/
private JsonObject callGet(final String urlString) {
return RetryUtils.retry(() -> Json
.parse(RestClient.create(urlString)
.parse((caCertificate == null ? RestClient.create(urlString, CONNECTION_TIMEOUT_SECONDS)
: RestClient.createWithSSL(urlString, caCertificate, CONNECTION_TIMEOUT_SECONDS))
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()))
.withCaCertificates(caCertificate)
.withConnectTimeoutSeconds(CONNECTION_TIMEOUT_SECONDS)
.withReadTimeoutSeconds(READ_TIMEOUT_SECONDS)
.withRequestTimeoutSeconds(READ_TIMEOUT_SECONDS)
.get()
.getBody())
.asObject(), retries, NON_RETRYABLE_KEYWORDS);
Expand Down Expand Up @@ -761,6 +758,8 @@ final class StsMonitorThread extends Thread {

// used only for tests
volatile boolean running = true;
volatile boolean finished;
volatile boolean shuttingDown;

String latestResourceVersion;
RuntimeContext latestRuntimeContext;
Expand All @@ -770,19 +769,11 @@ final class StsMonitorThread extends Thread {
private final String stsUrlString;
private final BackoffIdleStrategy backoffIdleStrategy;

// We offload reading to a separate Thread to allow us to interrupt the reading operation
// when this Thread needs to be shutdown, avoiding the need to terminate this thread in a
// non-graceful manner, which could lead to uncompleted message handling - without using a
// separate thread, BufferedReader#readLine() blocks until data is received and does not
// handle Thread#interrupt()
private final ExecutorService readExecutor;

StsMonitorThread() {
super("hz-k8s-sts-monitor");
stsUrlString = formatStsListUrl();
backoffIdleStrategy = new BackoffIdleStrategy(MAX_SPINS, MAX_YIELDS,
MILLISECONDS.toNanos(MIN_PARK_PERIOD_MILLIS), SECONDS.toNanos(MAX_PARK_PERIOD_SECONDS));
readExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("hz-k8s-sts-monitor-reader"));
}

/**
Expand All @@ -798,7 +789,7 @@ public void run() {
String message;

while (running) {
if (Thread.interrupted()) {
if (shuttingDown) {
break;
}
try {
Expand All @@ -809,6 +800,10 @@ public void run() {
updateTracker(previous, latestRuntimeContext);
watchResponse = sendWatchRequest();
} catch (RestClientException e) {
// interrupts during shutdown will trigger a RestClientException
if (shuttingDown) {
break;
}
handleFailure(e);
// always retry after a RestClientException
continue;
Expand All @@ -820,14 +815,34 @@ public void run() {
onMessage(message);
}
} catch (IOException e) {
LOGGER.info("Exception while watching for StatefulSet changes", e);
try {
watchResponse.disconnect();
} catch (Exception t) {
LOGGER.fine("Exception while closing connection after an IOException", t);
// If we're shutting down, the watchResponse is already disconnected, and
// the IOException can be disregarded; otherwise continue with logging
if (!shuttingDown) {
LOGGER.info("Exception while watching for StatefulSet changes", e);

try {
watchResponse.disconnect();
} catch (Exception t) {
LOGGER.fine("Exception while closing connection after an IOException", t);
}
}
}
}
finished = true;
}

public void shutdown() {
this.shuttingDown = true;
try {
if (watchResponse != null) {
watchResponse.disconnect();
}
} catch (IOException e) {
LOGGER.fine("Exception while closing connection during shutdown", e);
}
// Interrupt thread as we may be in the process of making watch requests
// or other calls that need to be interrupted for us to shut down promptly
stsMonitorThread.interrupt();
}

private void handleFailure(RestClientException e) {
Expand Down Expand Up @@ -871,11 +886,11 @@ void readInitialStsList() {
* from Kubernetes API server.
*/
@Nonnull
RestClient.WatchResponse sendWatchRequest() {
RestClient restClient = RestClient.create(stsUrlString)
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()))
.withCaCertificates(caCertificate);
return restClient.watch(latestResourceVersion, readExecutor);
RestClient.WatchResponse sendWatchRequest() throws RestClientException {
RestClient restClient = (caCertificate == null ? RestClient.create(stsUrlString)
: RestClient.createWithSSL(stsUrlString, caCertificate))
.withHeader("Authorization", String.format("Bearer %s", tokenProvider.getToken()));
return restClient.watch(latestResourceVersion);
}

@Nullable
Expand Down

0 comments on commit 3610ffd

Please sign in to comment.