diff --git a/docs/changelog/88224.yaml b/docs/changelog/88224.yaml new file mode 100644 index 0000000000000..0e6265eef9354 --- /dev/null +++ b/docs/changelog/88224.yaml @@ -0,0 +1,5 @@ +pr: 88224 +summary: Immutable cluster state controller +area: Infra/Core +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 1858d8ab28784..0cef2657f2090 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -251,6 +251,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.TypeLiteral; @@ -262,6 +263,8 @@ import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.health.GetHealthAction; import org.elasticsearch.health.RestGetHealthAction; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.service.ImmutableClusterStateController; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.indices.SystemIndices; @@ -448,6 +451,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators mappingRequestValidators; private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; + private final ImmutableClusterStateController immutableStateController; public ActionModule( Settings settings, @@ -460,7 +464,9 @@ public ActionModule( NodeClient nodeClient, CircuitBreakerService circuitBreakerService, UsageService usageService, - SystemIndices systemIndices + SystemIndices systemIndices, + ClusterService clusterService, + List> reservedStateHandlers ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -511,6 +517,7 @@ public ActionModule( ); restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService); + immutableStateController = new ImmutableClusterStateController(clusterService, reservedStateHandlers); } public Map> getActions() { @@ -920,4 +927,8 @@ public ActionFilters getActionFilters() { public RestController getRestController() { return restController; } + + public ImmutableClusterStateController getImmutableClusterStateController() { + return immutableStateController; + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 777b0c2356a72..26f9de8dd380b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -42,7 +43,9 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; @@ -172,9 +175,39 @@ protected Set modifiedKeys(Request request) { return Collections.emptySet(); } + // package private for testing + void validateForImmutableState(Request request, ClusterState state) { + Optional handlerName = immutableStateHandlerName(); + assert handlerName.isPresent(); + + Set modified = modifiedKeys(request); + List errors = new ArrayList<>(); + + for (ImmutableStateMetadata metadata : state.metadata().immutableStateMetadata().values()) { + Set conflicts = metadata.conflicts(handlerName.get(), modified); + if (conflicts.isEmpty() == false) { + errors.add(format("[%s] set as read-only by [%s]", String.join(",", conflicts), metadata.namespace())); + } + } + + if (errors.isEmpty() == false) { + throw new IllegalArgumentException( + format("Failed to process request [%s] with errors: %s", request, String.join(System.lineSeparator(), errors)) + ); + } + } + + // package private for testing + boolean supportsImmutableState() { + return immutableStateHandlerName().isPresent(); + } + @Override protected void doExecute(Task task, final Request request, ActionListener listener) { ClusterState state = clusterService.state(); + if (supportsImmutableState()) { + validateForImmutableState(request, state); + } logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); diff --git a/server/src/main/java/org/elasticsearch/common/util/Maps.java b/server/src/main/java/org/elasticsearch/common/util/Maps.java index 5b02f9af64a06..f0a7f464ed8f3 100644 --- a/server/src/main/java/org/elasticsearch/common/util/Maps.java +++ b/server/src/main/java/org/elasticsearch/common/util/Maps.java @@ -284,18 +284,4 @@ static int capacity(int expectedSize) { return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0); } - /** - * Convenience method to convert the passed in input object as a map with String keys. - * - * @param input the input passed into the operator handler after parsing the content - * @return - */ - @SuppressWarnings("unchecked") - public static Map asMap(Object input) { - if (input instanceof Map source) { - return (Map) source; - } - throw new IllegalStateException("Unsupported input format"); - } - } diff --git a/server/src/main/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandler.java b/server/src/main/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandler.java index f8428a9423c51..5ab1023d9c0f1 100644 --- a/server/src/main/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandler.java +++ b/server/src/main/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandler.java @@ -10,7 +10,9 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.xcontent.XContentParser; +import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -35,7 +37,6 @@ public interface ImmutableClusterStateHandler { * cluster state update and the cluster state is typically supplied as a combined content, * unlike the REST handlers. This name must match a desired content key name in the combined * cluster state update, e.g. "ilm" or "cluster_settings" (for persistent cluster settings update). - *

* * @return a String with the handler name, e.g "ilm". */ @@ -51,7 +52,6 @@ public interface ImmutableClusterStateHandler { * state in one go. For that reason, we supply a wrapper class to the cluster state called * {@link TransformState}, which contains the current cluster state as well as any previous keys * set by this handler on prior invocation. - *

* * @param source The parsed information specific to this handler from the combined cluster state content * @param prevState The previous cluster state and keys set by this handler (if any) @@ -70,7 +70,6 @@ public interface ImmutableClusterStateHandler { * to any immutable handler to declare other immutable state handlers it depends on. Given dependencies exist, * the ImmutableClusterStateController will order those handlers such that the handlers that are dependent * on are processed first. - *

* * @return a collection of immutable state handler names */ @@ -85,7 +84,6 @@ default Collection dependencies() { * All implementations of {@link ImmutableClusterStateHandler} should call the request validate method, by calling this default * implementation. To aid in any special validation logic that may need to be implemented by the immutable cluster state handler * we provide this convenience method. - *

* * @param request the master node request that we base this immutable state handler on */ @@ -95,4 +93,18 @@ default void validate(MasterNodeRequest request) { throw new IllegalStateException("Validation error", exception); } } + + /** + * The parse content method which is called during parsing of file based content. + * + *

+ * The immutable state can be provided as XContent, which means that each handler needs + * to implement a method to convert an XContent to an object it can consume later in + * transform + * + * @param parser the XContent parser we are parsing from + * @return + * @throws IOException + */ + T fromXContent(XContentParser parser) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/immutablestate/action/ImmutableClusterSettingsAction.java b/server/src/main/java/org/elasticsearch/immutablestate/action/ImmutableClusterSettingsAction.java index cd82f17ecfa8f..249914276d83d 100644 --- a/server/src/main/java/org/elasticsearch/immutablestate/action/ImmutableClusterSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/immutablestate/action/ImmutableClusterSettingsAction.java @@ -15,22 +15,22 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.xcontent.XContentParser; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.common.util.Maps.asMap; - /** * This Action is the immutable state save version of RestClusterUpdateSettingsAction *

* It is used by the ImmutableClusterStateController to update the persistent cluster settings. * Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings. */ -public class ImmutableClusterSettingsAction implements ImmutableClusterStateHandler { +public class ImmutableClusterSettingsAction implements ImmutableClusterStateHandler> { public static final String NAME = "cluster_settings"; @@ -49,11 +49,12 @@ public String name() { private ClusterUpdateSettingsRequest prepare(Object input, Set previouslySet) { final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); - Map source = asMap(input); Map persistentSettings = new HashMap<>(); Set toDelete = new HashSet<>(previouslySet); - source.forEach((k, v) -> { + Map settings = (Map) input; + + settings.forEach((k, v) -> { persistentSettings.put(k, v); toDelete.remove(k); }); @@ -87,4 +88,9 @@ public TransformState transform(Object input, TransformState prevState) { return new TransformState(state, currentKeys); } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } } diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java new file mode 100644 index 0000000000000..5e763625792f3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java @@ -0,0 +1,355 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.Strings.format; + +/** + * Controller class for applying immutable state to ClusterState. + *

+ * This class contains the logic about validation, ordering and applying of + * the cluster state specified in a file or through plugins/modules. + */ +public class ImmutableClusterStateController { + private static final Logger logger = LogManager.getLogger(ImmutableClusterStateController.class); + + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("metadata"); + + final Map> handlers; + final ClusterService clusterService; + private final ImmutableUpdateStateTaskExecutor updateStateTaskExecutor; + private final ImmutableUpdateErrorTaskExecutor errorStateTaskExecutor; + + @SuppressWarnings("unchecked") + private final ConstructingObjectParser packageParser = new ConstructingObjectParser<>("immutable_cluster_package", a -> { + List> tuples = (List>) a[0]; + Map stateMap = new HashMap<>(); + for (var tuple : tuples) { + stateMap.put(tuple.v1(), tuple.v2()); + } + + return new Package(stateMap, (PackageVersion) a[1]); + }); + + /** + * Controller class for saving immutable ClusterState. + * @param clusterService for fetching and saving the modified state + */ + public ImmutableClusterStateController(ClusterService clusterService, List> handlerList) { + this.clusterService = clusterService; + this.updateStateTaskExecutor = new ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); + this.errorStateTaskExecutor = new ImmutableUpdateErrorTaskExecutor(); + this.handlers = handlerList.stream().collect(Collectors.toMap(ImmutableClusterStateHandler::name, Function.identity())); + packageParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { + if (handlers.containsKey(name) == false) { + throw new IllegalStateException("Missing handler definition for content key [" + name + "]"); + } + p.nextToken(); + return new Tuple<>(name, handlers.get(name).fromXContent(p)); + }, STATE_FIELD); + packageParser.declareObject(ConstructingObjectParser.constructorArg(), PackageVersion::parse, METADATA_FIELD); + } + + /** + * A package class containing the composite immutable cluster state + *

+ * Apart from the cluster state we want to store as immutable, the package requires that + * you supply the version metadata. This version metadata (see {@link PackageVersion}) is checked to ensure + * that the update is safe, and it's not unnecessarily repeated. + */ + public record Package(Map state, PackageVersion metadata) {} + + /** + * Saves an immutable cluster state for a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata + * @param parser the XContentParser to process + * @param errorListener a consumer called with IllegalStateException if the content has errors and the + * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or + * incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, XContentParser parser, Consumer errorListener) { + Package immutableStatePackage; + + try { + immutableStatePackage = packageParser.apply(parser, null); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState(new ImmutableUpdateErrorState(namespace, -1L, errors, ImmutableStateErrorMetadata.ErrorKind.PARSING)); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + process(namespace, immutableStatePackage, errorListener); + } + + /** + * Saves an immutable cluster state for a given 'namespace' from {@link Package} + * + * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata + * @param immutableStateFilePackage a {@link Package} composite state object to process + * @param errorListener a consumer called with IllegalStateException if the content has errors and the + * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or + * incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, Package immutableStateFilePackage, Consumer errorListener) { + Map immutableState = immutableStateFilePackage.state; + PackageVersion packageVersion = immutableStateFilePackage.metadata; + + LinkedHashSet orderedHandlers; + try { + orderedHandlers = orderedStateHandlers(immutableState.keySet()); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState( + new ImmutableUpdateErrorState(namespace, packageVersion.version(), errors, ImmutableStateErrorMetadata.ErrorKind.PARSING) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + ClusterState state = clusterService.state(); + ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); + if (checkMetadataVersion(existingMetadata, packageVersion, errorListener) == false) { + return; + } + + clusterService.submitStateUpdateTask( + "immutable cluster state [" + namespace + "]", + new ImmutableStateUpdateStateTask( + namespace, + immutableStateFilePackage, + handlers, + orderedHandlers, + (errorState) -> recordErrorState(errorState), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply immutable cluster state", e); + errorListener.accept(e); + } + } + ), + ClusterStateTaskConfig.build(Priority.URGENT), + updateStateTaskExecutor + ); + } + + // package private for testing + static boolean checkMetadataVersion( + ImmutableStateMetadata existingMetadata, + PackageVersion packageVersion, + Consumer errorListener + ) { + if (Version.CURRENT.before(packageVersion.minCompatibleVersion())) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Cluster state version [%s] is not compatible with this Elasticsearch node", + packageVersion.minCompatibleVersion() + ) + ) + ); + return false; + } + + if (existingMetadata != null && existingMetadata.version() >= packageVersion.version()) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + packageVersion.version(), + existingMetadata.version() + ) + ) + ); + return false; + } + + return true; + } + + record ImmutableUpdateErrorState( + String namespace, + Long version, + List errors, + ImmutableStateErrorMetadata.ErrorKind errorKind + ) {} + + private void recordErrorState(ImmutableUpdateErrorState state) { + clusterService.submitStateUpdateTask( + "immutable cluster state update error for [ " + state.namespace + "]", + new ImmutableStateUpdateErrorTask(state, new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new immutable error state for namespace [{}]", state.namespace); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply immutable error cluster state", e); + } + }), + ClusterStateTaskConfig.build(Priority.URGENT), + errorStateTaskExecutor + ); + } + + // package private for testing + LinkedHashSet orderedStateHandlers(Set keys) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : keys) { + addStateHandler(key, keys, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + ImmutableClusterStateHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown settings definition type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing settings dependency definition: " + key + " -> " + dependency); + } + addStateHandler(dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } + + /** + * {@link IncompatibleVersionException} is thrown when we try to update the cluster state + * without changing the update version id, or if we try to update cluster state on + * an incompatible Elasticsearch version in mixed cluster mode. + */ + public static class IncompatibleVersionException extends RuntimeException { + public IncompatibleVersionException(String message) { + super(message); + } + } + + /** + * Immutable cluster state update task executor + * + * @param rerouteService instance of {@link RerouteService}, so that we can execute reroute after cluster state is published + */ + public record ImmutableUpdateStateTaskExecutor(RerouteService rerouteService) + implements + ClusterStateTaskExecutor { + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) + throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + rerouteService.reroute( + "reroute after applying immutable cluster state", + Priority.NORMAL, + ActionListener.wrap( + r -> logger.trace("reroute after applying immutable cluster state succeeded"), + e -> logger.debug("reroute after applying immutable cluster state failed", e) + ) + ); + } + } + + /** + * Immutable cluster error state task executor + *

+ * We use this task executor to record any errors while updating immutable cluster state + */ + public record ImmutableUpdateErrorTaskExecutor() implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) + throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success( + () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + logger.info("Wrote new error state in immutable metadata"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java new file mode 100644 index 0000000000000..2e2c276a3ac1e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; + +/** + * Cluster state update task that sets the error state of the immutable cluster state metadata. + *

+ * This is used when an immutable cluster state update encounters error(s) while processing + * the {@link org.elasticsearch.immutablestate.service.ImmutableClusterStateController.Package}. + */ +public class ImmutableStateUpdateErrorTask implements ClusterStateTaskListener { + + private final ImmutableClusterStateController.ImmutableUpdateErrorState errorState; + private final ActionListener listener; + + public ImmutableStateUpdateErrorTask( + ImmutableClusterStateController.ImmutableUpdateErrorState errorState, + ActionListener listener + ) { + this.errorState = errorState; + this.listener = listener; + } + + private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateErrorTask.class); + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + ClusterState execute(ClusterState currentState) { + ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + ImmutableStateMetadata immutableMetadata = currentState.metadata().immutableStateMetadata().get(errorState.namespace()); + ImmutableStateMetadata.Builder immMetadataBuilder = ImmutableStateMetadata.builder(errorState.namespace(), immutableMetadata); + immMetadataBuilder.errorMetadata( + new ImmutableStateErrorMetadata(errorState.version(), errorState.errorKind(), errorState.errors()) + ); + metadataBuilder.put(immMetadataBuilder.build()); + ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); + + return newState; + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java new file mode 100644 index 0000000000000..97e005b4f23a2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.TransformState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.elasticsearch.core.Strings.format; + +/** + * Generic immutable cluster state update task + */ +public class ImmutableStateUpdateStateTask implements ClusterStateTaskListener { + private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateStateTask.class); + + private final String namespace; + private final ImmutableClusterStateController.Package immutableStatePackage; + private final Map> handlers; + private final Collection orderedHandlers; + private final Consumer recordErrorState; + private final ActionListener listener; + + public ImmutableStateUpdateStateTask( + String namespace, + ImmutableClusterStateController.Package immutableStatePackage, + Map> handlers, + Collection orderedHandlers, + Consumer recordErrorState, + ActionListener listener + ) { + this.namespace = namespace; + this.immutableStatePackage = immutableStatePackage; + this.handlers = handlers; + this.orderedHandlers = orderedHandlers; + this.recordErrorState = recordErrorState; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + protected ClusterState execute(ClusterState state) { + ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); + Map immutableState = immutableStatePackage.state(); + PackageVersion packageVersion = immutableStatePackage.metadata(); + + var immutableMetadataBuilder = new ImmutableStateMetadata.Builder(namespace).version(packageVersion.version()); + List errors = new ArrayList<>(); + + for (var handlerName : orderedHandlers) { + ImmutableClusterStateHandler handler = handlers.get(handlerName); + try { + Set existingKeys = keysForHandler(existingMetadata, handlerName); + TransformState transformState = handler.transform(immutableState.get(handlerName), new TransformState(state, existingKeys)); + state = transformState.state(); + immutableMetadataBuilder.putHandler(new ImmutableStateHandlerMetadata(handlerName, transformState.keys())); + } catch (Exception e) { + errors.add(format("Error processing %s state change: %s", handler.name(), e.getMessage())); + } + } + + if (errors.isEmpty() == false) { + // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the + // version hasn't been updated. + if (existingMetadata != null + && existingMetadata.errorMetadata() != null + && existingMetadata.errorMetadata().version() >= packageVersion.version()) { + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new ImmutableClusterStateController.IncompatibleVersionException( + format( + "Not updating error state because version [%s] is less or equal to the last state error version [%s]", + packageVersion.version(), + existingMetadata.errorMetadata().version() + ) + ); + } + + recordErrorState.accept( + new ImmutableClusterStateController.ImmutableUpdateErrorState( + namespace, + packageVersion.version(), + errors, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION + ) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new IllegalStateException("Error processing state change request for " + namespace); + } + + // remove the last error if we had previously encountered any + immutableMetadataBuilder.errorMetadata(null); + + ClusterState.Builder stateBuilder = new ClusterState.Builder(state); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(immutableMetadataBuilder.build()); + + return stateBuilder.metadata(metadataBuilder).build(); + } + + private Set keysForHandler(ImmutableStateMetadata immutableStateMetadata, String handlerName) { + if (immutableStateMetadata == null || immutableStateMetadata.handlers().get(handlerName) == null) { + return Collections.emptySet(); + } + + return immutableStateMetadata.handlers().get(handlerName).keys(); + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java b/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java new file mode 100644 index 0000000000000..95a48112515da --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +/** + * File settings metadata class that holds information about + * versioning and Elasticsearch version compatibility + */ +public record PackageVersion(Long version, Version compatibleWith) { + public static final ParseField VERSION = new ParseField("version"); + public static final ParseField COMPATIBILITY = new ParseField("compatibility"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "immutable_cluster_state_version_metadata", + a -> { + Long updateId = Long.parseLong((String) a[0]); + Version minCompatVersion = Version.fromString((String) a[1]); + + return new PackageVersion(updateId, minCompatVersion); + } + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); + } + + public static PackageVersion parse(XContentParser parser, Void v) { + return PARSER.apply(parser, v); + } + + public Version minCompatibleVersion() { + return compatibleWith; + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 7f87c76001c39..9f32316694d3a 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -105,6 +105,9 @@ import org.elasticsearch.health.node.selection.HealthNode; import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor; import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandlerProvider; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexSettingProviders; import org.elasticsearch.index.IndexSettings; @@ -705,6 +708,17 @@ protected Node( ) ).toList(); + List> reservedStateHandlers = new ArrayList<>(); + + // add all reserved state handlers from server + reservedStateHandlers.add(new ImmutableClusterSettingsAction(settingsModule.getClusterSettings())); + + // add all reserved state handlers from plugins + List pluginHandlers = pluginsService.loadServiceProviders( + ImmutableClusterStateHandlerProvider.class + ); + pluginHandlers.forEach(h -> reservedStateHandlers.addAll(h.handlers())); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -716,7 +730,9 @@ protected Node( client, circuitBreakerService, usageService, - systemIndices + systemIndices, + clusterService, + reservedStateHandlers ); modules.add(actionModule); diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java index 7e2e13d5343f5..9e251fae881c5 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -309,7 +309,10 @@ public List loadServiceProviders(Class service) { List result = new ArrayList<>(); for (LoadedPlugin pluginTuple : plugins()) { - result.addAll(createExtensions(service, pluginTuple.instance)); + // Only load SPI providers if they are loaded by different class loader + if (pluginTuple.loader().equals(this.getClass().getClassLoader()) == false) { + result.addAll(createExtensions(service, pluginTuple.instance())); + } } return Collections.unmodifiableList(result); diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index a1993bd942f91..bdac66ba98acf 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -40,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -49,6 +51,7 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; public class ActionModuleTests extends ESTestCase { public void testSetupActionsContainsKnownBuiltin() { @@ -116,7 +119,9 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, null, usageService, - null + null, + mock(ClusterService.class), + Collections.emptyList() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -172,7 +177,9 @@ public String getName() { null, null, usageService, - null + null, + mock(ClusterService.class), + Collections.emptyList() ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/] for method: GET")); @@ -221,7 +228,9 @@ public List getRestHandlers( null, null, usageService, - null + null, + mock(ClusterService.class), + Collections.emptyList() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -265,7 +274,9 @@ public void test3rdPartyHandlerIsNotInstalled() { null, null, usageService, - null + null, + mock(ClusterService.class), + Collections.emptyList() ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java index 042f7f150788a..d31c53adfae63 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java @@ -8,9 +8,17 @@ package org.elasticsearch.action.admin.cluster.settings; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentParser; @@ -21,6 +29,8 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.mock; public class ClusterUpdateSettingsRequestTests extends ESTestCase { @@ -71,4 +81,43 @@ private static ClusterUpdateSettingsRequest createTestItem() { request.transientSettings(ClusterUpdateSettingsResponseTests.randomClusterSettings(0, 2)); return request; } + + public void testOperatorHandler() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + TransportClusterUpdateSettingsAction action = new TransportClusterUpdateSettingsAction( + mock(TransportService.class), + mock(ClusterService.class), + mock(ThreadPool.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class), + clusterSettings + ); + + assertEquals(ImmutableClusterSettingsAction.NAME, action.immutableStateHandlerName().get()); + + String oneSettingJSON = """ + { + "persistent": { + "indices.recovery.max_bytes_per_sec": "25mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + }"""; + + try (XContentParser parser = createParser(XContentType.JSON.xContent(), oneSettingJSON)) { + ClusterUpdateSettingsRequest parsedRequest = ClusterUpdateSettingsRequest.fromXContent(parser); + assertThat( + action.modifiedKeys(parsedRequest), + containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster.remote.cluster_one.seeds") + ); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 9770b1c42dc0f..5bef002fc3511 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -14,12 +14,14 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlock; @@ -27,6 +29,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; +import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -41,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; @@ -65,6 +70,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -254,6 +260,63 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + class ImmutableStateAction extends Action { + ImmutableStateAction(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME); + } + + @Override + protected Optional immutableStateHandlerName() { + return Optional.of("test_immutable_state_action"); + } + } + + class FakeClusterStateUpdateAction extends TransportMasterNodeAction { + FakeClusterStateUpdateAction( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + String executor + ) { + super( + actionName, + transportService, + clusterService, + threadPool, + new ActionFilters(new HashSet<>()), + ClusterUpdateSettingsRequest::new, + TestIndexNameExpressionResolver.newInstance(), + Response::new, + executor + ); + } + + @Override + protected void masterOperation( + Task task, + ClusterUpdateSettingsRequest request, + ClusterState state, + ActionListener listener + ) {} + + @Override + protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) { + return null; + } + + @Override + protected Optional immutableStateHandlerName() { + return Optional.of(ImmutableClusterSettingsAction.NAME); + } + + @Override + protected Set modifiedKeys(ClusterUpdateSettingsRequest request) { + Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + return allSettings.keySet(); + } + } + public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException { final boolean masterOperationFailure = randomBoolean(); @@ -686,7 +749,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request) ); } - }; PlainActionFuture listener = new PlainActionFuture<>(); @@ -697,6 +759,54 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class)); } + public void testRejectImmutableConflictClusterStateUpdate() { + ImmutableStateHandlerMetadata hmOne = new ImmutableStateHandlerMetadata(ImmutableClusterSettingsAction.NAME, Set.of("a", "b")); + ImmutableStateHandlerMetadata hmThree = new ImmutableStateHandlerMetadata(ImmutableClusterSettingsAction.NAME, Set.of("e", "f")); + ImmutableStateMetadata omOne = ImmutableStateMetadata.builder("namespace_one").putHandler(hmOne).build(); + ImmutableStateMetadata omTwo = ImmutableStateMetadata.builder("namespace_two").putHandler(hmThree).build(); + + Metadata metadata = Metadata.builder().put(omOne).put(omTwo).build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME); + + assertFalse(noHandler.supportsImmutableState()); + + noHandler = new ImmutableStateAction("internal:testOpAction", transportService, clusterService, threadPool); + + assertTrue(noHandler.supportsImmutableState()); + + // nothing should happen here, since the request doesn't touch any of the immutable state keys + noHandler.validateForImmutableState(new Request(), clusterState); + + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("a", "a value").build() + ).transientSettings(Settings.builder().put("e", "e value").build()); + + FakeClusterStateUpdateAction action = new FakeClusterStateUpdateAction( + "internal:testClusterSettings", + transportService, + clusterService, + threadPool, + ThreadPool.Names.SAME + ); + + assertTrue(action.supportsImmutableState()); + + assertTrue( + expectThrows(IllegalArgumentException.class, () -> action.validateForImmutableState(request, clusterState)).getMessage() + .contains("with errors: [a] set as read-only by [namespace_one]\n" + "[e] set as read-only by [namespace_two]") + ); + + ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("m", "m value").build() + ).transientSettings(Settings.builder().put("n", "n value").build()); + + // this should just work, no conflicts + action.validateForImmutableState(okRequest, clusterState); + } + private Runnable blockAllThreads(String executorName) throws Exception { final int numberOfThreads = threadPool.info(executorName).getMax(); final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName); diff --git a/server/src/test/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandlerTests.java b/server/src/test/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandlerTests.java index 86eddbaadad17..c46ac036eb823 100644 --- a/server/src/test/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/immutablestate/ImmutableClusterStateHandlerTests.java @@ -10,18 +10,11 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.common.util.Maps; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.indices.settings.InternalOrPrivateSettingsPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xcontent.XContentParserConfiguration; -import org.elasticsearch.xcontent.XContentType; import java.io.IOException; -import java.util.Map; - -import static org.hamcrest.Matchers.containsInAnyOrder; public class ImmutableClusterStateHandlerTests extends ESTestCase { public void testValidation() { @@ -35,6 +28,11 @@ public String name() { public TransformState transform(Object source, TransformState prevState) throws Exception { return prevState; } + + @Override + public ValidRequest fromXContent(XContentParser parser) throws IOException { + return new ValidRequest(); + } }; handler.validate(new ValidRequest()); @@ -44,53 +42,6 @@ public TransformState transform(Object source, TransformState prevState) throws ); } - public void testAsMapAndFromMap() throws IOException { - String someJSON = """ - { - "persistent": { - "indices.recovery.max_bytes_per_sec": "25mb", - "cluster": { - "remote": { - "cluster_one": { - "seeds": [ - "127.0.0.1:9300" - ] - } - } - } - } - }"""; - - ImmutableClusterStateHandler persistentHandler = new ImmutableClusterStateHandler<>() { - @Override - public String name() { - return "persistent"; - } - - @Override - public TransformState transform(Object source, TransformState prevState) throws Exception { - return prevState; - } - }; - - try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, someJSON)) { - Map originalMap = parser.map(); - - Map internalHandlerMap = Maps.asMap(originalMap.get(persistentHandler.name())); - assertThat(internalHandlerMap.keySet(), containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster")); - assertEquals( - "Unsupported input format", - expectThrows(IllegalStateException.class, () -> Maps.asMap(Integer.valueOf(123))).getMessage() - ); - - try (XContentParser newParser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, originalMap)) { - Map newMap = newParser.map(); - - assertThat(newMap.keySet(), containsInAnyOrder("persistent")); - } - } - } - static class ValidRequest extends MasterNodeRequest { @Override public ActionRequestValidationException validate() { diff --git a/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java b/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java new file mode 100644 index 0000000000000..baf282805eda0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java @@ -0,0 +1,510 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ImmutableClusterStateControllerTests extends ESTestCase { + + public void testOperatorController() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + + } + } + """; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + } + } + """; + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testUpdateStateTasks() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + RerouteService rerouteService = mock(RerouteService.class); + + when(clusterService.getRerouteService()).thenReturn(rerouteService); + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor taskExecutor = + new ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); + + AtomicBoolean successCalled = new AtomicBoolean(false); + + ImmutableStateUpdateStateTask task = spy( + new ImmutableStateUpdateStateTask( + "test", + null, + Collections.emptyMap(), + Collections.emptySet(), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + doReturn(state).when(task).execute(any()); + + ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateStateTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + successCalled.set(true); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ClusterState newState = taskExecutor.execute(state, List.of(taskContext)); + assertEquals(state, newState); + assertTrue(successCalled.get()); + verify(task, times(1)).execute(any()); + + taskExecutor.clusterStatePublished(state); + verify(rerouteService, times(1)).reroute(anyString(), any(), any()); + } + + public void testErrorStateTask() throws Exception { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ImmutableStateUpdateErrorTask task = spy( + new ImmutableStateUpdateErrorTask( + new ImmutableClusterStateController.ImmutableUpdateErrorState( + "test", + 1L, + List.of("some parse error", "some io error"), + ImmutableStateErrorMetadata.ErrorKind.PARSING + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext taskContext = + new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateErrorTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor executor = + new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor(); + + ClusterState newState = executor.execute(state, List.of(taskContext)); + + verify(task, times(1)).execute(any()); + + ImmutableStateMetadata operatorMetadata = newState.metadata().immutableStateMetadata().get("test"); + assertNotNull(operatorMetadata); + assertNotNull(operatorMetadata.errorMetadata()); + assertEquals(1L, (long) operatorMetadata.errorMetadata().version()); + assertEquals(ImmutableStateErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); + assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error")); + } + + public void testUpdateTaskDuplicateError() { + ImmutableClusterStateHandler> dummy = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + throw new Exception("anything"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableStateUpdateStateTask task = spy( + new ImmutableStateUpdateStateTask( + "namespace_one", + new ImmutableClusterStateController.Package(Map.of("one", "two"), new PackageVersion(1L, Version.CURRENT)), + Map.of("one", dummy), + List.of(dummy.name()), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + ImmutableStateHandlerMetadata hmOne = new ImmutableStateHandlerMetadata("one", Set.of("a", "b")); + ImmutableStateErrorMetadata emOne = new ImmutableStateErrorMetadata( + 1L, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(1L) + .putHandler(hmOne) + .build(); + + Metadata metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Not updating error state because version [1] is less or equal to the last state error version [1]", + expectThrows(ImmutableClusterStateController.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() + ); + + emOne = new ImmutableStateErrorMetadata( + 0L, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + // If we are writing with older error metadata, we should get proper IllegalStateException + operatorMetadata = ImmutableStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + + metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Error processing state change request for namespace_one", + expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage() + ); + } + + public void testCheckMetadataVersion() { + ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("test").version(123L).build(); + + assertTrue( + ImmutableClusterStateController.checkMetadataVersion(operatorMetadata, new PackageVersion(124L, Version.CURRENT), (e) -> {}) + ); + + AtomicReference x = new AtomicReference<>(); + + assertFalse( + ImmutableClusterStateController.checkMetadataVersion( + operatorMetadata, + new PackageVersion(123L, Version.CURRENT), + (e) -> x.set(e) + ) + ); + + assertTrue(x.get() instanceof ImmutableClusterStateController.IncompatibleVersionException); + assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); + + assertFalse( + ImmutableClusterStateController.checkMetadataVersion( + operatorMetadata, + new PackageVersion(124L, Version.fromId(Version.CURRENT.id + 1)), + (e) -> x.set(e) + ) + ); + + assertEquals(ImmutableClusterStateController.IncompatibleVersionException.class, x.get().getClass()); + assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); + } + + public void testHandlerOrdering() { + ImmutableClusterStateHandler> oh1 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two", "three"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableClusterStateHandler> oh2 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableClusterStateHandler> oh3 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "three"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ClusterService clusterService = mock(ClusterService.class); + final var controller = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2, oh3)); + Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); + assertThat(ordered, contains("two", "three", "one")); + + // assure that we bail on unknown handler + assertEquals( + "Unknown settings definition type: four", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) + .getMessage() + ); + + // assure that we bail on missing dependency link + assertEquals( + "Missing settings dependency definition: one -> three", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() + ); + + // Change the second handler so that we create cycle + oh2 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("one"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + final var controller1 = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2)); + + assertThat( + expectThrows(IllegalStateException.class, () -> controller1.orderedStateHandlers(Set.of("one", "two"))).getMessage(), + anyOf( + is("Cycle found in settings dependencies: one -> two -> one"), + is("Cycle found in settings dependencies: two -> one -> two") + ) + ); + } + + public void testDuplicateHandlerNames() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + assertTrue( + expectThrows( + IllegalStateException.class, + () -> new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings), new TestHandler()) + ) + ).getMessage().startsWith("Duplicate key cluster_settings") + ); + } + + class TestHandler implements ImmutableClusterStateHandler> { + + @Override + public String name() { + return ImmutableClusterSettingsAction.NAME; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return prevState; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/ImmutableLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/ImmutableLifecycleAction.java index 68b6d994ae63f..0cb6b115d158e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/ImmutableLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/ImmutableLifecycleAction.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.elasticsearch.common.util.Maps.asMap; import static org.elasticsearch.common.xcontent.XContentHelper.mapToXContentParser; /** @@ -38,7 +37,7 @@ * Internally it uses {@link TransportPutLifecycleAction} and * {@link TransportDeleteLifecycleAction} to add, update and delete ILM policies. */ -public class ImmutableLifecycleAction implements ImmutableClusterStateHandler { +public class ImmutableLifecycleAction implements ImmutableClusterStateHandler> { private final NamedXContentRegistry xContentRegistry; private final Client client; @@ -60,18 +59,12 @@ public String name() { @SuppressWarnings("unchecked") public Collection prepare(Object input) throws IOException { List result = new ArrayList<>(); + List policies = (List) input; - Map source = asMap(input); - - for (String name : source.keySet()) { - Map content = (Map) source.get(name); - var config = XContentParserConfiguration.EMPTY.withRegistry(LifecyclePolicyConfig.DEFAULT_X_CONTENT_REGISTRY); - try (XContentParser parser = mapToXContentParser(config, content)) { - LifecyclePolicy policy = LifecyclePolicy.parse(parser, name); - PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); - validate(request); - result.add(request); - } + for (var policy : policies) { + PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); + validate(request); + result.add(request); } return result; @@ -108,4 +101,22 @@ public TransformState transform(Object source, TransformState prevState) throws return new TransformState(state, entities); } + + @Override + public List fromXContent(XContentParser parser) throws IOException { + List result = new ArrayList<>(); + + Map source = parser.map(); + var config = XContentParserConfiguration.EMPTY.withRegistry(LifecyclePolicyConfig.DEFAULT_X_CONTENT_REGISTRY); + + for (String name : source.keySet()) { + @SuppressWarnings("unchecked") + Map content = (Map) source.get(name); + try (XContentParser policyParser = mapToXContentParser(config, content)) { + result.add(LifecyclePolicy.parse(policyParser, name)); + } + } + + return result; + } } diff --git a/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.immutablestate.ImmutableClusterStateHandlerProvider b/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.immutablestate.ImmutableClusterStateHandlerProvider new file mode 100644 index 0000000000000..5a9c1fe140016 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.immutablestate.ImmutableClusterStateHandlerProvider @@ -0,0 +1,8 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License +# 2.0; you may not use this file except in compliance with the Elastic License +# 2.0. +# + +org.elasticsearch.xpack.ilm.ILMImmutableStateHandlerProvider diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java index a9c776f100623..3c72e6e846f54 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java @@ -7,12 +7,22 @@ package org.elasticsearch.xpack.ilm.action; +import org.elasticsearch.Version; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; +import org.elasticsearch.immutablestate.service.ImmutableClusterStateController; +import org.elasticsearch.immutablestate.service.ImmutableStateUpdateStateTask; +import org.elasticsearch.immutablestate.service.PackageVersion; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -27,8 +37,10 @@ import org.elasticsearch.xpack.core.ilm.FreezeAction; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleType; import org.elasticsearch.xpack.core.ilm.MigrateAction; +import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.RollupILMAction; @@ -38,13 +50,22 @@ import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ilm.UnfollowAction; import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction; +import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -87,7 +108,7 @@ protected NamedXContentRegistry xContentRegistry() { private TransformState processJSON(ImmutableLifecycleAction action, TransformState prevState, String json) throws Exception { try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { - return action.transform(parser.map(), prevState); + return action.transform(action.fromXContent(parser), prevState); } } @@ -209,4 +230,207 @@ public void testActionAddRemove() throws Exception { ilmMetadata = updatedState.state().metadata().custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle2")); } + + private void setupTaskMock(ClusterService clusterService, ClusterState state) { + doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + + if ((args[3] instanceof ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) == false) { + fail("Should have gotten a state update task to execute, instead got: " + args[3].getClass().getName()); + } + + ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor task = + (ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) args[3]; + + ClusterStateTaskExecutor.TaskContext context = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateStateTask getTask() { + return (ImmutableStateUpdateStateTask) args[1]; + } + + @Override + public void success(Runnable onPublicationSuccess) {} + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) { + fail("Shouldn't fail here"); + } + }; + + task.execute(state, List.of(context)); + + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(), any(), any()); + } + + public void testOperatorControllerFromJSONContent() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + }, + "ilm": { + "my_timeseries_lifecycle": { + "phases": { + "hot": { + "min_age": "10s", + "actions": { + "rollover": { + "max_primary_shard_size": "50gb", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + }, + "my_timeseries_lifecycle1": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + "shrink": { + "number_of_shards": 1 + }, + "forcemerge": { + "max_num_segments": 1 + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + } + } + } + }"""; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ImmutableClusterStateController( + clusterService, + List.of( + new ImmutableClusterSettingsAction(clusterSettings), + new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testOperatorControllerWithPluginPackage() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + AtomicReference x = new AtomicReference<>(); + + ImmutableClusterStateController.Package pack = new ImmutableClusterStateController.Package( + Map.of( + ImmutableClusterSettingsAction.NAME, + Map.of("indices.recovery.max_bytes_per_sec", "50mb"), + ImmutableLifecycleAction.NAME, + List.of( + new LifecyclePolicy( + "my_timeseries_lifecycle", + Map.of( + "warm", + new Phase("warm", new TimeValue(10, TimeUnit.SECONDS), Collections.emptyMap()), + "delete", + new Phase("delete", new TimeValue(30, TimeUnit.SECONDS), Collections.emptyMap()) + ) + ) + ) + ), + new PackageVersion(123L, Version.CURRENT) + ); + + controller.process("operator", pack, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ImmutableClusterStateController( + clusterService, + List.of( + new ImmutableClusterSettingsAction(clusterSettings), + new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + controller.process("operator", pack, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 65d4267f541ec..4e739c1111042 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -778,7 +778,9 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc null, null, usageService, - null + null, + mock(ClusterService.class), + Collections.emptyList() ); actionModule.initRestHandlers(null);