From 6c547bfaba393523dc461cf864d0f8d3743c62d6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 Oct 2025 09:40:03 +0100 Subject: [PATCH] Expose `masterNodeTimeout` to feature state reset impls Resetting a feature state involves several master-node actions. Today these actions cannot access the timeout specified in the original REST request, so they must invent their own timeouts. In practice they all use the (trappy) default timeout of 30s, which may be too short for some situations. This commit adds the plumbing needed to pass the timeout from the REST request down to a place where it can be accessed by the individual implementations, and applies it to the delete-index requests. Relates #107984 --- .../datastreams/SystemDataStreamIT.java | 21 ++++++++++++++++--- .../CrudSystemDataStreamLifecycleIT.java | 20 +++++++++++++++--- .../snapshots/FeatureStateResetApiIT.java | 2 ++ .../TransportResetFeatureStateAction.java | 13 ++++++------ .../elasticsearch/indices/SystemIndices.java | 17 +++++++++++---- .../plugins/SystemIndexPlugin.java | 10 ++++++--- .../core/LocalStateCompositeXPackPlugin.java | 4 +++- .../org/elasticsearch/xpack/fleet/Fleet.java | 21 +++++++++++++++---- .../xpack/ml/MachineLearning.java | 14 ++++++++++--- .../xpack/transform/Transform.java | 3 ++- 10 files changed, 97 insertions(+), 28 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java index 4b773e79fe354..88ae748eefb42 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; import org.elasticsearch.indices.ExecutorNames; @@ -366,6 +367,7 @@ public void cleanup() { internalCluster().clusterService(), TestProjectResolvers.DEFAULT_PROJECT_ONLY, internalCluster().client(), + TEST_REQUEST_TIMEOUT, stateStatusPlainActionFuture ); stateStatusPlainActionFuture.actionGet(); @@ -446,6 +448,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -466,11 +469,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -480,7 +495,7 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java index 4b2afaafdd495..c3edc02831756 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java @@ -183,6 +183,7 @@ public void cleanup() { internalCluster().clusterService(), TestProjectResolvers.DEFAULT_PROJECT_ONLY, internalCluster().client(), + TEST_REQUEST_TIMEOUT, stateStatusPlainActionFuture ); stateStatusPlainActionFuture.actionGet(); @@ -241,6 +242,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -258,11 +260,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -272,7 +286,7 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java index 813d9b09213fc..f06829a8eb011 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -220,6 +221,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { if (isEvil()) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java index aa90c3b95cf98..797f3c3452aa1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java @@ -78,12 +78,13 @@ protected void masterOperation( ) ) { for (final var feature : features) { - feature.getCleanUpFunction().apply(clusterService, projectResolver, client, listeners.acquire(e -> { - assert e != null : feature.getName(); - synchronized (responses) { - responses.add(e); - } - })); + feature.getCleanUpFunction() + .apply(clusterService, projectResolver, client, request.masterNodeTimeout(), listeners.acquire(e -> { + assert e != null : feature.getName(); + synchronized (responses) { + responses.add(e); + } + })); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 3a3d141a32ede..4357f280412c6 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Predicates; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.Index; @@ -751,7 +752,7 @@ public static void validateFeatureName(String name, String plugin) { * details about what constitutes a system feature. * *

