Skip to content

Commit

Permalink
Reduce usage of Client utils with remote clusters (#104552)
Browse files Browse the repository at this point in the history
In practice almost all of the utility methods on `Client` are only used
on the local node, and many of them are untested, or positively do not
work, when targetting a remote cluster (see #100111). This commit
removes most of the remaining usages of these utility methods against
remote clusters as a step towards separating the remote-cluster client
from the local-node client.

Relates #104536
  • Loading branch information
DaveCTurner committed Jan 18, 2024
1 parent 84a71ea commit 4121cf6
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 117 deletions.
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -528,9 +527,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
} else {
// forward to remote cluster
String clusterAlias = request.getContextSetup().getClusterAlias();
Client remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE);
remoteClusterClient.admin().cluster().execute(PainlessExecuteAction.INSTANCE, request, listener);
transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE)
.execute(PainlessExecuteAction.INSTANCE, request, listener);
}
}

Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand Down Expand Up @@ -493,13 +492,13 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Request remoteRequest = new Request(originalIndices.indices(), originalIndices.indicesOptions());
remoteClusterClient.admin().indices().resolveIndex(remoteRequest, ActionListener.wrap(response -> {
remoteClusterClient.execute(ResolveIndexAction.INSTANCE, remoteRequest, ActionListener.wrap(response -> {
remoteResponses.put(clusterAlias, response);
terminalHandler.run();
}, failure -> terminalHandler.run()));
Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -213,7 +212,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = transportService.getRemoteClusterService()
var remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, searchCoordinationExecutor);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
Expand All @@ -234,7 +233,11 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
}
});
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.releaseAfter(remoteListener, refs.acquire()));
remoteClusterClient.execute(
TransportFieldCapabilitiesAction.TYPE,
remoteRequest,
ActionListener.releaseAfter(remoteListener, refs.acquire())
);
}
}
}
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -526,12 +525,8 @@ static void ccsRemoteReduce(
timeProvider.absoluteStartMillis(),
true
);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
remoteClientResponseExecutor
);
remoteClusterClient.search(ccsSearchRequest, new ActionListener<>() {
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias, remoteClientResponseExecutor);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
// overwrite the existing cluster entry with the updated one
Expand Down Expand Up @@ -609,12 +604,12 @@ public void onFailure(Exception e) {
task.getProgressListener(),
listener
);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
remoteClientResponseExecutor
);
remoteClusterClient.search(ccsSearchRequest, ccsListener);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, ccsListener);
}
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(
Expand Down
Expand Up @@ -9,9 +9,13 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -90,22 +94,21 @@ public void testConnectAndExecuteRequest() throws Exception {
logger.info("now accepting incoming requests on local transport");
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
Client client = remoteClusterService.getRemoteClusterClient(
var client = remoteClusterService.getRemoteClusterClient(
threadPool,
"test",
threadPool.executor(TEST_THREAD_POOL_NAME),
randomBoolean()
);
ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
future -> client.admin()
.cluster()
.prepareState()
.execute(
ActionListener.runBefore(
future,
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
)
),
future -> client.execute(
ClusterStateAction.INSTANCE,
new ClusterStateRequest(),
ActionListener.runBefore(
future,
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
)
),
10,
TimeUnit.SECONDS
);
Expand All @@ -114,7 +117,9 @@ public void testConnectAndExecuteRequest() throws Exception {
// also test a failure, there is no handler for scroll registered
ActionNotFoundTransportException ex = expectThrows(
ActionNotFoundTransportException.class,
() -> client.prepareSearchScroll("").get()
() -> PlainActionFuture.<SearchResponse, RuntimeException>get(
future -> client.execute(TransportSearchScrollAction.TYPE, new SearchScrollRequest(""), future)
)
);
assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage());
}
Expand Down Expand Up @@ -167,13 +172,10 @@ public void testEnsureWeReconnect() throws Exception {
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

Client client = remoteClusterService.getRemoteClusterClient(
threadPool,
"test",
EsExecutors.DIRECT_EXECUTOR_SERVICE,
true
var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE, true);
ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
);
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
Expand Down Expand Up @@ -241,15 +243,17 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE);
var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE);

