Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce usage of Client utils with remote clusters #104552

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -528,9 +527,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
} else {
// forward to remote cluster
String clusterAlias = request.getContextSetup().getClusterAlias();
Client remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE);
remoteClusterClient.admin().cluster().execute(PainlessExecuteAction.INSTANCE, request, listener);
transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, EsExecutors.DIRECT_EXECUTOR_SERVICE)
.execute(PainlessExecuteAction.INSTANCE, request, listener);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand Down Expand Up @@ -493,13 +492,13 @@ protected void doExecute(Task task, Request request, final ActionListener<Respon
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Request remoteRequest = new Request(originalIndices.indices(), originalIndices.indicesOptions());
remoteClusterClient.admin().indices().resolveIndex(remoteRequest, ActionListener.wrap(response -> {
remoteClusterClient.execute(ResolveIndexAction.INSTANCE, remoteRequest, ActionListener.wrap(response -> {
remoteResponses.put(clusterAlias, response);
terminalHandler.run();
}, failure -> terminalHandler.run()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -213,7 +212,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = transportService.getRemoteClusterService()
var remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(threadPool, clusterAlias, searchCoordinationExecutor);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
Expand All @@ -234,7 +233,11 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
}
});
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.releaseAfter(remoteListener, refs.acquire()));
remoteClusterClient.execute(
TransportFieldCapabilitiesAction.TYPE,
remoteRequest,
ActionListener.releaseAfter(remoteListener, refs.acquire())
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -526,12 +525,8 @@ static void ccsRemoteReduce(
timeProvider.absoluteStartMillis(),
true
);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
remoteClientResponseExecutor
);
remoteClusterClient.search(ccsSearchRequest, new ActionListener<>() {
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias, remoteClientResponseExecutor);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse searchResponse) {
// overwrite the existing cluster entry with the updated one
Expand Down Expand Up @@ -609,12 +604,12 @@ public void onFailure(Exception e) {
task.getProgressListener(),
listener
);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
threadPool,
clusterAlias,
remoteClientResponseExecutor
);
remoteClusterClient.search(ccsSearchRequest, ccsListener);
remoteClusterClient.execute(TransportSearchAction.TYPE, ccsSearchRequest, ccsListener);
}
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -90,22 +94,21 @@ public void testConnectAndExecuteRequest() throws Exception {
logger.info("now accepting incoming requests on local transport");
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
Client client = remoteClusterService.getRemoteClusterClient(
var client = remoteClusterService.getRemoteClusterClient(
threadPool,
"test",
threadPool.executor(TEST_THREAD_POOL_NAME),
randomBoolean()
);
ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
future -> client.admin()
.cluster()
.prepareState()
.execute(
ActionListener.runBefore(
future,
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
)
),
future -> client.execute(
ClusterStateAction.INSTANCE,
new ClusterStateRequest(),
ActionListener.runBefore(
future,
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
)
),
10,
TimeUnit.SECONDS
);
Expand All @@ -114,7 +117,9 @@ public void testConnectAndExecuteRequest() throws Exception {
// also test a failure, there is no handler for scroll registered
ActionNotFoundTransportException ex = expectThrows(
ActionNotFoundTransportException.class,
() -> client.prepareSearchScroll("").get()
() -> PlainActionFuture.<SearchResponse, RuntimeException>get(
future -> client.execute(TransportSearchScrollAction.TYPE, new SearchScrollRequest(""), future)
)
);
assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage());
}
Expand Down Expand Up @@ -167,13 +172,10 @@ public void testEnsureWeReconnect() throws Exception {
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

Client client = remoteClusterService.getRemoteClusterClient(
threadPool,
"test",
EsExecutors.DIRECT_EXECUTOR_SERVICE,
true
var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE, true);
ClusterStateResponse clusterStateResponse = PlainActionFuture.get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
);
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
Expand Down Expand Up @@ -241,15 +243,17 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE);
var client = remoteClusterService.getRemoteClusterClient(threadPool, "test", EsExecutors.DIRECT_EXECUTOR_SERVICE);

try {
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));

