diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java index 4b773e79fe354..88ae748eefb42 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; import org.elasticsearch.indices.ExecutorNames; @@ -366,6 +367,7 @@ public void cleanup() { internalCluster().clusterService(), TestProjectResolvers.DEFAULT_PROJECT_ONLY, internalCluster().client(), + TEST_REQUEST_TIMEOUT, stateStatusPlainActionFuture ); stateStatusPlainActionFuture.actionGet(); @@ -446,6 +448,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -466,11 +469,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -480,7 +495,7 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java index 4b2afaafdd495..c3edc02831756 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java @@ -183,6 +183,7 @@ public void cleanup() { internalCluster().clusterService(), TestProjectResolvers.DEFAULT_PROJECT_ONLY, internalCluster().client(), + TEST_REQUEST_TIMEOUT, stateStatusPlainActionFuture ); stateStatusPlainActionFuture.actionGet(); @@ -241,6 +242,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -258,11 +260,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -272,7 +286,7 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java index 813d9b09213fc..f06829a8eb011 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/FeatureStateResetApiIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -220,6 +221,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { if (isEvil()) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java index aa90c3b95cf98..797f3c3452aa1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/features/TransportResetFeatureStateAction.java @@ -78,12 +78,13 @@ protected void masterOperation( ) ) { for (final var feature : features) { - feature.getCleanUpFunction().apply(clusterService, projectResolver, client, listeners.acquire(e -> { - assert e != null : feature.getName(); - synchronized (responses) { - responses.add(e); - } - })); + feature.getCleanUpFunction() + .apply(clusterService, projectResolver, client, request.masterNodeTimeout(), listeners.acquire(e -> { + assert e != null : feature.getName(); + synchronized (responses) { + responses.add(e); + } + })); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 3a3d141a32ede..4357f280412c6 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Predicates; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.Index; @@ -751,7 +752,7 @@ public static void validateFeatureName(String name, String plugin) { * details about what constitutes a system feature. * *

This class has a static - * {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, ActionListener)} method + * {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, TimeValue, ActionListener)} method * that is the default implementation for resetting feature state. */ public static class Feature { @@ -808,13 +809,14 @@ public Feature(String name, String description, Collection cleanUpFeature( + (clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature( indexDescriptors, Collections.emptyList(), name, clusterService, projectResolver, client, + masterNodeTimeout, listener ), Feature::noopPreMigrationFunction, @@ -841,13 +843,14 @@ public Feature( indexDescriptors, dataStreamDescriptors, Collections.emptyList(), - (clusterService, projectResolver, client, listener) -> cleanUpFeature( + (clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature( indexDescriptors, Collections.emptyList(), name, clusterService, projectResolver, client, + masterNodeTimeout, listener ), Feature::noopPreMigrationFunction, @@ -918,10 +921,12 @@ private static void cleanUpFeatureForIndices( String name, Client client, String[] indexNames, + TimeValue masterNodeTimeout, final ActionListener listener ) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(); deleteIndexRequest.indices(indexNames); + deleteIndexRequest.masterNodeTimeout(masterNodeTimeout); client.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { @@ -944,6 +949,7 @@ public void onFailure(Exception e) { * @param clusterService A clusterService, for retrieving cluster metadata * @param projectResolver The project resolver * @param client A client, for issuing delete requests + * @param masterNodeTimeout Timeout for tasks enqueued on the master node * @param listener A listener to return success or failure of cleanup */ public static void cleanUpFeature( @@ -953,6 +959,7 @@ public static void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, final ActionListener listener ) { final ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state()); @@ -985,7 +992,7 @@ public static void cleanUpFeature( .flatMap(descriptor -> descriptor.getMatchingIndices(project).stream()) .toArray(String[]::new); if (associatedIndices.length > 0) { - cleanUpFeatureForIndices(name, client, associatedIndices, listeners.acquire(handleResponse)); + cleanUpFeatureForIndices(name, client, associatedIndices, masterNodeTimeout, listeners.acquire(handleResponse)); } // One descriptor at a time, create an originating client and clean up the feature @@ -1000,6 +1007,7 @@ public static void cleanUpFeature( name, clientWithOrigin, matchingIndices.toArray(Strings.EMPTY_ARRAY), + masterNodeTimeout, listeners.acquire(handleResponse) ); } @@ -1044,6 +1052,7 @@ void apply( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ); } diff --git a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java index ff2fc515ce2e0..7d7cce286f2a5 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -42,9 +43,9 @@ *

A SystemIndexPlugin may also specify “associated indices,” which hold plugin state in user space. These indices are not managed or * protected, but they are included in snapshots of the feature. * - *

An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, ActionListener)} - * in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will simply retrieve a list - * of system and associated indices and delete them. + *

An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, TimeValue, + * ActionListener)} in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will + * simply retrieve a list of system and associated indices and delete them. * *

An implementation may also override {@link #prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)} and * {@link #indicesMigrationComplete(Map, Client, ActionListener)} in order to take special action before and after a @@ -100,12 +101,14 @@ default Collection getAssociatedIndexDescriptors() { * @param clusterService Cluster service to provide cluster state * @param projectResolver Project resolver * @param client A client, for executing actions + * @param masterNodeTimeout Timeout for tasks enqueued on the master node * @param listener Listener for post-cleanup result */ default void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { SystemIndices.Feature.cleanUpFeature( @@ -115,6 +118,7 @@ default void cleanUpFeature( clusterService, projectResolver, client, + masterNodeTimeout, listener ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 8f6183374e8ba..49ce2472dd675 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.http.HttpPreRequest; @@ -724,6 +725,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener finalListener ) { List systemPlugins = filterPlugins(SystemIndexPlugin.class); @@ -741,7 +743,7 @@ public void cleanUpFeature( } }) ); - systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, allListeners)); + systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, allListeners)); } @Override diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java index 57d7f984694b8..d8e749b479382 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/Fleet.java @@ -316,6 +316,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client client, + TimeValue masterNodeTimeout, ActionListener listener ) { Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); @@ -335,11 +336,23 @@ public void cleanUpFeature( DeleteDataStreamAction.INSTANCE, request, ActionListener.wrap( - response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener), + response -> SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ), e -> { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + listener + ); } else { listener.onFailure(e); } @@ -349,13 +362,13 @@ public void cleanUpFeature( } catch (Exception e) { Throwable unwrapped = ExceptionsHelper.unwrapCause(e); if (unwrapped instanceof ResourceNotFoundException) { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } else { listener.onFailure(e); } } } else { - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 704b0d7634db4..72c3b4ba3be01 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -2136,12 +2136,13 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client unwrappedClient, + TimeValue masterNodeTimeout, ActionListener finalListener ) { if (this.enabled == false) { // if ML is disabled, the custom cleanup can fail, but we can still clean up indices // by calling the superclass cleanup method - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, unwrappedClient, finalListener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, unwrappedClient, masterNodeTimeout, finalListener); return; } logger.info("Starting machine learning feature reset"); @@ -2204,6 +2205,7 @@ public void cleanUpFeature( clusterService, projectResolver, client, + masterNodeTimeout, delegate ), clearFailed -> { @@ -2211,14 +2213,20 @@ public void cleanUpFeature( "failed to clear memory tracker cache via machine learning reset feature API", clearFailed ); - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, delegate); + SystemIndexPlugin.super.cleanUpFeature( + clusterService, + projectResolver, + client, + masterNodeTimeout, + delegate + ); } ) ); return; } // Call into the original listener to clean up the indices and then clear ml memory cache - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, delegate); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, delegate); } else { final List failedComponents = results.entrySet() .stream() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 6085d6bbe38af..3738d037bcc5c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -439,6 +439,7 @@ public void cleanUpFeature( ClusterService clusterService, ProjectResolver projectResolver, Client unwrappedClient, + TimeValue masterNodeTimeout, ActionListener finalListener ) { OriginSettingClient client = new OriginSettingClient(unwrappedClient, TRANSFORM_ORIGIN); @@ -479,7 +480,7 @@ public void cleanUpFeature( ActionListener afterWaitingForTasks = ActionListener.wrap(listTasksResponse -> { listTasksResponse.rethrowFailures("Waiting for transform indexing tasks"); - SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, unsetResetModeListener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, unsetResetModeListener); }, unsetResetModeListener::onFailure); ActionListener afterForceStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> {