try {
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));

// check that we quickly fail
expectThrows(
NoSuchRemoteClusterException.class,
() -> client.admin().cluster().prepareState().get(TimeValue.timeValueSeconds(10))
() -> PlainActionFuture.<ClusterStateResponse, RuntimeException>get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
)
);
} finally {
service.clearAllRules();
Expand All @@ -258,7 +262,9 @@ public void testQuicklySkipUnavailableClusters() throws Exception {

assertBusy(() -> {
try {
client.admin().cluster().prepareState().get();
PlainActionFuture.<ClusterStateResponse, RuntimeException>get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
);
} catch (NoSuchRemoteClusterException e) {
// keep retrying on this exception, the goal is to check that we eventually reconnect
throw new AssertionError(e);
Expand Down
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
Expand Down Expand Up @@ -193,7 +194,7 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
final Consumer<ClusterStateResponse> leaderClusterStateConsumer
) {
try {
Client remoteClient = systemClient(
var remoteClient = systemClient(
client.getRemoteClusterClient(clusterAlias, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME))
);
checkRemoteClusterLicenseAndFetchClusterState(
Expand Down Expand Up @@ -251,7 +252,7 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe
onFailure
);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
remoteClient.execute(ClusterStateAction.INSTANCE, request, clusterStateListener);
} else {
onFailure.accept(nonCompliantLicense.apply(licenseCheck));
}
Expand Down Expand Up @@ -321,7 +322,7 @@ public static void fetchLeaderHistoryUUIDs(
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
request.indices(leaderIndex);
remoteClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
remoteClient.execute(IndicesStatsAction.INSTANCE, request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

/**
Expand All @@ -345,7 +346,7 @@ public void hasPrivilegesToFollowIndices(final Client remoteClient, final String
return;
}

final User user = getUser(remoteClient);
final User user = getUser(remoteClient.threadPool().getThreadContext());
if (user == null) {
handler.accept(new IllegalStateException("missing or unable to read authentication info on request"));
return;
Expand Down Expand Up @@ -385,10 +386,8 @@ public void hasPrivilegesToFollowIndices(final Client remoteClient, final String
remoteClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
}

User getUser(final Client remoteClient) {
final ThreadContext threadContext = remoteClient.threadPool().getThreadContext();
final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
return securityContext.getUser();
User getUser(ThreadContext threadContext) {
return new SecurityContext(Settings.EMPTY, threadContext).getUser();
}

public static Client wrapClient(Client client, Map<String, String> headers, ClusterState clusterState) {
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RequestValidators;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -64,7 +65,7 @@ public static void getIndexMetadata(
if (metadataVersion > 0) {
request.waitForMetadataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
}
client.admin().cluster().state(request, listener.delegateFailureAndWrap((delegate, response) -> {
client.execute(ClusterStateAction.INSTANCE, request, listener.delegateFailureAndWrap((delegate, response) -> {
if (response.getState() == null) { // timeout on wait_for_metadata_version
assert metadataVersion > 0 : metadataVersion;
if (timeoutSupplier.get().nanos() < 0) {
Expand Down
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -244,9 +245,11 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
}
};
try {
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).execute(
ClusterStateAction.INSTANCE,
CcrRequests.metadataRequest(leaderIndex.getName()),
ActionListener.wrap(onResponse, errorHandler)
);
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down Expand Up @@ -371,9 +374,11 @@ protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exc
};

try {
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).execute(
ClusterStateAction.INSTANCE,
CcrRequests.metadataRequest(leaderIndex.getName()),
ActionListener.wrap(onResponse, errorHandler)
);
} catch (final NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down
Expand Up @@ -100,7 +100,7 @@ protected void masterOperation(
listener.onFailure(new IllegalArgumentException(message));
return;
}
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster(), remoteClientResponseExecutor);
final var remoteClient = client.getRemoteClusterClient(request.getRemoteCluster(), remoteClientResponseExecutor);
final Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
Expand Down

0 comments on commit 4121cf6

Please sign in to comment.