diff --git a/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java b/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java index 042e43891957f..47c412e0ea2f9 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java @@ -68,9 +68,9 @@ public interface ReservedClusterStateHandler { *

* Sometimes certain parts of the cluster state cannot be created/updated without previously * setting other cluster state components, e.g. composable templates. Since the reserved cluster state handlers - * are processed in random order by the ReservedClusterStateController, this method gives an opportunity + * are processed in random order by the ReservedClusterStateService, this method gives an opportunity * to any reserved handler to declare other state handlers it depends on. Given dependencies exist, - * the ReservedClusterStateController will order those handlers such that the handlers that are dependent + * the ReservedClusterStateService will order those handlers such that the handlers that are dependent * on are processed first. * * @return a collection of reserved state handler names diff --git a/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java b/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java index 33e01d8b675c6..d91a92e3b2158 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java @@ -13,7 +13,7 @@ import java.util.Set; /** - * A {@link ClusterState} wrapper used by the ReservedClusterStateController to pass the + * A {@link ClusterState} wrapper used by the ReservedClusterStateService to pass the * current state as well as previous keys set by an {@link ReservedClusterStateHandler} to each transform * step of the cluster state update. * diff --git a/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java b/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java index c0805cb95b030..ad0a0aefa1230 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java @@ -27,7 +27,7 @@ /** * This Action is the reserved state save version of RestClusterUpdateSettingsAction *

- * It is used by the ReservedClusterStateController to update the persistent cluster settings. + * It is used by the ReservedClusterStateService to update the persistent cluster settings. * Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings. */ public class ReservedClusterSettingsAction implements ReservedClusterStateHandler> { diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java new file mode 100644 index 0000000000000..401281a8274e0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; + +import java.util.List; + +import static org.elasticsearch.ExceptionsHelper.stackTrace; + +record ErrorState(String namespace, Long version, List errors, ReservedStateErrorMetadata.ErrorKind errorKind) { + ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) { + this(namespace, version, List.of(stackTrace(e)), errorKind); + } + + public String toString() { + return String.join(", ", errors()); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java new file mode 100644 index 0000000000000..1c9bffc269de7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.Strings.format; + +/** + * Controller class for storing and reserving a portion of the {@link ClusterState} + *

+ * This class contains the logic about validation, ordering and applying of + * the cluster state specified in a file or through plugins/modules. Reserved cluster state + * cannot be modified through the REST APIs, only through this controller class. + */ +public class ReservedClusterStateService { + private static final Logger logger = LogManager.getLogger(ReservedClusterStateService.class); + + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("metadata"); + + final Map> handlers; + final ClusterService clusterService; + private final ReservedStateUpdateTaskExecutor updateStateTaskExecutor; + private final ReservedStateErrorTaskExecutor errorStateTaskExecutor; + + @SuppressWarnings("unchecked") + private final ConstructingObjectParser stateChunkParser = new ConstructingObjectParser<>( + "reserved_state_chunk", + a -> { + List> tuples = (List>) a[0]; + Map stateMap = new HashMap<>(); + for (var tuple : tuples) { + stateMap.put(tuple.v1(), tuple.v2()); + } + + return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]); + } + ); + + /** + * Controller class for saving and reserving {@link ClusterState}. + * @param clusterService for fetching and saving the modified state + * @param handlerList a list of reserved state handlers, which we use to transform the state + */ + public ReservedClusterStateService(ClusterService clusterService, List> handlerList) { + this.clusterService = clusterService; + this.updateStateTaskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService()); + this.errorStateTaskExecutor = new ReservedStateErrorTaskExecutor(); + this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity())); + stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { + if (handlers.containsKey(name) == false) { + throw new IllegalStateException("Missing handler definition for content key [" + name + "]"); + } + p.nextToken(); + return new Tuple<>(name, handlers.get(name).fromXContent(p)); + }, STATE_FIELD); + stateChunkParser.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReservedStateVersion.parse(p), METADATA_FIELD); + } + + /** + * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata + * @param parser the XContentParser to process + * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the + * cluster state cannot be correctly applied, null if successful or state couldn't be applied because of incompatible version. + */ + public void process(String namespace, XContentParser parser, Consumer errorListener) { + ReservedStateChunk stateChunk; + + try { + stateChunk = stateChunkParser.apply(parser, null); + } catch (Exception e) { + ErrorState errorState = new ErrorState(namespace, -1L, e, ReservedStateErrorMetadata.ErrorKind.PARSING); + saveErrorState(errorState); + logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + process(namespace, stateChunk, errorListener); + } + + /** + * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata + * @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process + * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the + * cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version. + */ + public void process(String namespace, ReservedStateChunk reservedStateChunk, Consumer errorListener) { + Map reservedState = reservedStateChunk.state(); + ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata(); + + LinkedHashSet orderedHandlers; + try { + orderedHandlers = orderedStateHandlers(reservedState.keySet()); + } catch (Exception e) { + ErrorState errorState = new ErrorState( + namespace, + reservedStateVersion.version(), + e, + ReservedStateErrorMetadata.ErrorKind.PARSING + ); + + saveErrorState(errorState); + logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + ClusterState state = clusterService.state(); + ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); + if (checkMetadataVersion(namespace, existingMetadata, reservedStateVersion) == false) { + return; + } + + clusterService.submitStateUpdateTask( + "reserved cluster state [" + namespace + "]", + new ReservedStateUpdateTask( + namespace, + reservedStateChunk, + handlers, + orderedHandlers, + (errorState) -> saveErrorState(errorState), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply reserved cluster state", e); + errorListener.accept(e); + } + } + ), + ClusterStateTaskConfig.build(Priority.URGENT), + updateStateTaskExecutor + ); + } + + // package private for testing + static boolean checkMetadataVersion( + String namespace, + ReservedStateMetadata existingMetadata, + ReservedStateVersion reservedStateVersion + ) { + if (Version.CURRENT.before(reservedStateVersion.minCompatibleVersion())) { + logger.warn( + () -> format( + "Reserved cluster state version [%s] for namespace [%s] is not compatible with this Elasticsearch node", + reservedStateVersion.minCompatibleVersion(), + namespace + ) + ); + return false; + } + + if (existingMetadata != null && existingMetadata.version() >= reservedStateVersion.version()) { + logger.warn( + () -> format( + "Not updating reserved cluster state for namespace [%s], because version [%s] is less or equal" + + " to the current metadata version [%s]", + namespace, + reservedStateVersion.version(), + existingMetadata.version() + ) + ); + return false; + } + + return true; + } + + private void saveErrorState(ErrorState state) { + clusterService.submitStateUpdateTask( + "reserved cluster state update error for [ " + state.namespace() + "]", + new ReservedStateErrorTask(state, new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new reserved error state for namespace [{}]", state.namespace()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply reserved error cluster state", e); + } + }), + ClusterStateTaskConfig.build(Priority.URGENT), + errorStateTaskExecutor + ); + } + + /** + * Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to + * execute for a given list of handler names supplied through the {@link ReservedStateChunk}. + * @param handlerNames Names of handlers found in the {@link ReservedStateChunk} + * @return + */ + LinkedHashSet orderedStateHandlers(Set handlerNames) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : handlerNames) { + addStateHandler(key, handlerNames, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + ReservedClusterStateHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown handler type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency); + } + addStateHandler(dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java new file mode 100644 index 0000000000000..b5b04b509dad5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import java.util.Map; + +/** + * A holder for the cluster state to be saved and reserved and the version info + *

+ * Apart from the cluster state we want to store and reserve, the chunk requires that + * you supply the version metadata. This version metadata (see {@link ReservedStateVersion}) is checked to ensure + * that the update is safe, and it's not unnecessarily repeated. + */ +public record ReservedStateChunk(Map state, ReservedStateVersion metadata) {} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java new file mode 100644 index 0000000000000..b6380255f87ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; + +/** + * Cluster state update task that sets the error state of the reserved cluster state metadata. + *

+ * This is used when a reserved cluster state update encounters error(s) while processing + * the {@link ReservedStateChunk}. + */ +public class ReservedStateErrorTask implements ClusterStateTaskListener { + + private final ErrorState errorState; + private final ActionListener listener; + + public ReservedStateErrorTask(ErrorState errorState, ActionListener listener) { + this.errorState = errorState; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + ClusterState execute(ClusterState currentState) { + ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + ReservedStateMetadata reservedMetadata = currentState.metadata().reservedStateMetadata().get(errorState.namespace()); + ReservedStateMetadata.Builder resMetadataBuilder = ReservedStateMetadata.builder(errorState.namespace(), reservedMetadata); + resMetadataBuilder.errorMetadata(new ReservedStateErrorMetadata(errorState.version(), errorState.errorKind(), errorState.errors())); + metadataBuilder.put(resMetadataBuilder.build()); + ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); + + return newState; + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java new file mode 100644 index 0000000000000..5a3d70668855b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; + +import java.util.List; + +/** + * Reserved cluster error state task executor + *

+ * We use this task executor to record any errors while updating and reserving the cluster state + */ +record ReservedStateErrorTaskExecutor() implements ClusterStateTaskExecutor { + private static final Logger logger = LogManager.getLogger(ReservedStateErrorTaskExecutor.class); + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success( + () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + logger.debug("Wrote new error state in reserved cluster state metadata"); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java new file mode 100644 index 0000000000000..421378e8ec60a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.TransformState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.elasticsearch.ExceptionsHelper.stackTrace; +import static org.elasticsearch.core.Strings.format; + +/** + * Generic task to update and reserve parts of the cluster state + * + *

+ * Reserved cluster state can only be modified by using the {@link ReservedClusterStateService}. Updating + * the reserved cluster state through REST APIs is not permitted. + */ +public class ReservedStateUpdateTask implements ClusterStateTaskListener { + private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTask.class); + + private final String namespace; + private final ReservedStateChunk stateChunk; + private final Map> handlers; + private final Collection orderedHandlers; + private final Consumer errorReporter; + private final ActionListener listener; + + public ReservedStateUpdateTask( + String namespace, + ReservedStateChunk stateChunk, + Map> handlers, + Collection orderedHandlers, + Consumer errorReporter, + ActionListener listener + ) { + this.namespace = namespace; + this.stateChunk = stateChunk; + this.handlers = handlers; + this.orderedHandlers = orderedHandlers; + this.errorReporter = errorReporter; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + protected ClusterState execute(final ClusterState currentState) { + ReservedStateMetadata existingMetadata = currentState.metadata().reservedStateMetadata().get(namespace); + Map reservedState = stateChunk.state(); + ReservedStateVersion reservedStateVersion = stateChunk.metadata(); + + var reservedMetadataBuilder = new ReservedStateMetadata.Builder(namespace).version(reservedStateVersion.version()); + List errors = new ArrayList<>(); + + ClusterState state = currentState; + for (var handlerName : orderedHandlers) { + ReservedClusterStateHandler handler = handlers.get(handlerName); + try { + Set existingKeys = keysForHandler(existingMetadata, handlerName); + TransformState transformState = handler.transform(reservedState.get(handlerName), new TransformState(state, existingKeys)); + state = transformState.state(); + reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(handlerName, transformState.keys())); + } catch (Exception e) { + errors.add(format("Error processing %s state change: %s", handler.name(), stackTrace(e))); + } + } + + if (errors.isEmpty() == false) { + // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the + // version hasn't been updated. + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + if (existingMetadata != null + && existingMetadata.errorMetadata() != null + && existingMetadata.errorMetadata().version() >= reservedStateVersion.version()) { + + logger.info( + () -> format( + "Not updating error state because version [%s] is less or equal to the last state error version [%s]", + reservedStateVersion.version(), + existingMetadata.errorMetadata().version() + ) + ); + + return currentState; + } + + errorReporter.accept( + new ErrorState(namespace, reservedStateVersion.version(), errors, ReservedStateErrorMetadata.ErrorKind.VALIDATION) + ); + + throw new IllegalStateException("Error processing state change request for " + namespace); + } + + // remove the last error if we had previously encountered any + reservedMetadataBuilder.errorMetadata(null); + + ClusterState.Builder stateBuilder = new ClusterState.Builder(state); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(reservedMetadataBuilder.build()); + + return stateBuilder.metadata(metadataBuilder).build(); + } + + private Set keysForHandler(ReservedStateMetadata reservedStateMetadata, String handlerName) { + if (reservedStateMetadata == null || reservedStateMetadata.handlers().get(handlerName) == null) { + return Collections.emptySet(); + } + + return reservedStateMetadata.handlers().get(handlerName).keys(); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java new file mode 100644 index 0000000000000..9a09f1b253bce --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.common.Priority; + +import java.util.List; + +/** + * Reserved cluster state update task executor + * + * @param rerouteService instance of {@link RerouteService}, so that we can execute reroute after cluster state is published + */ +public record ReservedStateUpdateTaskExecutor(RerouteService rerouteService) implements ClusterStateTaskExecutor { + + private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class); + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + rerouteService.reroute( + "reroute after saving and reserving part of the cluster state", + Priority.NORMAL, + ActionListener.wrap( + r -> logger.trace("reroute after applying and reserving part of the cluster state succeeded"), + e -> logger.debug("reroute after applying and reserving part of the cluster state failed", e) + ) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java new file mode 100644 index 0000000000000..c7c21b458482f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +/** + * File settings metadata class that holds information about + * versioning and Elasticsearch version compatibility + */ +public record ReservedStateVersion(Long version, Version compatibleWith) { + + public static final ParseField VERSION = new ParseField("version"); + public static final ParseField COMPATIBILITY = new ParseField("compatibility"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "reserved_cluster_state_version_metadata", + a -> { + Long updateId = Long.parseLong((String) a[0]); + Version minCompatVersion = Version.fromString((String) a[1]); + + return new ReservedStateVersion(updateId, minCompatVersion); + } + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); + } + + public static ReservedStateVersion parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public Version minCompatibleVersion() { + return compatibleWith; + } +} diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java new file mode 100644 index 0000000000000..eaf55cdb7c8c3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -0,0 +1,455 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.TransformState; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ReservedClusterStateServiceTests extends ESTestCase { + + public void testOperatorController() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + List.of(new ReservedClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + + } + } + """; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + } + } + """; + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testUpdateStateTasks() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + RerouteService rerouteService = mock(RerouteService.class); + + when(clusterService.getRerouteService()).thenReturn(rerouteService); + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService()); + + AtomicBoolean successCalled = new AtomicBoolean(false); + + ReservedStateUpdateTask task = spy( + new ReservedStateUpdateTask( + "test", + null, + Collections.emptyMap(), + Collections.emptySet(), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + doReturn(state).when(task).execute(any()); + + ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ReservedStateUpdateTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + successCalled.set(true); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ClusterState newState = taskExecutor.execute(state, List.of(taskContext)); + assertEquals(state, newState); + assertTrue(successCalled.get()); + verify(task, times(1)).execute(any()); + + taskExecutor.clusterStatePublished(state); + verify(rerouteService, times(1)).reroute(anyString(), any(), any()); + } + + public void testErrorStateTask() throws Exception { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ReservedStateErrorTask task = spy( + new ReservedStateErrorTask( + new ErrorState("test", 1L, List.of("some parse error", "some io error"), ReservedStateErrorMetadata.ErrorKind.PARSING), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + ReservedStateErrorTaskExecutor.TaskContext taskContext = + new ReservedStateErrorTaskExecutor.TaskContext<>() { + @Override + public ReservedStateErrorTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ReservedStateErrorTaskExecutor executor = new ReservedStateErrorTaskExecutor(); + + ClusterState newState = executor.execute(state, List.of(taskContext)); + + verify(task, times(1)).execute(any()); + + ReservedStateMetadata operatorMetadata = newState.metadata().reservedStateMetadata().get("test"); + assertNotNull(operatorMetadata); + assertNotNull(operatorMetadata.errorMetadata()); + assertEquals(1L, (long) operatorMetadata.errorMetadata().version()); + assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); + assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error")); + } + + public void testUpdateTaskDuplicateError() { + ReservedClusterStateHandler> newStateMaker = new ReservedClusterStateHandler<>() { + @Override + public String name() { + return "maker"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + ClusterState newState = new ClusterState.Builder(prevState.state()).build(); + return new TransformState(newState, prevState.keys()); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ReservedClusterStateHandler> exceptionThrower = new ReservedClusterStateHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + throw new Exception("anything"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + // We submit a task with two handler, one will cause an exception, the other will create a new state. + // When we fail to update the metadata because of version, we ensure that the returned state is equal to the + // original state by pointer reference to avoid cluster state update task to run. + ReservedStateUpdateTask task = new ReservedStateUpdateTask( + "namespace_one", + new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(1L, Version.CURRENT)), + Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), + List.of(exceptionThrower.name(), newStateMaker.name()), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ); + + ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b")); + ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata( + 1L, + ReservedStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(1L) + .putHandler(hmOne) + .build(); + + Metadata metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + // The reference == ensures we return the same object as the current state to avoid publishing no-op state update + assertTrue(state == task.execute(state)); + + emOne = new ReservedStateErrorMetadata( + 0L, + ReservedStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + // If we are writing with older error metadata, we should get proper IllegalStateException + operatorMetadata = ReservedStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + + metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Error processing state change request for namespace_one", + expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage() + ); + } + + public void testCheckMetadataVersion() { + ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build(); + + assertTrue( + ReservedClusterStateService.checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT)) + ); + + assertFalse( + ReservedClusterStateService.checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT)) + ); + + assertFalse( + ReservedClusterStateService.checkMetadataVersion( + "operator", + operatorMetadata, + new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)) + ) + ); + } + + private ReservedClusterStateHandler> makeHandlerHelper(final String name, final List deps) { + return new ReservedClusterStateHandler<>() { + @Override + public String name() { + return name; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return deps; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + } + + public void testHandlerOrdering() { + ReservedClusterStateHandler> oh1 = makeHandlerHelper("one", List.of("two", "three")); + ReservedClusterStateHandler> oh2 = makeHandlerHelper("two", Collections.emptyList()); + ReservedClusterStateHandler> oh3 = makeHandlerHelper("three", List.of("two")); + + ClusterService clusterService = mock(ClusterService.class); + final var controller = new ReservedClusterStateService(clusterService, List.of(oh1, oh2, oh3)); + Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); + assertThat(ordered, contains("two", "three", "one")); + + // assure that we bail on unknown handler + assertEquals( + "Unknown handler type: four", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) + .getMessage() + ); + + // assure that we bail on missing dependency link + assertEquals( + "Missing handler dependency definition: one -> three", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() + ); + + // Change the second handler so that we create cycle + oh2 = makeHandlerHelper("two", List.of("one")); + + final var controller1 = new ReservedClusterStateService(clusterService, List.of(oh1, oh2)); + + assertThat( + expectThrows(IllegalStateException.class, () -> controller1.orderedStateHandlers(Set.of("one", "two"))).getMessage(), + anyOf( + is("Cycle found in settings dependencies: one -> two -> one"), + is("Cycle found in settings dependencies: two -> one -> two") + ) + ); + } + + public void testDuplicateHandlerNames() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + assertTrue( + expectThrows( + IllegalStateException.class, + () -> new ReservedClusterStateService( + clusterService, + List.of(new ReservedClusterSettingsAction(clusterSettings), new TestHandler()) + ) + ).getMessage().startsWith("Duplicate key cluster_settings") + ); + } + + class TestHandler implements ReservedClusterStateHandler> { + + @Override + public String name() { + return ReservedClusterSettingsAction.NAME; + } + + @Override + public TransformState transform(Object source, TransformState prevState) { + return prevState; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java similarity index 50% rename from x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java rename to x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java index ba1dc0f4b9342..8fa8abcdd2aa7 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java @@ -7,13 +7,25 @@ package org.elasticsearch.xpack.ilm.action; +import org.elasticsearch.Version; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.reservedstate.TransformState; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.reservedstate.service.ReservedClusterStateService; +import org.elasticsearch.reservedstate.service.ReservedStateChunk; +import org.elasticsearch.reservedstate.service.ReservedStateUpdateTask; +import org.elasticsearch.reservedstate.service.ReservedStateUpdateTaskExecutor; +import org.elasticsearch.reservedstate.service.ReservedStateVersion; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -27,8 +39,10 @@ import org.elasticsearch.xpack.core.ilm.FreezeAction; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleType; import org.elasticsearch.xpack.core.ilm.MigrateAction; +import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.RollupILMAction; @@ -38,17 +52,26 @@ import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ilm.UnfollowAction; import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction; +import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ReservedLifecycleStateControllerTests extends ESTestCase { +public class ReservedLifecycleStateServiceTests extends ESTestCase { protected NamedXContentRegistry xContentRegistry() { List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); @@ -209,4 +232,205 @@ public void testActionAddRemove() throws Exception { ilmMetadata = updatedState.state().metadata().custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle2")); } + + private void setupTaskMock(ClusterService clusterService, ClusterState state) { + doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + + if ((args[3] instanceof ReservedStateUpdateTaskExecutor) == false) { + fail("Should have gotten a state update task to execute, instead got: " + args[3].getClass().getName()); + } + + ReservedStateUpdateTaskExecutor task = (ReservedStateUpdateTaskExecutor) args[3]; + + ClusterStateTaskExecutor.TaskContext context = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ReservedStateUpdateTask getTask() { + return (ReservedStateUpdateTask) args[1]; + } + + @Override + public void success(Runnable onPublicationSuccess) {} + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) { + fail("Shouldn't fail here"); + } + }; + + task.execute(state, List.of(context)); + + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(), any(), any()); + } + + public void testOperatorControllerFromJSONContent() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + List.of(new ReservedClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + }, + "ilm": { + "my_timeseries_lifecycle": { + "phases": { + "hot": { + "min_age": "10s", + "actions": { + "rollover": { + "max_primary_shard_size": "50gb", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + }, + "my_timeseries_lifecycle1": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + "shrink": { + "number_of_shards": 1 + }, + "forcemerge": { + "max_num_segments": 1 + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + } + } + } + }"""; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ReservedClusterStateService( + clusterService, + List.of( + new ReservedClusterSettingsAction(clusterSettings), + new ReservedLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testOperatorControllerWithPluginPackage() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ReservedClusterStateService controller = new ReservedClusterStateService( + clusterService, + List.of(new ReservedClusterSettingsAction(clusterSettings)) + ); + + AtomicReference x = new AtomicReference<>(); + + ReservedStateChunk pack = new ReservedStateChunk( + Map.of( + ReservedClusterSettingsAction.NAME, + Map.of("indices.recovery.max_bytes_per_sec", "50mb"), + ReservedLifecycleAction.NAME, + List.of( + new LifecyclePolicy( + "my_timeseries_lifecycle", + Map.of( + "warm", + new Phase("warm", new TimeValue(10, TimeUnit.SECONDS), Collections.emptyMap()), + "delete", + new Phase("delete", new TimeValue(30, TimeUnit.SECONDS), Collections.emptyMap()) + ) + ) + ) + ), + new ReservedStateVersion(123L, Version.CURRENT) + ); + + controller.process("operator", pack, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ReservedClusterStateService( + clusterService, + List.of( + new ReservedClusterSettingsAction(clusterSettings), + new ReservedLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + controller.process("operator", pack, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } }