Skip to content

Commit

Permalink
Merge branch 'feature/cluster-reuse-webclients'
Browse files Browse the repository at this point in the history
  • Loading branch information
tonydamage committed Sep 27, 2023
2 parents 7451728 + 3b82ad7 commit 6e6d145
Showing 1 changed file with 46 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Helps with the intra-cluster remote code execution.
Expand All @@ -60,8 +61,11 @@ public class ClusterExecutionHelperImpl implements ClusterExecutionHelper {
@Autowired private MidpointJsonProvider<?> jsonProvider;
@Autowired private MidpointYamlProvider<?> yamlProvider;

private Map<String, WebClient> clients = new ConcurrentHashMap<>();

private static final String DOT_CLASS = ClusterExecutionHelperImpl.class.getName() + ".";


@Override
public void execute(@NotNull ClientCode code, ClusterExecutionOptions options, String context, OperationResult parentResult) {
OperationResult result = parentResult.createSubresult(DOT_CLASS + "execute");
Expand Down Expand Up @@ -173,8 +177,9 @@ public OperationResult execute(@NotNull NodeType node, @NotNull ClientCode code,
if (isUpAndAlive || ClusterExecutionOptions.isTryAllNodes(options) ||
!isDead && ClusterExecutionOptions.isTryNodesInTransition(options)) {
try {
WebClient client = createClient(node, options, context);
WebClient client = getOrCreateClient(node, options, context);
if (client != null) {
resetClientForUse(client, options, context);
code.execute(client, node, result);
} else {
result.recordStatus(OperationResultStatus.NOT_APPLICABLE, "Node " + nodeIdentifier +
Expand All @@ -199,7 +204,30 @@ public OperationResult execute(@NotNull NodeType node, @NotNull ClientCode code,
return result;
}

private WebClient createClient(NodeType node, ClusterExecutionOptions options, String context) throws SchemaException {
/**
* Creates webclient if no webclient exists for specified node, otherwise reuses existing instance.
*
* Returned WebClient is not safe, and must be used under lock to prevent mixing URLs between calls.
*
* This limits use to one concurrent thread sending messages to one cluster node.
*
* The reason for reusing {@link WebClient} is that WebClient uses internal JVM HTTP Client whose
* smarts actually works against us.
*
* See MID-9106, but in short: WebClient creates HttpClient if used for first time. HttpClient also supports asynchronous use-cases,
* which means they do reference counting and WeakReferences to detect if client is unused and only then they free underlying threads.
*
* Before Java 21 this was only way to free allocated threads in HTTP Client (practicly wait till garbage collector marks
* this objects for garbage collection). But if WebClient was allocated in bursts, it was possible to reach thread count limit
* before any GC was able to kill outdated HTTP Clients.
*
* In Java 11+ there is option to shutdown HTTPClient manually, but that requires reflection and lowering JVM access protection
* which does not work correctly.
*
* So only doable solution is actually reuse WebClient (which requires synchronizing on it) to not allow for spawning large
* numbers of HTTPClients (+2 threads for each HTTPClient).
*/
private WebClient getOrCreateClient(NodeType node, ClusterExecutionOptions options, String context) throws SchemaException {
String baseUrl;
if (node.getUrl() != null) {
baseUrl = node.getUrl();
Expand All @@ -210,7 +238,22 @@ private WebClient createClient(NodeType node, ClusterExecutionOptions options, S

String url = baseUrl + "/ws/cluster";
LOGGER.debug("Going to execute '{}' on '{}'", context, url);
WebClient client = WebClient.create(url, Arrays.asList(xmlProvider, jsonProvider, yamlProvider));

WebClient client = clients.get(url);
if (client == null) {
client = WebClient.create(url, Arrays.asList(xmlProvider, jsonProvider, yamlProvider), true);
// Client was null, so we create it, but maybe other thread created also client, so we do putIfAbsent
// if previous return non-null, there is already existing client, so we should use this
// and let garbage collection take care of rest.
var previous = clients.putIfAbsent(url, client);
if (previous != null) {
client = previous;
}
}
return client;
}
private WebClient resetClientForUse(WebClient client, ClusterExecutionOptions options, String context) throws SchemaException {
client.reset();
if (!ClusterExecutionOptions.isSkipDefaultAccept(options)) {
client.accept(MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, "application/yaml");
}
Expand Down

0 comments on commit 6e6d145

Please sign in to comment.