Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.indices.ExecutorNames;
Expand Down Expand Up @@ -366,6 +367,7 @@ public void cleanup() {
internalCluster().clusterService(),
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
internalCluster().client(),
TEST_REQUEST_TIMEOUT,
stateStatusPlainActionFuture
);
stateStatusPlainActionFuture.actionGet();
Expand Down Expand Up @@ -446,6 +448,7 @@ public void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateStatus> listener
) {
Collection<SystemDataStreamDescriptor> dataStreamDescriptors = getSystemDataStreamDescriptors();
Expand All @@ -466,11 +469,23 @@ public void cleanUpFeature(
DeleteDataStreamAction.INSTANCE,
request,
ActionListener.wrap(
response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener),
response -> SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
),
e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
);
} else {
listener.onFailure(e);
}
Expand All @@ -480,7 +495,7 @@ public void cleanUpFeature(
} catch (Exception e) {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener);
} else {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public void cleanup() {
internalCluster().clusterService(),
TestProjectResolvers.DEFAULT_PROJECT_ONLY,
internalCluster().client(),
TEST_REQUEST_TIMEOUT,
stateStatusPlainActionFuture
);
stateStatusPlainActionFuture.actionGet();
Expand Down Expand Up @@ -241,6 +242,7 @@ public void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateStatus> listener
) {
Collection<SystemDataStreamDescriptor> dataStreamDescriptors = getSystemDataStreamDescriptors();
Expand All @@ -258,11 +260,23 @@ public void cleanUpFeature(
DeleteDataStreamAction.INSTANCE,
request,
ActionListener.wrap(
response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener),
response -> SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
),
e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
);
} else {
listener.onFailure(e);
}
Expand All @@ -272,7 +286,7 @@ public void cleanUpFeature(
} catch (Exception e) {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener);
} else {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -220,6 +221,7 @@ public void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> listener
) {
if (isEvil()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ protected void masterOperation(
)
) {
for (final var feature : features) {
feature.getCleanUpFunction().apply(clusterService, projectResolver, client, listeners.acquire(e -> {
assert e != null : feature.getName();
synchronized (responses) {
responses.add(e);
}
}));
feature.getCleanUpFunction()
.apply(clusterService, projectResolver, client, request.masterNodeTimeout(), listeners.acquire(e -> {
assert e != null : feature.getName();
synchronized (responses) {
responses.add(e);
}
}));
}
}
}
Expand Down
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -751,7 +752,7 @@ public static void validateFeatureName(String name, String plugin) {
* details about what constitutes a system feature.
*
* <p>This class has a static
* {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, ActionListener)} method
* {@link #cleanUpFeature(Collection, Collection, String, ClusterService, ProjectResolver, Client, TimeValue, ActionListener)} method
* that is the default implementation for resetting feature state.
*/
public static class Feature {
Expand Down Expand Up @@ -808,13 +809,14 @@ public Feature(String name, String description, Collection<SystemIndexDescriptor
indexDescriptors,
Collections.emptyList(),
Collections.emptyList(),
(clusterService, projectResolver, client, listener) -> cleanUpFeature(
(clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature(
indexDescriptors,
Collections.emptyList(),
name,
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
),
Feature::noopPreMigrationFunction,
Expand All @@ -841,13 +843,14 @@ public Feature(
indexDescriptors,
dataStreamDescriptors,
Collections.emptyList(),
(clusterService, projectResolver, client, listener) -> cleanUpFeature(
(clusterService, projectResolver, client, masterNodeTimeout, listener) -> cleanUpFeature(
indexDescriptors,
Collections.emptyList(),
name,
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
),
Feature::noopPreMigrationFunction,
Expand Down Expand Up @@ -918,10 +921,12 @@ private static void cleanUpFeatureForIndices(
String name,
Client client,
String[] indexNames,
TimeValue masterNodeTimeout,
final ActionListener<ResetFeatureStateStatus> listener
) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest();
deleteIndexRequest.indices(indexNames);
deleteIndexRequest.masterNodeTimeout(masterNodeTimeout);
client.execute(TransportDeleteIndexAction.TYPE, deleteIndexRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
Expand All @@ -944,6 +949,7 @@ public void onFailure(Exception e) {
* @param clusterService A clusterService, for retrieving cluster metadata
* @param projectResolver The project resolver
* @param client A client, for issuing delete requests
* @param masterNodeTimeout Timeout for tasks enqueued on the master node
* @param listener A listener to return success or failure of cleanup
*/
public static void cleanUpFeature(
Expand All @@ -953,6 +959,7 @@ public static void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
final ActionListener<ResetFeatureStateStatus> listener
) {
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
Expand Down Expand Up @@ -985,7 +992,7 @@ public static void cleanUpFeature(
.flatMap(descriptor -> descriptor.getMatchingIndices(project).stream())
.toArray(String[]::new);
if (associatedIndices.length > 0) {
cleanUpFeatureForIndices(name, client, associatedIndices, listeners.acquire(handleResponse));
cleanUpFeatureForIndices(name, client, associatedIndices, masterNodeTimeout, listeners.acquire(handleResponse));
}

// One descriptor at a time, create an originating client and clean up the feature
Expand All @@ -1000,6 +1007,7 @@ public static void cleanUpFeature(
name,
clientWithOrigin,
matchingIndices.toArray(Strings.EMPTY_ARRAY),
masterNodeTimeout,
listeners.acquire(handleResponse)
);
}
Expand Down Expand Up @@ -1044,6 +1052,7 @@ void apply(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateStatus> listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand All @@ -42,9 +43,9 @@
* <p>A SystemIndexPlugin may also specify “associated indices,” which hold plugin state in user space. These indices are not managed or
* protected, but they are included in snapshots of the feature.
*
* <p>An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, ActionListener)}
* in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will simply retrieve a list
* of system and associated indices and delete them.
* <p>An implementation of SystemIndexPlugin may override {@link #cleanUpFeature(ClusterService, ProjectResolver, Client, TimeValue,
* ActionListener)} in order to provide a “factory reset” of the plugin state. This can be useful for testing. The default method will
* simply retrieve a list of system and associated indices and delete them.
*
* <p>An implementation may also override {@link #prepareForIndicesMigration(ProjectMetadata, Client, ActionListener)} and
* {@link #indicesMigrationComplete(Map, Client, ActionListener)} in order to take special action before and after a
Expand Down Expand Up @@ -100,12 +101,14 @@ default Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
* @param clusterService Cluster service to provide cluster state
* @param projectResolver Project resolver
* @param client A client, for executing actions
* @param masterNodeTimeout Timeout for tasks enqueued on the master node
* @param listener Listener for post-cleanup result
*/
default void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> listener
) {
SystemIndices.Feature.cleanUpFeature(
Expand All @@ -115,6 +118,7 @@ default void cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpPreRequest;
Expand Down Expand Up @@ -724,6 +725,7 @@ public void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateStatus> finalListener
) {
List<SystemIndexPlugin> systemPlugins = filterPlugins(SystemIndexPlugin.class);
Expand All @@ -741,7 +743,7 @@ public void cleanUpFeature(
}
})
);
systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, allListeners));
systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, allListeners));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ public void cleanUpFeature(
ClusterService clusterService,
ProjectResolver projectResolver,
Client client,
TimeValue masterNodeTimeout,
ActionListener<ResetFeatureStateStatus> listener
) {
Collection<SystemDataStreamDescriptor> dataStreamDescriptors = getSystemDataStreamDescriptors();
Expand All @@ -335,11 +336,23 @@ public void cleanUpFeature(
DeleteDataStreamAction.INSTANCE,
request,
ActionListener.wrap(
response -> SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener),
response -> SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
),
e -> {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(
clusterService,
projectResolver,
client,
masterNodeTimeout,
listener
);
} else {
listener.onFailure(e);
}
Expand All @@ -349,13 +362,13 @@ public void cleanUpFeature(
} catch (Exception e) {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof ResourceNotFoundException) {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener);
} else {
listener.onFailure(e);
}
}
} else {
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, listener);
SystemIndexPlugin.super.cleanUpFeature(clusterService, projectResolver, client, masterNodeTimeout, listener);
}
}

Expand Down
Loading