// check that we quickly fail
expectThrows(
NoSuchRemoteClusterException.class,
() -> client.admin().cluster().prepareState().get(TimeValue.timeValueSeconds(10))
() -> PlainActionFuture.<ClusterStateResponse, RuntimeException>get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
)
);
} finally {
service.clearAllRules();
Expand All @@ -258,7 +262,9 @@ public void testQuicklySkipUnavailableClusters() throws Exception {

assertBusy(() -> {
try {
client.admin().cluster().prepareState().get();
PlainActionFuture.<ClusterStateResponse, RuntimeException>get(
f -> client.execute(ClusterStateAction.INSTANCE, new ClusterStateRequest(), f)
);
} catch (NoSuchRemoteClusterException e) {
// keep retrying on this exception, the goal is to check that we eventually reconnect
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
Expand Down Expand Up @@ -193,7 +194,7 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
final Consumer<ClusterStateResponse> leaderClusterStateConsumer
) {
try {
Client remoteClient = systemClient(
var remoteClient = systemClient(
client.getRemoteClusterClient(clusterAlias, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME))
);
checkRemoteClusterLicenseAndFetchClusterState(
Expand Down Expand Up @@ -251,7 +252,7 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe
onFailure
);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener);
remoteClient.execute(ClusterStateAction.INSTANCE, request, clusterStateListener);
} else {
onFailure.accept(nonCompliantLicense.apply(licenseCheck));
}
Expand Down Expand Up @@ -321,7 +322,7 @@ public static void fetchLeaderHistoryUUIDs(
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
request.indices(leaderIndex);
remoteClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
remoteClient.execute(IndicesStatsAction.INSTANCE, request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

/**
Expand All @@ -345,7 +346,7 @@ public void hasPrivilegesToFollowIndices(final Client remoteClient, final String
return;
}

final User user = getUser(remoteClient);
final User user = getUser(remoteClient.threadPool().getThreadContext());
if (user == null) {
handler.accept(new IllegalStateException("missing or unable to read authentication info on request"));
return;
Expand Down Expand Up @@ -385,10 +386,8 @@ public void hasPrivilegesToFollowIndices(final Client remoteClient, final String
remoteClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
}

User getUser(final Client remoteClient) {
final ThreadContext threadContext = remoteClient.threadPool().getThreadContext();
final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
return securityContext.getUser();
User getUser(ThreadContext threadContext) {
return new SecurityContext(Settings.EMPTY, threadContext).getUser();
}

public static Client wrapClient(Client client, Map<String, String> headers, ClusterState clusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RequestValidators;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -64,7 +65,7 @@ public static void getIndexMetadata(
if (metadataVersion > 0) {
request.waitForMetadataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
}
client.admin().cluster().state(request, listener.delegateFailureAndWrap((delegate, response) -> {
client.execute(ClusterStateAction.INSTANCE, request, listener.delegateFailureAndWrap((delegate, response) -> {
if (response.getState() == null) { // timeout on wait_for_metadata_version
assert metadataVersion > 0 : metadataVersion;
if (timeoutSupplier.get().nanos() < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -244,9 +245,11 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
}
};
try {
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).execute(
ClusterStateAction.INSTANCE,
CcrRequests.metadataRequest(leaderIndex.getName()),
ActionListener.wrap(onResponse, errorHandler)
);
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down Expand Up @@ -371,9 +374,11 @@ protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exc
};

try {
remoteClient(params).admin()
.cluster()
.state(CcrRequests.metadataRequest(leaderIndex.getName()), ActionListener.wrap(onResponse, errorHandler));
remoteClient(params).execute(
ClusterStateAction.INSTANCE,
CcrRequests.metadataRequest(leaderIndex.getName()),
ActionListener.wrap(onResponse, errorHandler)
);
} catch (final NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected void masterOperation(
listener.onFailure(new IllegalArgumentException(message));
return;
}
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster(), remoteClientResponseExecutor);
final var remoteClient = client.getRemoteClusterClient(request.getRemoteCluster(), remoteClientResponseExecutor);
final Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(
threadPool.getThreadContext(),
clusterService.state()
Expand Down