This class has a static - * {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, ActionListener)} method + * {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, TimeValue, ActionListener)} method * that is the default implementation for resetting feature state. */ public static class Feature { @@ -808,13 +809,14 @@ public Feature(String name, String description, Collection cleanUpFeature( + (clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature( indexDescriptors, Collections.emptyList(), name, clusterService, projectResolver, client, + masterNodeTimeout, listener ), Feature::noopPreMigrationFunction, @@ -841,13 +843,14 @@ public Feature( indexDescriptors, dataStreamDescriptors, Collections.emptyList(), - (clusterService, projectResolver, client, listener) -> cleanUpFeature( + (clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature( indexDescriptors, Collections.emptyList(), name, clusterService, projectResolver, client, + masterNodeTimeout, listener ), Feature::noopPreMigrationFunction, @@ -918,10 +921,12 @@ private static void cleanUpFeatureForIndices( String name, Client client, String[] indexNames, + TimeValue masterNodeTimeout, final ActionListener listener ) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(); deleteIndexRequest.indices(indexNames); + deleteIndexRequest.masterNodeTimeout(masterNodeTimeout); client.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { @@ -944,6 +949,7 @@ public void onFailure(Exception e) { * @param clusterService A clusterService, for retrieving cluster metadata * @param projectResolver The project resolver * @param client A client, for issuing delete requests + * @param masterNodeTimeout Timeout for tasks enqueued on the master node * @param listener A listener to return success or failure of cleanup */ public static void cleanUpFeature( @@ -953,6 +959,7 @@ public static void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, final ActionListener listener ) { final ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); @@ -985,7 +992,7 @@ public static void cleanUpFeature( .flatMap(descriptor -> descriptor.getMatchingIndices(project).stream()) .toArray(String[]::new); if (associatedIndices.length > 0) { - cleanUpFeatureForIndices(name, client, associatedIndices, listeners.acquire(handleResponse)); + cleanUpFeatureForIndices(name, client, associatedIndices, masterNodeTimeout, listeners.acquire(handleResponse)); } // One descriptor at a time, create an originating client and clean up the feature @@ -1000,6 +1007,7 @@ public static void cleanUpFeature( name, clientWithOrigin, matchingIndices.toArray(Strings.EMPTY_ARRAY), + masterNodeTimeout, listeners.acquire(handleResponse) ); } @@ -1044,6 +1052,7 @@ void apply( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ); } diff --git a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java index ff2fc515ce2e0..7d7cce286f2a5 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -42,9 +43,9 @@ *

A SystemIndexPlugin may also specify “associated indices,” which hold plugin state in user space. These indices are not managed or * protected, but they are included in snapshots of the feature. * - *

An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, ActionListener)} - * in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will simply retrieve a list - * of system and associated indices and delete them. + *

An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, TimeValue, + * ActionListener)} in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will + * simply retrieve a list of system and associated indices and delete them. * *

An implementation may also override {@link #prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)} and * {@link #indicesMigrationComplete(Map, Client, ActionListener)} in order to take special action before and after a @@ -100,12 +101,14 @@ default Collection getAssociatedIndexDescriptors() { * @param clusterService Cluster service to provide cluster state * @param projectResolver Project resolver * @param client A client, for executing actions + * @param masterNodeTimeout Timeout for tasks enqueued on the master node * @param listener Listener for post-cleanup result */ default void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { SystemIndices.Feature.cleanUpFeature( @@ -115,6 +118,7 @@ default void cleanUpFeature( clusterService, projectResolver, client, + masterNodeTimeout, listener ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 8f6183374e8ba..49ce2472dd675 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.http.HttpPreRequest; @@ -724,6 +725,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener finalListener ) { List systemPlugins = filterPlugins(SystemIndexPlugin.class); @@ -741,7 +743,7 @@ public void cleanUpFeature( } }) ); - systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, allListeners)); + systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, allListeners)); } @Override diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java index 57d7f984694b8..d8e749b479382 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java @@ -316,6 +316,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -335,11 +336,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -349,13 +362,13 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } } } else { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 704b0d7634db4..72c3b4ba3be01 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -2136,12 +2136,13 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client unwrappedClient, + TimeValue masterNodeTimeout, ActionListener finalListener ) { if (this.enabled == false) { // if ML is disabled, the custom cleanup can fail, but we can still clean up indices // by calling the superclass cleanup method - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, unwrappedClient, finalListener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, unwrappedClient, masterNodeTimeout, finalListener); return; } logger.info("Starting machine learning feature reset"); @@ -2204,6 +2205,7 @@ public void cleanUpFeature( clusterService, projectResolver, client, + masterNodeTimeout, delegate ), clearFailed -> { @@ -2211,14 +2213,20 @@ public void cleanUpFeature( "failed to clear memory tracker cache via machine learning reset feature API", clearFailed ); - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, delegate); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + delegate + ); } ) ); return; } // Call into the original listener to clean up the indices and then clear ml memory cache - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, delegate); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, delegate); } else { final List failedComponents = results.entrySet() .stream() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 6085d6bbe38af..3738d037bcc5c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -439,6 +439,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client unwrappedClient, + TimeValue masterNodeTimeout, ActionListener finalListener ) { OriginSettingClient client = new OriginSettingClient(unwrappedClient, TRANSFORM_ORIGIN); @@ -479,7 +480,7 @@ public void cleanUpFeature( ActionListener afterWaitingForTasks = ActionListener.wrap(listTasksResponse -> { listTasksResponse.rethrowFailures("Waiting for transform indexing tasks"); - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, unsetResetModeListener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, unsetResetModeListener); }, unsetResetModeListener::onFailure); ActionListener afterForceStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> {