From f1670493860ef8066e60a18f2dc3f332c9dc1d7a Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Wed, 13 Jul 2022 15:04:52 -0400 Subject: [PATCH 1/5] Immutable cluster state controller --- .../ImmutableClusterStateController.java | 355 ++++++++++++ .../ImmutableStateUpdateErrorTask.java | 64 +++ .../ImmutableStateUpdateStateTask.java | 138 +++++ .../service/PackageVersion.java | 46 ++ .../ImmutableClusterStateControllerTests.java | 510 ++++++++++++++++++ .../ImmutableILMStateControllerTests.java | 224 ++++++++ 6 files changed, 1337 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java create mode 100644 server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java create mode 100644 server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java create mode 100644 server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java create mode 100644 server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java new file mode 100644 index 0000000000000..5e763625792f3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateController.java @@ -0,0 +1,355 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.Strings.format; + +/** + * Controller class for applying immutable state to ClusterState. + *

+ * This class contains the logic about validation, ordering and applying of + * the cluster state specified in a file or through plugins/modules. + */ +public class ImmutableClusterStateController { + private static final Logger logger = LogManager.getLogger(ImmutableClusterStateController.class); + + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("metadata"); + + final Map> handlers; + final ClusterService clusterService; + private final ImmutableUpdateStateTaskExecutor updateStateTaskExecutor; + private final ImmutableUpdateErrorTaskExecutor errorStateTaskExecutor; + + @SuppressWarnings("unchecked") + private final ConstructingObjectParser packageParser = new ConstructingObjectParser<>("immutable_cluster_package", a -> { + List> tuples = (List>) a[0]; + Map stateMap = new HashMap<>(); + for (var tuple : tuples) { + stateMap.put(tuple.v1(), tuple.v2()); + } + + return new Package(stateMap, (PackageVersion) a[1]); + }); + + /** + * Controller class for saving immutable ClusterState. + * @param clusterService for fetching and saving the modified state + */ + public ImmutableClusterStateController(ClusterService clusterService, List> handlerList) { + this.clusterService = clusterService; + this.updateStateTaskExecutor = new ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); + this.errorStateTaskExecutor = new ImmutableUpdateErrorTaskExecutor(); + this.handlers = handlerList.stream().collect(Collectors.toMap(ImmutableClusterStateHandler::name, Function.identity())); + packageParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { + if (handlers.containsKey(name) == false) { + throw new IllegalStateException("Missing handler definition for content key [" + name + "]"); + } + p.nextToken(); + return new Tuple<>(name, handlers.get(name).fromXContent(p)); + }, STATE_FIELD); + packageParser.declareObject(ConstructingObjectParser.constructorArg(), PackageVersion::parse, METADATA_FIELD); + } + + /** + * A package class containing the composite immutable cluster state + *

+ * Apart from the cluster state we want to store as immutable, the package requires that + * you supply the version metadata. This version metadata (see {@link PackageVersion}) is checked to ensure + * that the update is safe, and it's not unnecessarily repeated. + */ + public record Package(Map state, PackageVersion metadata) {} + + /** + * Saves an immutable cluster state for a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata + * @param parser the XContentParser to process + * @param errorListener a consumer called with IllegalStateException if the content has errors and the + * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or + * incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, XContentParser parser, Consumer errorListener) { + Package immutableStatePackage; + + try { + immutableStatePackage = packageParser.apply(parser, null); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState(new ImmutableUpdateErrorState(namespace, -1L, errors, ImmutableStateErrorMetadata.ErrorKind.PARSING)); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + process(namespace, immutableStatePackage, errorListener); + } + + /** + * Saves an immutable cluster state for a given 'namespace' from {@link Package} + * + * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata + * @param immutableStateFilePackage a {@link Package} composite state object to process + * @param errorListener a consumer called with IllegalStateException if the content has errors and the + * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or + * incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, Package immutableStateFilePackage, Consumer errorListener) { + Map immutableState = immutableStateFilePackage.state; + PackageVersion packageVersion = immutableStateFilePackage.metadata; + + LinkedHashSet orderedHandlers; + try { + orderedHandlers = orderedStateHandlers(immutableState.keySet()); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState( + new ImmutableUpdateErrorState(namespace, packageVersion.version(), errors, ImmutableStateErrorMetadata.ErrorKind.PARSING) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + ClusterState state = clusterService.state(); + ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); + if (checkMetadataVersion(existingMetadata, packageVersion, errorListener) == false) { + return; + } + + clusterService.submitStateUpdateTask( + "immutable cluster state [" + namespace + "]", + new ImmutableStateUpdateStateTask( + namespace, + immutableStateFilePackage, + handlers, + orderedHandlers, + (errorState) -> recordErrorState(errorState), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply immutable cluster state", e); + errorListener.accept(e); + } + } + ), + ClusterStateTaskConfig.build(Priority.URGENT), + updateStateTaskExecutor + ); + } + + // package private for testing + static boolean checkMetadataVersion( + ImmutableStateMetadata existingMetadata, + PackageVersion packageVersion, + Consumer errorListener + ) { + if (Version.CURRENT.before(packageVersion.minCompatibleVersion())) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Cluster state version [%s] is not compatible with this Elasticsearch node", + packageVersion.minCompatibleVersion() + ) + ) + ); + return false; + } + + if (existingMetadata != null && existingMetadata.version() >= packageVersion.version()) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + packageVersion.version(), + existingMetadata.version() + ) + ) + ); + return false; + } + + return true; + } + + record ImmutableUpdateErrorState( + String namespace, + Long version, + List errors, + ImmutableStateErrorMetadata.ErrorKind errorKind + ) {} + + private void recordErrorState(ImmutableUpdateErrorState state) { + clusterService.submitStateUpdateTask( + "immutable cluster state update error for [ " + state.namespace + "]", + new ImmutableStateUpdateErrorTask(state, new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new immutable error state for namespace [{}]", state.namespace); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply immutable error cluster state", e); + } + }), + ClusterStateTaskConfig.build(Priority.URGENT), + errorStateTaskExecutor + ); + } + + // package private for testing + LinkedHashSet orderedStateHandlers(Set keys) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : keys) { + addStateHandler(key, keys, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + ImmutableClusterStateHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown settings definition type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing settings dependency definition: " + key + " -> " + dependency); + } + addStateHandler(dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } + + /** + * {@link IncompatibleVersionException} is thrown when we try to update the cluster state + * without changing the update version id, or if we try to update cluster state on + * an incompatible Elasticsearch version in mixed cluster mode. + */ + public static class IncompatibleVersionException extends RuntimeException { + public IncompatibleVersionException(String message) { + super(message); + } + } + + /** + * Immutable cluster state update task executor + * + * @param rerouteService instance of {@link RerouteService}, so that we can execute reroute after cluster state is published + */ + public record ImmutableUpdateStateTaskExecutor(RerouteService rerouteService) + implements + ClusterStateTaskExecutor { + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) + throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + rerouteService.reroute( + "reroute after applying immutable cluster state", + Priority.NORMAL, + ActionListener.wrap( + r -> logger.trace("reroute after applying immutable cluster state succeeded"), + e -> logger.debug("reroute after applying immutable cluster state failed", e) + ) + ); + } + } + + /** + * Immutable cluster error state task executor + *

+ * We use this task executor to record any errors while updating immutable cluster state + */ + public record ImmutableUpdateErrorTaskExecutor() implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) + throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success( + () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + logger.info("Wrote new error state in immutable metadata"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java new file mode 100644 index 0000000000000..2e2c276a3ac1e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateErrorTask.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; + +/** + * Cluster state update task that sets the error state of the immutable cluster state metadata. + *

+ * This is used when an immutable cluster state update encounters error(s) while processing + * the {@link org.elasticsearch.immutablestate.service.ImmutableClusterStateController.Package}. + */ +public class ImmutableStateUpdateErrorTask implements ClusterStateTaskListener { + + private final ImmutableClusterStateController.ImmutableUpdateErrorState errorState; + private final ActionListener listener; + + public ImmutableStateUpdateErrorTask( + ImmutableClusterStateController.ImmutableUpdateErrorState errorState, + ActionListener listener + ) { + this.errorState = errorState; + this.listener = listener; + } + + private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateErrorTask.class); + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + ClusterState execute(ClusterState currentState) { + ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + ImmutableStateMetadata immutableMetadata = currentState.metadata().immutableStateMetadata().get(errorState.namespace()); + ImmutableStateMetadata.Builder immMetadataBuilder = ImmutableStateMetadata.builder(errorState.namespace(), immutableMetadata); + immMetadataBuilder.errorMetadata( + new ImmutableStateErrorMetadata(errorState.version(), errorState.errorKind(), errorState.errors()) + ); + metadataBuilder.put(immMetadataBuilder.build()); + ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); + + return newState; + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java new file mode 100644 index 0000000000000..97e005b4f23a2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/ImmutableStateUpdateStateTask.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.TransformState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.elasticsearch.core.Strings.format; + +/** + * Generic immutable cluster state update task + */ +public class ImmutableStateUpdateStateTask implements ClusterStateTaskListener { + private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateStateTask.class); + + private final String namespace; + private final ImmutableClusterStateController.Package immutableStatePackage; + private final Map> handlers; + private final Collection orderedHandlers; + private final Consumer recordErrorState; + private final ActionListener listener; + + public ImmutableStateUpdateStateTask( + String namespace, + ImmutableClusterStateController.Package immutableStatePackage, + Map> handlers, + Collection orderedHandlers, + Consumer recordErrorState, + ActionListener listener + ) { + this.namespace = namespace; + this.immutableStatePackage = immutableStatePackage; + this.handlers = handlers; + this.orderedHandlers = orderedHandlers; + this.recordErrorState = recordErrorState; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + protected ClusterState execute(ClusterState state) { + ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); + Map immutableState = immutableStatePackage.state(); + PackageVersion packageVersion = immutableStatePackage.metadata(); + + var immutableMetadataBuilder = new ImmutableStateMetadata.Builder(namespace).version(packageVersion.version()); + List errors = new ArrayList<>(); + + for (var handlerName : orderedHandlers) { + ImmutableClusterStateHandler handler = handlers.get(handlerName); + try { + Set existingKeys = keysForHandler(existingMetadata, handlerName); + TransformState transformState = handler.transform(immutableState.get(handlerName), new TransformState(state, existingKeys)); + state = transformState.state(); + immutableMetadataBuilder.putHandler(new ImmutableStateHandlerMetadata(handlerName, transformState.keys())); + } catch (Exception e) { + errors.add(format("Error processing %s state change: %s", handler.name(), e.getMessage())); + } + } + + if (errors.isEmpty() == false) { + // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the + // version hasn't been updated. + if (existingMetadata != null + && existingMetadata.errorMetadata() != null + && existingMetadata.errorMetadata().version() >= packageVersion.version()) { + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new ImmutableClusterStateController.IncompatibleVersionException( + format( + "Not updating error state because version [%s] is less or equal to the last state error version [%s]", + packageVersion.version(), + existingMetadata.errorMetadata().version() + ) + ); + } + + recordErrorState.accept( + new ImmutableClusterStateController.ImmutableUpdateErrorState( + namespace, + packageVersion.version(), + errors, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION + ) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new IllegalStateException("Error processing state change request for " + namespace); + } + + // remove the last error if we had previously encountered any + immutableMetadataBuilder.errorMetadata(null); + + ClusterState.Builder stateBuilder = new ClusterState.Builder(state); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(immutableMetadataBuilder.build()); + + return stateBuilder.metadata(metadataBuilder).build(); + } + + private Set keysForHandler(ImmutableStateMetadata immutableStateMetadata, String handlerName) { + if (immutableStateMetadata == null || immutableStateMetadata.handlers().get(handlerName) == null) { + return Collections.emptySet(); + } + + return immutableStateMetadata.handlers().get(handlerName).keys(); + } +} diff --git a/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java b/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java new file mode 100644 index 0000000000000..95a48112515da --- /dev/null +++ b/server/src/main/java/org/elasticsearch/immutablestate/service/PackageVersion.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +/** + * File settings metadata class that holds information about + * versioning and Elasticsearch version compatibility + */ +public record PackageVersion(Long version, Version compatibleWith) { + public static final ParseField VERSION = new ParseField("version"); + public static final ParseField COMPATIBILITY = new ParseField("compatibility"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "immutable_cluster_state_version_metadata", + a -> { + Long updateId = Long.parseLong((String) a[0]); + Version minCompatVersion = Version.fromString((String) a[1]); + + return new PackageVersion(updateId, minCompatVersion); + } + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); + } + + public static PackageVersion parse(XContentParser parser, Void v) { + return PARSER.apply(parser, v); + } + + public Version minCompatibleVersion() { + return compatibleWith; + } +} diff --git a/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java b/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java new file mode 100644 index 0000000000000..baf282805eda0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/immutablestate/service/ImmutableClusterStateControllerTests.java @@ -0,0 +1,510 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.immutablestate.service; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; +import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ImmutableClusterStateControllerTests extends ESTestCase { + + public void testOperatorController() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + + } + } + """; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + } + } + """; + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testUpdateStateTasks() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + RerouteService rerouteService = mock(RerouteService.class); + + when(clusterService.getRerouteService()).thenReturn(rerouteService); + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor taskExecutor = + new ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); + + AtomicBoolean successCalled = new AtomicBoolean(false); + + ImmutableStateUpdateStateTask task = spy( + new ImmutableStateUpdateStateTask( + "test", + null, + Collections.emptyMap(), + Collections.emptySet(), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + doReturn(state).when(task).execute(any()); + + ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateStateTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + successCalled.set(true); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ClusterState newState = taskExecutor.execute(state, List.of(taskContext)); + assertEquals(state, newState); + assertTrue(successCalled.get()); + verify(task, times(1)).execute(any()); + + taskExecutor.clusterStatePublished(state); + verify(rerouteService, times(1)).reroute(anyString(), any(), any()); + } + + public void testErrorStateTask() throws Exception { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + ImmutableStateUpdateErrorTask task = spy( + new ImmutableStateUpdateErrorTask( + new ImmutableClusterStateController.ImmutableUpdateErrorState( + "test", + 1L, + List.of("some parse error", "some io error"), + ImmutableStateErrorMetadata.ErrorKind.PARSING + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext taskContext = + new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateErrorTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor executor = + new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor(); + + ClusterState newState = executor.execute(state, List.of(taskContext)); + + verify(task, times(1)).execute(any()); + + ImmutableStateMetadata operatorMetadata = newState.metadata().immutableStateMetadata().get("test"); + assertNotNull(operatorMetadata); + assertNotNull(operatorMetadata.errorMetadata()); + assertEquals(1L, (long) operatorMetadata.errorMetadata().version()); + assertEquals(ImmutableStateErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); + assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error")); + } + + public void testUpdateTaskDuplicateError() { + ImmutableClusterStateHandler> dummy = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + throw new Exception("anything"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableStateUpdateStateTask task = spy( + new ImmutableStateUpdateStateTask( + "namespace_one", + new ImmutableClusterStateController.Package(Map.of("one", "two"), new PackageVersion(1L, Version.CURRENT)), + Map.of("one", dummy), + List.of(dummy.name()), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + ImmutableStateHandlerMetadata hmOne = new ImmutableStateHandlerMetadata("one", Set.of("a", "b")); + ImmutableStateErrorMetadata emOne = new ImmutableStateErrorMetadata( + 1L, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(1L) + .putHandler(hmOne) + .build(); + + Metadata metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Not updating error state because version [1] is less or equal to the last state error version [1]", + expectThrows(ImmutableClusterStateController.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() + ); + + emOne = new ImmutableStateErrorMetadata( + 0L, + ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + List.of("Test error 1", "Test error 2") + ); + + // If we are writing with older error metadata, we should get proper IllegalStateException + operatorMetadata = ImmutableStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + + metadata = Metadata.builder().put(operatorMetadata).build(); + ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Error processing state change request for namespace_one", + expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage() + ); + } + + public void testCheckMetadataVersion() { + ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("test").version(123L).build(); + + assertTrue( + ImmutableClusterStateController.checkMetadataVersion(operatorMetadata, new PackageVersion(124L, Version.CURRENT), (e) -> {}) + ); + + AtomicReference x = new AtomicReference<>(); + + assertFalse( + ImmutableClusterStateController.checkMetadataVersion( + operatorMetadata, + new PackageVersion(123L, Version.CURRENT), + (e) -> x.set(e) + ) + ); + + assertTrue(x.get() instanceof ImmutableClusterStateController.IncompatibleVersionException); + assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); + + assertFalse( + ImmutableClusterStateController.checkMetadataVersion( + operatorMetadata, + new PackageVersion(124L, Version.fromId(Version.CURRENT.id + 1)), + (e) -> x.set(e) + ) + ); + + assertEquals(ImmutableClusterStateController.IncompatibleVersionException.class, x.get().getClass()); + assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); + } + + public void testHandlerOrdering() { + ImmutableClusterStateHandler> oh1 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two", "three"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableClusterStateHandler> oh2 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ImmutableClusterStateHandler> oh3 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "three"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ClusterService clusterService = mock(ClusterService.class); + final var controller = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2, oh3)); + Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); + assertThat(ordered, contains("two", "three", "one")); + + // assure that we bail on unknown handler + assertEquals( + "Unknown settings definition type: four", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) + .getMessage() + ); + + // assure that we bail on missing dependency link + assertEquals( + "Missing settings dependency definition: one -> three", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() + ); + + // Change the second handler so that we create cycle + oh2 = new ImmutableClusterStateHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("one"); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + final var controller1 = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2)); + + assertThat( + expectThrows(IllegalStateException.class, () -> controller1.orderedStateHandlers(Set.of("one", "two"))).getMessage(), + anyOf( + is("Cycle found in settings dependencies: one -> two -> one"), + is("Cycle found in settings dependencies: two -> one -> two") + ) + ); + } + + public void testDuplicateHandlerNames() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + assertTrue( + expectThrows( + IllegalStateException.class, + () -> new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings), new TestHandler()) + ) + ).getMessage().startsWith("Duplicate key cluster_settings") + ); + } + + class TestHandler implements ImmutableClusterStateHandler> { + + @Override + public String name() { + return ImmutableClusterSettingsAction.NAME; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return prevState; + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java index 50620d1730bbf..3c72e6e846f54 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ImmutableILMStateControllerTests.java @@ -7,12 +7,22 @@ package org.elasticsearch.xpack.ilm.action; +import org.elasticsearch.Version; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; +import org.elasticsearch.immutablestate.service.ImmutableClusterStateController; +import org.elasticsearch.immutablestate.service.ImmutableStateUpdateStateTask; +import org.elasticsearch.immutablestate.service.PackageVersion; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -27,8 +37,10 @@ import org.elasticsearch.xpack.core.ilm.FreezeAction; import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleType; import org.elasticsearch.xpack.core.ilm.MigrateAction; +import org.elasticsearch.xpack.core.ilm.Phase; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.RollupILMAction; @@ -38,13 +50,22 @@ import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ilm.UnfollowAction; import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction; +import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -209,4 +230,207 @@ public void testActionAddRemove() throws Exception { ilmMetadata = updatedState.state().metadata().custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle2")); } + + private void setupTaskMock(ClusterService clusterService, ClusterState state) { + doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + + if ((args[3] instanceof ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) == false) { + fail("Should have gotten a state update task to execute, instead got: " + args[3].getClass().getName()); + } + + ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor task = + (ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) args[3]; + + ClusterStateTaskExecutor.TaskContext context = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public ImmutableStateUpdateStateTask getTask() { + return (ImmutableStateUpdateStateTask) args[1]; + } + + @Override + public void success(Runnable onPublicationSuccess) {} + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) { + fail("Shouldn't fail here"); + } + }; + + task.execute(state, List.of(context)); + + return null; + }).when(clusterService).submitStateUpdateTask(anyString(), any(), any(), any()); + } + + public void testOperatorControllerFromJSONContent() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + }, + "ilm": { + "my_timeseries_lifecycle": { + "phases": { + "hot": { + "min_age": "10s", + "actions": { + "rollover": { + "max_primary_shard_size": "50gb", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + }, + "my_timeseries_lifecycle1": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + "shrink": { + "number_of_shards": 1 + }, + "forcemerge": { + "max_num_segments": 1 + } + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + } + } + } + }"""; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ImmutableClusterStateController( + clusterService, + List.of( + new ImmutableClusterSettingsAction(clusterSettings), + new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testOperatorControllerWithPluginPackage() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + ImmutableClusterStateController controller = new ImmutableClusterStateController( + clusterService, + List.of(new ImmutableClusterSettingsAction(clusterSettings)) + ); + + AtomicReference x = new AtomicReference<>(); + + ImmutableClusterStateController.Package pack = new ImmutableClusterStateController.Package( + Map.of( + ImmutableClusterSettingsAction.NAME, + Map.of("indices.recovery.max_bytes_per_sec", "50mb"), + ImmutableLifecycleAction.NAME, + List.of( + new LifecyclePolicy( + "my_timeseries_lifecycle", + Map.of( + "warm", + new Phase("warm", new TimeValue(10, TimeUnit.SECONDS), Collections.emptyMap()), + "delete", + new Phase("delete", new TimeValue(30, TimeUnit.SECONDS), Collections.emptyMap()) + ) + ) + ) + ), + new PackageVersion(123L, Version.CURRENT) + ); + + controller.process("operator", pack, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller = new ImmutableClusterStateController( + clusterService, + List.of( + new ImmutableClusterSettingsAction(clusterSettings), + new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + setupTaskMock(clusterService, state); + + controller.process("operator", pack, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } From b6adf1d0ef1dae6d8f589e7228bb0601f3b216aa Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Wed, 13 Jul 2022 17:33:24 -0400 Subject: [PATCH 2/5] Refactor with smaller files --- .../org/elasticsearch/ExceptionsHelper.java | 21 ++ .../reservedstate/service/ErrorState.java | 33 ++ .../service/HandlerDependencyManager.java | 79 ++++ .../ImmutableClusterStateController.java | 355 ------------------ .../ImmutableStateUpdateErrorTask.java | 64 ---- .../ReservedClusterStateController.java | 232 ++++++++++++ .../service/ReservedStateChunk.java | 20 + .../service/ReservedStateErrorTask.java | 55 +++ .../ReservedStateErrorTaskExecutor.java | 42 +++ ...Task.java => ReservedStateUpdateTask.java} | 82 ++-- .../ReservedStateUpdateTaskExecutor.java | 51 +++ ...Version.java => ReservedStateVersion.java} | 24 +- ... ReservedClusterStateControllerTests.java} | 141 +++---- ...ReservedLifecycleStateControllerTests.java | 51 ++- 14 files changed, 690 insertions(+), 560 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java delete mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateController.java delete mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateErrorTask.java create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java rename server/src/main/java/org/elasticsearch/reservedstate/service/{ImmutableStateUpdateStateTask.java => ReservedStateUpdateTask.java} (55%) create mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java rename server/src/main/java/org/elasticsearch/reservedstate/service/{PackageVersion.java => ReservedStateVersion.java} (59%) rename server/src/test/java/org/elasticsearch/reservedstate/service/{ImmutableClusterStateControllerTests.java => ReservedClusterStateControllerTests.java} (72%) diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 7e22e1797b527..1f9dcf2008753 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -262,6 +263,26 @@ public static void maybeDieOnAnotherThread(final Throwable throwable) { }); } + /** + * Unwraps a throwable and calls an error listener with every unwrapped throwable + * @param t the starting throwable + * @param errorListener a listener function that will be called with every unwrapped throwable we find + * @param limit after how many encountered throwables should we stop unwrapping. Prevents stack overflows. 10 is reasonable max. + */ + public static void unwrap(Throwable t, Consumer errorListener, int limit) { + int counter = 0; + Throwable cause; + Throwable prev = t; + errorListener.accept(prev); + while ((cause = prev.getCause()) != null && (prev != cause)) { + prev = cause; + errorListener.accept(prev); + if (counter++ > limit) { + return; + } + } + } + /** * Deduplicate the failures by exception message and index. */ diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java new file mode 100644 index 0000000000000..3a09862922172 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; + +import java.util.List; + +record ErrorState(String namespace, Long version, List errors, ReservedStateErrorMetadata.ErrorKind errorKind) { + ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) { + this(namespace, version, List.of(unwrapException(e)), errorKind); + } + + public static String unwrapException(Exception e) { + StringBuilder stringBuilder = new StringBuilder(); + ExceptionsHelper.unwrap(e, (t) -> stringBuilder.append(t.getMessage()).append(", "), 10); + if (stringBuilder.length() > 2) { + stringBuilder.setLength(stringBuilder.length() - 2); + } + return stringBuilder.toString(); + } + + public String toString() { + return String.join(", ", errors()); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java b/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java new file mode 100644 index 0000000000000..8660aa0a1262c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; + +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +/** + * Package private class used by the {@link ReservedClusterStateController} to order the reserved + * cluster state handlers, by their declared dependencies. See also {@link ReservedClusterStateHandler}. + */ +class HandlerDependencyManager { + /** + * Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to + * execute for a given list of handler names supplied through the {@link ReservedStateChunk}. + * @param handlers All possible handlers that are registered with the {@link ReservedClusterStateController} + * @param handlerNames Names of handlers found in the {@link ReservedStateChunk} + * @return + */ + static LinkedHashSet orderedStateHandlers(Map> handlers, Set handlerNames) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : handlerNames) { + addStateHandler(handlers, key, handlerNames, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private static void addStateHandler( + Map> handlers, + String key, + Set keys, + LinkedHashSet ordered, + LinkedHashSet visited + ) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + ReservedClusterStateHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown handler type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency); + } + addStateHandler(handlers, dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateController.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateController.java deleted file mode 100644 index 5e763625792f3..0000000000000 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateController.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.immutablestate.service; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; -import org.elasticsearch.cluster.routing.RerouteService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; -import org.elasticsearch.xcontent.ConstructingObjectParser; -import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.XContentParser; - -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.elasticsearch.core.Strings.format; - -/** - * Controller class for applying immutable state to ClusterState. - *

- * This class contains the logic about validation, ordering and applying of - * the cluster state specified in a file or through plugins/modules. - */ -public class ImmutableClusterStateController { - private static final Logger logger = LogManager.getLogger(ImmutableClusterStateController.class); - - public static final ParseField STATE_FIELD = new ParseField("state"); - public static final ParseField METADATA_FIELD = new ParseField("metadata"); - - final Map> handlers; - final ClusterService clusterService; - private final ImmutableUpdateStateTaskExecutor updateStateTaskExecutor; - private final ImmutableUpdateErrorTaskExecutor errorStateTaskExecutor; - - @SuppressWarnings("unchecked") - private final ConstructingObjectParser packageParser = new ConstructingObjectParser<>("immutable_cluster_package", a -> { - List> tuples = (List>) a[0]; - Map stateMap = new HashMap<>(); - for (var tuple : tuples) { - stateMap.put(tuple.v1(), tuple.v2()); - } - - return new Package(stateMap, (PackageVersion) a[1]); - }); - - /** - * Controller class for saving immutable ClusterState. - * @param clusterService for fetching and saving the modified state - */ - public ImmutableClusterStateController(ClusterService clusterService, List> handlerList) { - this.clusterService = clusterService; - this.updateStateTaskExecutor = new ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); - this.errorStateTaskExecutor = new ImmutableUpdateErrorTaskExecutor(); - this.handlers = handlerList.stream().collect(Collectors.toMap(ImmutableClusterStateHandler::name, Function.identity())); - packageParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { - if (handlers.containsKey(name) == false) { - throw new IllegalStateException("Missing handler definition for content key [" + name + "]"); - } - p.nextToken(); - return new Tuple<>(name, handlers.get(name).fromXContent(p)); - }, STATE_FIELD); - packageParser.declareObject(ConstructingObjectParser.constructorArg(), PackageVersion::parse, METADATA_FIELD); - } - - /** - * A package class containing the composite immutable cluster state - *

- * Apart from the cluster state we want to store as immutable, the package requires that - * you supply the version metadata. This version metadata (see {@link PackageVersion}) is checked to ensure - * that the update is safe, and it's not unnecessarily repeated. - */ - public record Package(Map state, PackageVersion metadata) {} - - /** - * Saves an immutable cluster state for a given 'namespace' from {@link XContentParser} - * - * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata - * @param parser the XContentParser to process - * @param errorListener a consumer called with IllegalStateException if the content has errors and the - * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or - * incompatible with this node {@link Version}, null if successful. - */ - public void process(String namespace, XContentParser parser, Consumer errorListener) { - Package immutableStatePackage; - - try { - immutableStatePackage = packageParser.apply(parser, null); - } catch (Exception e) { - List errors = List.of(e.getMessage()); - recordErrorState(new ImmutableUpdateErrorState(namespace, -1L, errors, ImmutableStateErrorMetadata.ErrorKind.PARSING)); - logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - - errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); - return; - } - - process(namespace, immutableStatePackage, errorListener); - } - - /** - * Saves an immutable cluster state for a given 'namespace' from {@link Package} - * - * @param namespace the namespace under which we'll store the immutable keys in the cluster state metadata - * @param immutableStateFilePackage a {@link Package} composite state object to process - * @param errorListener a consumer called with IllegalStateException if the content has errors and the - * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or - * incompatible with this node {@link Version}, null if successful. - */ - public void process(String namespace, Package immutableStateFilePackage, Consumer errorListener) { - Map immutableState = immutableStateFilePackage.state; - PackageVersion packageVersion = immutableStateFilePackage.metadata; - - LinkedHashSet orderedHandlers; - try { - orderedHandlers = orderedStateHandlers(immutableState.keySet()); - } catch (Exception e) { - List errors = List.of(e.getMessage()); - recordErrorState( - new ImmutableUpdateErrorState(namespace, packageVersion.version(), errors, ImmutableStateErrorMetadata.ErrorKind.PARSING) - ); - logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - - errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); - return; - } - - ClusterState state = clusterService.state(); - ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); - if (checkMetadataVersion(existingMetadata, packageVersion, errorListener) == false) { - return; - } - - clusterService.submitStateUpdateTask( - "immutable cluster state [" + namespace + "]", - new ImmutableStateUpdateStateTask( - namespace, - immutableStateFilePackage, - handlers, - orderedHandlers, - (errorState) -> recordErrorState(errorState), - new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty empty) { - logger.info("Successfully applied new cluster state for namespace [{}]", namespace); - errorListener.accept(null); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to apply immutable cluster state", e); - errorListener.accept(e); - } - } - ), - ClusterStateTaskConfig.build(Priority.URGENT), - updateStateTaskExecutor - ); - } - - // package private for testing - static boolean checkMetadataVersion( - ImmutableStateMetadata existingMetadata, - PackageVersion packageVersion, - Consumer errorListener - ) { - if (Version.CURRENT.before(packageVersion.minCompatibleVersion())) { - errorListener.accept( - new IncompatibleVersionException( - format( - "Cluster state version [%s] is not compatible with this Elasticsearch node", - packageVersion.minCompatibleVersion() - ) - ) - ); - return false; - } - - if (existingMetadata != null && existingMetadata.version() >= packageVersion.version()) { - errorListener.accept( - new IncompatibleVersionException( - format( - "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", - packageVersion.version(), - existingMetadata.version() - ) - ) - ); - return false; - } - - return true; - } - - record ImmutableUpdateErrorState( - String namespace, - Long version, - List errors, - ImmutableStateErrorMetadata.ErrorKind errorKind - ) {} - - private void recordErrorState(ImmutableUpdateErrorState state) { - clusterService.submitStateUpdateTask( - "immutable cluster state update error for [ " + state.namespace + "]", - new ImmutableStateUpdateErrorTask(state, new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty empty) { - logger.info("Successfully applied new immutable error state for namespace [{}]", state.namespace); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to apply immutable error cluster state", e); - } - }), - ClusterStateTaskConfig.build(Priority.URGENT), - errorStateTaskExecutor - ); - } - - // package private for testing - LinkedHashSet orderedStateHandlers(Set keys) { - LinkedHashSet orderedHandlers = new LinkedHashSet<>(); - LinkedHashSet dependencyStack = new LinkedHashSet<>(); - - for (String key : keys) { - addStateHandler(key, keys, orderedHandlers, dependencyStack); - } - - return orderedHandlers; - } - - private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { - if (visited.contains(key)) { - StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); - visited.forEach(s -> { - msg.append(s); - msg.append(" -> "); - }); - msg.append(key); - throw new IllegalStateException(msg.toString()); - } - - if (ordered.contains(key)) { - // already added by another dependent handler - return; - } - - visited.add(key); - ImmutableClusterStateHandler handler = handlers.get(key); - - if (handler == null) { - throw new IllegalStateException("Unknown settings definition type: " + key); - } - - for (String dependency : handler.dependencies()) { - if (keys.contains(dependency) == false) { - throw new IllegalStateException("Missing settings dependency definition: " + key + " -> " + dependency); - } - addStateHandler(dependency, keys, ordered, visited); - } - - visited.remove(key); - ordered.add(key); - } - - /** - * {@link IncompatibleVersionException} is thrown when we try to update the cluster state - * without changing the update version id, or if we try to update cluster state on - * an incompatible Elasticsearch version in mixed cluster mode. - */ - public static class IncompatibleVersionException extends RuntimeException { - public IncompatibleVersionException(String message) { - super(message); - } - } - - /** - * Immutable cluster state update task executor - * - * @param rerouteService instance of {@link RerouteService}, so that we can execute reroute after cluster state is published - */ - public record ImmutableUpdateStateTaskExecutor(RerouteService rerouteService) - implements - ClusterStateTaskExecutor { - - @Override - public ClusterState execute(ClusterState currentState, List> taskContexts) - throws Exception { - for (final var taskContext : taskContexts) { - currentState = taskContext.getTask().execute(currentState); - taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); - } - return currentState; - } - - @Override - public void clusterStatePublished(ClusterState newClusterState) { - rerouteService.reroute( - "reroute after applying immutable cluster state", - Priority.NORMAL, - ActionListener.wrap( - r -> logger.trace("reroute after applying immutable cluster state succeeded"), - e -> logger.debug("reroute after applying immutable cluster state failed", e) - ) - ); - } - } - - /** - * Immutable cluster error state task executor - *

- * We use this task executor to record any errors while updating immutable cluster state - */ - public record ImmutableUpdateErrorTaskExecutor() implements ClusterStateTaskExecutor { - @Override - public ClusterState execute(ClusterState currentState, List> taskContexts) - throws Exception { - for (final var taskContext : taskContexts) { - currentState = taskContext.getTask().execute(currentState); - taskContext.success( - () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) - ); - } - return currentState; - } - - @Override - public void clusterStatePublished(ClusterState newClusterState) { - logger.info("Wrote new error state in immutable metadata"); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateErrorTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateErrorTask.java deleted file mode 100644 index 2e2c276a3ac1e..0000000000000 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateErrorTask.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.immutablestate.service; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; -import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; -import org.elasticsearch.cluster.metadata.Metadata; - -/** - * Cluster state update task that sets the error state of the immutable cluster state metadata. - *

- * This is used when an immutable cluster state update encounters error(s) while processing - * the {@link org.elasticsearch.immutablestate.service.ImmutableClusterStateController.Package}. - */ -public class ImmutableStateUpdateErrorTask implements ClusterStateTaskListener { - - private final ImmutableClusterStateController.ImmutableUpdateErrorState errorState; - private final ActionListener listener; - - public ImmutableStateUpdateErrorTask( - ImmutableClusterStateController.ImmutableUpdateErrorState errorState, - ActionListener listener - ) { - this.errorState = errorState; - this.listener = listener; - } - - private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateErrorTask.class); - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - - ActionListener listener() { - return listener; - } - - ClusterState execute(ClusterState currentState) { - ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); - Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); - ImmutableStateMetadata immutableMetadata = currentState.metadata().immutableStateMetadata().get(errorState.namespace()); - ImmutableStateMetadata.Builder immMetadataBuilder = ImmutableStateMetadata.builder(errorState.namespace(), immutableMetadata); - immMetadataBuilder.errorMetadata( - new ImmutableStateErrorMetadata(errorState.version(), errorState.errorKind(), errorState.errors()) - ); - metadataBuilder.put(immMetadataBuilder.build()); - ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); - - return newState; - } -} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java new file mode 100644 index 0000000000000..8149da0afebf7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java @@ -0,0 +1,232 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.Strings.format; + +/** + * Controller class for storing and reserving a portion of the {@link ClusterState} + *

+ * This class contains the logic about validation, ordering and applying of + * the cluster state specified in a file or through plugins/modules. Reserved cluster state + * cannot be modified throught the REST APIs, only through this controller class. + */ +public class ReservedClusterStateController { + private static final Logger logger = LogManager.getLogger(ReservedClusterStateController.class); + + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("metadata"); + + final Map> handlers; + final ClusterService clusterService; + private final ReservedStateUpdateTaskExecutor updateStateTaskExecutor; + private final ReservedStateErrorTaskExecutor errorStateTaskExecutor; + + @SuppressWarnings("unchecked") + private final ConstructingObjectParser stateChunkParser = new ConstructingObjectParser<>( + "reserved_state_chunk", + a -> { + List> tuples = (List>) a[0]; + Map stateMap = new HashMap<>(); + for (var tuple : tuples) { + stateMap.put(tuple.v1(), tuple.v2()); + } + + return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]); + } + ); + + /** + * Controller class for saving and reserving {@link ClusterState}. + * @param clusterService for fetching and saving the modified state + * @param handlerList a list of reserved state handlers, which we use to transform the state + */ + public ReservedClusterStateController(ClusterService clusterService, List> handlerList) { + this.clusterService = clusterService; + this.updateStateTaskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService()); + this.errorStateTaskExecutor = new ReservedStateErrorTaskExecutor(); + this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity())); + stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> { + if (handlers.containsKey(name) == false) { + throw new IllegalStateException("Missing handler definition for content key [" + name + "]"); + } + p.nextToken(); + return new Tuple<>(name, handlers.get(name).fromXContent(p)); + }, STATE_FIELD); + stateChunkParser.declareObject(ConstructingObjectParser.constructorArg(), ReservedStateVersion::parse, METADATA_FIELD); + } + + /** + * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata + * @param parser the XContentParser to process + * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the + * cluster state cannot be correctly applied, {@link ReservedStateVersion.IncompatibleVersionException} if the content is stale + * or incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, XContentParser parser, Consumer errorListener) { + ReservedStateChunk stateChunk; + + try { + stateChunk = stateChunkParser.apply(parser, null); + } catch (Exception e) { + ErrorState errorState = new ErrorState(namespace, -1L, e, ReservedStateErrorMetadata.ErrorKind.PARSING); + saveErrorState(errorState); + logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + process(namespace, stateChunk, errorListener); + } + + /** + * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} + * + * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata + * @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process + * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the + * cluster state cannot be correctly applied, {@link ReservedStateVersion.IncompatibleVersionException} if the content is stale + * or incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, ReservedStateChunk reservedStateChunk, Consumer errorListener) { + Map reservedState = reservedStateChunk.state(); + ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata(); + + LinkedHashSet orderedHandlers; + try { + orderedHandlers = HandlerDependencyManager.orderedStateHandlers(handlers, reservedState.keySet()); + } catch (Exception e) { + ErrorState errorState = new ErrorState( + namespace, + reservedStateVersion.version(), + e, + ReservedStateErrorMetadata.ErrorKind.PARSING + ); + + saveErrorState(errorState); + logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + ClusterState state = clusterService.state(); + ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); + if (checkMetadataVersion(existingMetadata, reservedStateVersion, errorListener) == false) { + return; + } + + clusterService.submitStateUpdateTask( + "reserved cluster state [" + namespace + "]", + new ReservedStateUpdateTask( + namespace, + reservedStateChunk, + handlers, + orderedHandlers, + (errorState) -> saveErrorState(errorState), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply reserved cluster state", e); + errorListener.accept(e); + } + } + ), + ClusterStateTaskConfig.build(Priority.URGENT), + updateStateTaskExecutor + ); + } + + // package private for testing + static boolean checkMetadataVersion( + ReservedStateMetadata existingMetadata, + ReservedStateVersion reservedStateVersion, + Consumer errorListener + ) { + if (Version.CURRENT.before(reservedStateVersion.minCompatibleVersion())) { + errorListener.accept( + new ReservedStateVersion.IncompatibleVersionException( + format( + "Cluster state version [%s] is not compatible with this Elasticsearch node", + reservedStateVersion.minCompatibleVersion() + ) + ) + ); + return false; + } + + if (existingMetadata != null && existingMetadata.version() >= reservedStateVersion.version()) { + errorListener.accept( + new ReservedStateVersion.IncompatibleVersionException( + format( + "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + reservedStateVersion.version(), + existingMetadata.version() + ) + ) + ); + return false; + } + + return true; + } + + private void saveErrorState(ErrorState state) { + clusterService.submitStateUpdateTask( + "reserved cluster state update error for [ " + state.namespace() + "]", + new ReservedStateErrorTask(state, new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new reserved error state for namespace [{}]", state.namespace()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply reserved error cluster state", e); + } + }), + ClusterStateTaskConfig.build(Priority.URGENT), + errorStateTaskExecutor + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java new file mode 100644 index 0000000000000..ee7e0892e2aba --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import java.util.Map; + +/** + * Tuple class containing the cluster state to be saved and reserved and the version info + *

+ * Apart from the cluster state we want to store and reserve, the chunk requires that + * you supply the version metadata. This version metadata (see {@link ReservedStateVersion}) is checked to ensure + * that the update is safe, and it's not unnecessarily repeated. + */ +public record ReservedStateChunk(Map state, ReservedStateVersion metadata) {} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java new file mode 100644 index 0000000000000..b6380255f87ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTask.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; + +/** + * Cluster state update task that sets the error state of the reserved cluster state metadata. + *

+ * This is used when a reserved cluster state update encounters error(s) while processing + * the {@link ReservedStateChunk}. + */ +public class ReservedStateErrorTask implements ClusterStateTaskListener { + + private final ErrorState errorState; + private final ActionListener listener; + + public ReservedStateErrorTask(ErrorState errorState, ActionListener listener) { + this.errorState = errorState; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + ClusterState execute(ClusterState currentState) { + ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + ReservedStateMetadata reservedMetadata = currentState.metadata().reservedStateMetadata().get(errorState.namespace()); + ReservedStateMetadata.Builder resMetadataBuilder = ReservedStateMetadata.builder(errorState.namespace(), reservedMetadata); + resMetadataBuilder.errorMetadata(new ReservedStateErrorMetadata(errorState.version(), errorState.errorKind(), errorState.errors())); + metadataBuilder.put(resMetadataBuilder.build()); + ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); + + return newState; + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java new file mode 100644 index 0000000000000..bdb11c849934f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; + +import java.util.List; + +/** + * Reserved cluster error state task executor + *

+ * We use this task executor to record any errors while updating and reserving the cluster state + */ +record ReservedStateErrorTaskExecutor() implements ClusterStateTaskExecutor { + private static final Logger logger = LogManager.getLogger(ReservedStateErrorTaskExecutor.class); + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success( + () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + logger.info("Wrote new error state in immutable metadata"); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateStateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java similarity index 55% rename from server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateStateTask.java rename to server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 97e005b4f23a2..1cf0be3342256 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ImmutableStateUpdateStateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.immutablestate.service; +package org.elasticsearch.reservedstate.service; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,12 +14,12 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; -import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; -import org.elasticsearch.immutablestate.TransformState; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.TransformState; import java.util.ArrayList; import java.util.Collection; @@ -30,33 +30,38 @@ import java.util.function.Consumer; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.reservedstate.service.ErrorState.unwrapException; /** - * Generic immutable cluster state update task + * Generic task to update and reserve parts of the cluster state + * + *

+ * Reserved cluster state can only be modified by using the {@link ReservedClusterStateController}. Updating + * the reserved cluster state through REST APIs is not permitted. */ -public class ImmutableStateUpdateStateTask implements ClusterStateTaskListener { - private static final Logger logger = LogManager.getLogger(ImmutableStateUpdateStateTask.class); +public class ReservedStateUpdateTask implements ClusterStateTaskListener { + private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTask.class); private final String namespace; - private final ImmutableClusterStateController.Package immutableStatePackage; - private final Map> handlers; + private final ReservedStateChunk stateChunk; + private final Map> handlers; private final Collection orderedHandlers; - private final Consumer recordErrorState; + private final Consumer errorReporter; private final ActionListener listener; - public ImmutableStateUpdateStateTask( + public ReservedStateUpdateTask( String namespace, - ImmutableClusterStateController.Package immutableStatePackage, - Map> handlers, + ReservedStateChunk stateChunk, + Map> handlers, Collection orderedHandlers, - Consumer recordErrorState, + Consumer errorReporter, ActionListener listener ) { this.namespace = namespace; - this.immutableStatePackage = immutableStatePackage; + this.stateChunk = stateChunk; this.handlers = handlers; this.orderedHandlers = orderedHandlers; - this.recordErrorState = recordErrorState; + this.errorReporter = errorReporter; this.listener = listener; } @@ -70,22 +75,22 @@ ActionListener listener() { } protected ClusterState execute(ClusterState state) { - ImmutableStateMetadata existingMetadata = state.metadata().immutableStateMetadata().get(namespace); - Map immutableState = immutableStatePackage.state(); - PackageVersion packageVersion = immutableStatePackage.metadata(); + ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); + Map reservedState = stateChunk.state(); + ReservedStateVersion reservedStateVersion = stateChunk.metadata(); - var immutableMetadataBuilder = new ImmutableStateMetadata.Builder(namespace).version(packageVersion.version()); + var reservedMetadataBuilder = new ReservedStateMetadata.Builder(namespace).version(reservedStateVersion.version()); List errors = new ArrayList<>(); for (var handlerName : orderedHandlers) { - ImmutableClusterStateHandler handler = handlers.get(handlerName); + ReservedClusterStateHandler handler = handlers.get(handlerName); try { Set existingKeys = keysForHandler(existingMetadata, handlerName); - TransformState transformState = handler.transform(immutableState.get(handlerName), new TransformState(state, existingKeys)); + TransformState transformState = handler.transform(reservedState.get(handlerName), new TransformState(state, existingKeys)); state = transformState.state(); - immutableMetadataBuilder.putHandler(new ImmutableStateHandlerMetadata(handlerName, transformState.keys())); + reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(handlerName, transformState.keys())); } catch (Exception e) { - errors.add(format("Error processing %s state change: %s", handler.name(), e.getMessage())); + errors.add(format("Error processing %s state change: %s", handler.name(), unwrapException(e))); } } @@ -94,25 +99,20 @@ protected ClusterState execute(ClusterState state) { // version hasn't been updated. if (existingMetadata != null && existingMetadata.errorMetadata() != null - && existingMetadata.errorMetadata().version() >= packageVersion.version()) { + && existingMetadata.errorMetadata().version() >= reservedStateVersion.version()) { logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - throw new ImmutableClusterStateController.IncompatibleVersionException( + throw new ReservedStateVersion.IncompatibleVersionException( format( "Not updating error state because version [%s] is less or equal to the last state error version [%s]", - packageVersion.version(), + reservedStateVersion.version(), existingMetadata.errorMetadata().version() ) ); } - recordErrorState.accept( - new ImmutableClusterStateController.ImmutableUpdateErrorState( - namespace, - packageVersion.version(), - errors, - ImmutableStateErrorMetadata.ErrorKind.VALIDATION - ) + errorReporter.accept( + new ErrorState(namespace, reservedStateVersion.version(), errors, ReservedStateErrorMetadata.ErrorKind.VALIDATION) ); logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); @@ -120,19 +120,19 @@ protected ClusterState execute(ClusterState state) { } // remove the last error if we had previously encountered any - immutableMetadataBuilder.errorMetadata(null); + reservedMetadataBuilder.errorMetadata(null); ClusterState.Builder stateBuilder = new ClusterState.Builder(state); - Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(immutableMetadataBuilder.build()); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(reservedMetadataBuilder.build()); return stateBuilder.metadata(metadataBuilder).build(); } - private Set keysForHandler(ImmutableStateMetadata immutableStateMetadata, String handlerName) { - if (immutableStateMetadata == null || immutableStateMetadata.handlers().get(handlerName) == null) { + private Set keysForHandler(ReservedStateMetadata reservedStateMetadata, String handlerName) { + if (reservedStateMetadata == null || reservedStateMetadata.handlers().get(handlerName) == null) { return Collections.emptySet(); } - return immutableStateMetadata.handlers().get(handlerName).keys(); + return reservedStateMetadata.handlers().get(handlerName).keys(); } } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java new file mode 100644 index 0000000000000..9a09f1b253bce --- /dev/null +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.reservedstate.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.common.Priority; + +import java.util.List; + +/** + * Reserved cluster state update task executor + * + * @param rerouteService instance of {@link RerouteService}, so that we can execute reroute after cluster state is published + */ +public record ReservedStateUpdateTaskExecutor(RerouteService rerouteService) implements ClusterStateTaskExecutor { + + private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class); + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + rerouteService.reroute( + "reroute after saving and reserving part of the cluster state", + Priority.NORMAL, + ActionListener.wrap( + r -> logger.trace("reroute after applying and reserving part of the cluster state succeeded"), + e -> logger.debug("reroute after applying and reserving part of the cluster state failed", e) + ) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/PackageVersion.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java similarity index 59% rename from server/src/main/java/org/elasticsearch/reservedstate/service/PackageVersion.java rename to server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java index 95a48112515da..43c755d96a14d 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/PackageVersion.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.immutablestate.service; +package org.elasticsearch.reservedstate.service; import org.elasticsearch.Version; import org.elasticsearch.xcontent.ConstructingObjectParser; @@ -17,17 +17,18 @@ * File settings metadata class that holds information about * versioning and Elasticsearch version compatibility */ -public record PackageVersion(Long version, Version compatibleWith) { +public record ReservedStateVersion(Long version, Version compatibleWith) { + public static final ParseField VERSION = new ParseField("version"); public static final ParseField COMPATIBILITY = new ParseField("compatibility"); - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "immutable_cluster_state_version_metadata", + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "reserved_cluster_state_version_metadata", a -> { Long updateId = Long.parseLong((String) a[0]); Version minCompatVersion = Version.fromString((String) a[1]); - return new PackageVersion(updateId, minCompatVersion); + return new ReservedStateVersion(updateId, minCompatVersion); } ); @@ -36,11 +37,22 @@ public record PackageVersion(Long version, Version compatibleWith) { PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); } - public static PackageVersion parse(XContentParser parser, Void v) { + public static ReservedStateVersion parse(XContentParser parser, Void v) { return PARSER.apply(parser, v); } public Version minCompatibleVersion() { return compatibleWith; } + + /** + * {@link IncompatibleVersionException} is thrown when we try to update the cluster state + * without changing the update version id, or if we try to update cluster state on + * an incompatible Elasticsearch version in mixed cluster mode. + */ + public static class IncompatibleVersionException extends RuntimeException { + public IncompatibleVersionException(String message) { + super(message); + } + } } diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateControllerTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java similarity index 72% rename from server/src/test/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateControllerTests.java rename to server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java index baf282805eda0..c6dcd0f37267d 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ImmutableClusterStateControllerTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.immutablestate.service; +package org.elasticsearch.reservedstate.service; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -15,17 +15,17 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.metadata.ImmutableStateErrorMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateHandlerMetadata; -import org.elasticsearch.cluster.metadata.ImmutableStateMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; +import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; -import org.elasticsearch.immutablestate.TransformState; -import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; +import org.elasticsearch.reservedstate.ReservedClusterStateHandler; +import org.elasticsearch.reservedstate.TransformState; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -53,7 +53,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ImmutableClusterStateControllerTests extends ESTestCase { +public class ReservedClusterStateControllerTests extends ESTestCase { public void testOperatorController() throws IOException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -63,9 +63,9 @@ public void testOperatorController() throws IOException { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ImmutableClusterStateController controller = new ImmutableClusterStateController( + ReservedClusterStateController controller = new ReservedClusterStateController( clusterService, - List.of(new ImmutableClusterSettingsAction(clusterSettings)) + List.of(new ReservedClusterSettingsAction(clusterSettings)) ); String testJSON = """ @@ -130,13 +130,12 @@ public void testUpdateStateTasks() throws Exception { when(clusterService.getRerouteService()).thenReturn(rerouteService); ClusterState state = ClusterState.builder(new ClusterName("test")).build(); - ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor taskExecutor = - new ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor(clusterService.getRerouteService()); + ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService()); AtomicBoolean successCalled = new AtomicBoolean(false); - ImmutableStateUpdateStateTask task = spy( - new ImmutableStateUpdateStateTask( + ReservedStateUpdateTask task = spy( + new ReservedStateUpdateTask( "test", null, Collections.emptyMap(), @@ -154,9 +153,9 @@ public void onFailure(Exception e) {} doReturn(state).when(task).execute(any()); - ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { + ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { @Override - public ImmutableStateUpdateStateTask getTask() { + public ReservedStateUpdateTask getTask() { return task; } @@ -191,14 +190,9 @@ public void onFailure(Exception failure) {} public void testErrorStateTask() throws Exception { ClusterState state = ClusterState.builder(new ClusterName("test")).build(); - ImmutableStateUpdateErrorTask task = spy( - new ImmutableStateUpdateErrorTask( - new ImmutableClusterStateController.ImmutableUpdateErrorState( - "test", - 1L, - List.of("some parse error", "some io error"), - ImmutableStateErrorMetadata.ErrorKind.PARSING - ), + ReservedStateErrorTask task = spy( + new ReservedStateErrorTask( + new ErrorState("test", 1L, List.of("some parse error", "some io error"), ReservedStateErrorMetadata.ErrorKind.PARSING), new ActionListener<>() { @Override public void onResponse(ActionResponse.Empty empty) {} @@ -209,10 +203,10 @@ public void onFailure(Exception e) {} ) ); - ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext taskContext = - new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor.TaskContext<>() { + ReservedStateErrorTaskExecutor.TaskContext taskContext = + new ReservedStateErrorTaskExecutor.TaskContext<>() { @Override - public ImmutableStateUpdateErrorTask getTask() { + public ReservedStateErrorTask getTask() { return task; } @@ -234,23 +228,22 @@ public void success(Consumer publishedStateConsumer, ClusterStateA public void onFailure(Exception failure) {} }; - ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor executor = - new ImmutableClusterStateController.ImmutableUpdateErrorTaskExecutor(); + ReservedStateErrorTaskExecutor executor = new ReservedStateErrorTaskExecutor(); ClusterState newState = executor.execute(state, List.of(taskContext)); verify(task, times(1)).execute(any()); - ImmutableStateMetadata operatorMetadata = newState.metadata().immutableStateMetadata().get("test"); + ReservedStateMetadata operatorMetadata = newState.metadata().reservedStateMetadata().get("test"); assertNotNull(operatorMetadata); assertNotNull(operatorMetadata.errorMetadata()); assertEquals(1L, (long) operatorMetadata.errorMetadata().version()); - assertEquals(ImmutableStateErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); + assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error")); } public void testUpdateTaskDuplicateError() { - ImmutableClusterStateHandler> dummy = new ImmutableClusterStateHandler<>() { + ReservedClusterStateHandler> dummy = new ReservedClusterStateHandler<>() { @Override public String name() { return "one"; @@ -267,10 +260,10 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; - ImmutableStateUpdateStateTask task = spy( - new ImmutableStateUpdateStateTask( + ReservedStateUpdateTask task = spy( + new ReservedStateUpdateTask( "namespace_one", - new ImmutableClusterStateController.Package(Map.of("one", "two"), new PackageVersion(1L, Version.CURRENT)), + new ReservedStateChunk(Map.of("one", "two"), new ReservedStateVersion(1L, Version.CURRENT)), Map.of("one", dummy), List.of(dummy.name()), (errorState) -> {}, @@ -284,14 +277,14 @@ public void onFailure(Exception e) {} ) ); - ImmutableStateHandlerMetadata hmOne = new ImmutableStateHandlerMetadata("one", Set.of("a", "b")); - ImmutableStateErrorMetadata emOne = new ImmutableStateErrorMetadata( + ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b")); + ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata( 1L, - ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + ReservedStateErrorMetadata.ErrorKind.VALIDATION, List.of("Test error 1", "Test error 2") ); - ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("namespace_one") + ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one") .errorMetadata(emOne) .version(1L) .putHandler(hmOne) @@ -303,17 +296,17 @@ public void onFailure(Exception e) {} // We exit on duplicate errors before we update the cluster state error metadata assertEquals( "Not updating error state because version [1] is less or equal to the last state error version [1]", - expectThrows(ImmutableClusterStateController.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() + expectThrows(ReservedStateVersion.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() ); - emOne = new ImmutableStateErrorMetadata( + emOne = new ReservedStateErrorMetadata( 0L, - ImmutableStateErrorMetadata.ErrorKind.VALIDATION, + ReservedStateErrorMetadata.ErrorKind.VALIDATION, List.of("Test error 1", "Test error 2") ); // If we are writing with older error metadata, we should get proper IllegalStateException - operatorMetadata = ImmutableStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + operatorMetadata = ReservedStateMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); metadata = Metadata.builder().put(operatorMetadata).build(); ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); @@ -326,39 +319,43 @@ public void onFailure(Exception e) {} } public void testCheckMetadataVersion() { - ImmutableStateMetadata operatorMetadata = ImmutableStateMetadata.builder("test").version(123L).build(); + ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build(); assertTrue( - ImmutableClusterStateController.checkMetadataVersion(operatorMetadata, new PackageVersion(124L, Version.CURRENT), (e) -> {}) + ReservedClusterStateController.checkMetadataVersion( + operatorMetadata, + new ReservedStateVersion(124L, Version.CURRENT), + (e) -> {} + ) ); AtomicReference x = new AtomicReference<>(); assertFalse( - ImmutableClusterStateController.checkMetadataVersion( + ReservedClusterStateController.checkMetadataVersion( operatorMetadata, - new PackageVersion(123L, Version.CURRENT), + new ReservedStateVersion(123L, Version.CURRENT), (e) -> x.set(e) ) ); - assertTrue(x.get() instanceof ImmutableClusterStateController.IncompatibleVersionException); + assertTrue(x.get() instanceof ReservedStateVersion.IncompatibleVersionException); assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); assertFalse( - ImmutableClusterStateController.checkMetadataVersion( + ReservedClusterStateController.checkMetadataVersion( operatorMetadata, - new PackageVersion(124L, Version.fromId(Version.CURRENT.id + 1)), + new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)), (e) -> x.set(e) ) ); - assertEquals(ImmutableClusterStateController.IncompatibleVersionException.class, x.get().getClass()); + assertEquals(ReservedStateVersion.IncompatibleVersionException.class, x.get().getClass()); assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); } public void testHandlerOrdering() { - ImmutableClusterStateHandler> oh1 = new ImmutableClusterStateHandler<>() { + ReservedClusterStateHandler> oh1 = new ReservedClusterStateHandler<>() { @Override public String name() { return "one"; @@ -380,7 +377,7 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; - ImmutableClusterStateHandler> oh2 = new ImmutableClusterStateHandler<>() { + ReservedClusterStateHandler> oh2 = new ReservedClusterStateHandler<>() { @Override public String name() { return "two"; @@ -397,7 +394,7 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; - ImmutableClusterStateHandler> oh3 = new ImmutableClusterStateHandler<>() { + ReservedClusterStateHandler> oh3 = new ReservedClusterStateHandler<>() { @Override public String name() { return "three"; @@ -420,25 +417,30 @@ public Map fromXContent(XContentParser parser) throws IOExceptio }; ClusterService clusterService = mock(ClusterService.class); - final var controller = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2, oh3)); - Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); + final var controller = new ReservedClusterStateController(clusterService, List.of(oh1, oh2, oh3)); + Collection ordered = HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two", "three")); assertThat(ordered, contains("two", "three", "one")); // assure that we bail on unknown handler assertEquals( - "Unknown settings definition type: four", - expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) - .getMessage() + "Unknown handler type: four", + expectThrows( + IllegalStateException.class, + () -> HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two", "three", "four")) + ).getMessage() ); // assure that we bail on missing dependency link assertEquals( - "Missing settings dependency definition: one -> three", - expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() + "Missing handler dependency definition: one -> three", + expectThrows( + IllegalStateException.class, + () -> HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two")) + ).getMessage() ); // Change the second handler so that we create cycle - oh2 = new ImmutableClusterStateHandler<>() { + oh2 = new ReservedClusterStateHandler<>() { @Override public String name() { return "two"; @@ -460,10 +462,13 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; - final var controller1 = new ImmutableClusterStateController(clusterService, List.of(oh1, oh2)); + final var controller1 = new ReservedClusterStateController(clusterService, List.of(oh1, oh2)); assertThat( - expectThrows(IllegalStateException.class, () -> controller1.orderedStateHandlers(Set.of("one", "two"))).getMessage(), + expectThrows( + IllegalStateException.class, + () -> HandlerDependencyManager.orderedStateHandlers(controller1.handlers, Set.of("one", "two")) + ).getMessage(), anyOf( is("Cycle found in settings dependencies: one -> two -> one"), is("Cycle found in settings dependencies: two -> one -> two") @@ -482,19 +487,19 @@ public void testDuplicateHandlerNames() { assertTrue( expectThrows( IllegalStateException.class, - () -> new ImmutableClusterStateController( + () -> new ReservedClusterStateController( clusterService, - List.of(new ImmutableClusterSettingsAction(clusterSettings), new TestHandler()) + List.of(new ReservedClusterSettingsAction(clusterSettings), new TestHandler()) ) ).getMessage().startsWith("Duplicate key cluster_settings") ); } - class TestHandler implements ImmutableClusterStateHandler> { + class TestHandler implements ReservedClusterStateHandler> { @Override public String name() { - return ImmutableClusterSettingsAction.NAME; + return ReservedClusterSettingsAction.NAME; } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java index 19343f610318f..af023ff8a9d45 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java @@ -18,13 +18,14 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.immutablestate.TransformState; -import org.elasticsearch.immutablestate.action.ImmutableClusterSettingsAction; -import org.elasticsearch.immutablestate.service.ImmutableClusterStateController; -import org.elasticsearch.immutablestate.service.ImmutableStateUpdateStateTask; -import org.elasticsearch.immutablestate.service.PackageVersion; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.reservedstate.TransformState; +import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; +import org.elasticsearch.reservedstate.service.ReservedClusterStateController; +import org.elasticsearch.reservedstate.service.ReservedStateChunk; +import org.elasticsearch.reservedstate.service.ReservedStateUpdateTask; +import org.elasticsearch.reservedstate.service.ReservedStateUpdateTaskExecutor; +import org.elasticsearch.reservedstate.service.ReservedStateVersion; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -236,17 +237,16 @@ private void setupTaskMock(ClusterService clusterService, ClusterState state) { doAnswer((Answer) invocation -> { Object[] args = invocation.getArguments(); - if ((args[3] instanceof ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) == false) { + if ((args[3] instanceof ReservedStateUpdateTaskExecutor) == false) { fail("Should have gotten a state update task to execute, instead got: " + args[3].getClass().getName()); } - ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor task = - (ImmutableClusterStateController.ImmutableUpdateStateTaskExecutor) args[3]; + ReservedStateUpdateTaskExecutor task = (ReservedStateUpdateTaskExecutor) args[3]; - ClusterStateTaskExecutor.TaskContext context = new ClusterStateTaskExecutor.TaskContext<>() { + ClusterStateTaskExecutor.TaskContext context = new ClusterStateTaskExecutor.TaskContext<>() { @Override - public ImmutableStateUpdateStateTask getTask() { - return (ImmutableStateUpdateStateTask) args[1]; + public ReservedStateUpdateTask getTask() { + return (ReservedStateUpdateTask) args[1]; } @Override @@ -281,9 +281,9 @@ public void testOperatorControllerFromJSONContent() throws IOException { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ImmutableClusterStateController controller = new ImmutableClusterStateController( + ReservedClusterStateController controller = new ReservedClusterStateController( clusterService, - List.of(new ImmutableClusterSettingsAction(clusterSettings)) + List.of(new ReservedClusterSettingsAction(clusterSettings)) ); String testJSON = """ @@ -353,11 +353,11 @@ public void testOperatorControllerFromJSONContent() throws IOException { XPackLicenseState licenseState = mock(XPackLicenseState.class); - controller = new ImmutableClusterStateController( + controller = new ReservedClusterStateController( clusterService, List.of( - new ImmutableClusterSettingsAction(clusterSettings), - new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + new ReservedClusterSettingsAction(clusterSettings), + new ReservedLifecycleAction(xContentRegistry(), client, licenseState) ) ); @@ -380,18 +380,18 @@ public void testOperatorControllerWithPluginPackage() { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ImmutableClusterStateController controller = new ImmutableClusterStateController( + ReservedClusterStateController controller = new ReservedClusterStateController( clusterService, - List.of(new ImmutableClusterSettingsAction(clusterSettings)) + List.of(new ReservedClusterSettingsAction(clusterSettings)) ); AtomicReference x = new AtomicReference<>(); - ImmutableClusterStateController.Package pack = new ImmutableClusterStateController.Package( + ReservedStateChunk pack = new ReservedStateChunk( Map.of( - ImmutableClusterSettingsAction.NAME, + ReservedClusterSettingsAction.NAME, Map.of("indices.recovery.max_bytes_per_sec", "50mb"), - ImmutableLifecycleAction.NAME, + ReservedLifecycleAction.NAME, List.of( new LifecyclePolicy( "my_timeseries_lifecycle", @@ -404,7 +404,7 @@ public void testOperatorControllerWithPluginPackage() { ) ) ), - new PackageVersion(123L, Version.CURRENT) + new ReservedStateVersion(123L, Version.CURRENT) ); controller.process("operator", pack, (e) -> x.set(e)); @@ -417,11 +417,11 @@ public void testOperatorControllerWithPluginPackage() { XPackLicenseState licenseState = mock(XPackLicenseState.class); - controller = new ImmutableClusterStateController( + controller = new ReservedClusterStateController( clusterService, List.of( - new ImmutableClusterSettingsAction(clusterSettings), - new ImmutableLifecycleAction(xContentRegistry(), client, licenseState) + new ReservedClusterSettingsAction(clusterSettings), + new ReservedLifecycleAction(xContentRegistry(), client, licenseState) ) ); @@ -433,5 +433,4 @@ public void testOperatorControllerWithPluginPackage() { } }); } - } From 74dac8910fad94a017ff3ff6b05d5e88d293e876 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Thu, 14 Jul 2022 10:40:25 -0400 Subject: [PATCH 3/5] Apply review feedback --- .../org/elasticsearch/ExceptionsHelper.java | 21 ---- .../ReservedClusterStateHandler.java | 4 +- .../reservedstate/TransformState.java | 2 +- .../action/ReservedClusterSettingsAction.java | 2 +- .../reservedstate/service/ErrorState.java | 14 +-- .../service/HandlerDependencyManager.java | 79 ------------ ....java => ReservedClusterStateService.java} | 64 +++++++++- .../service/ReservedStateChunk.java | 2 +- .../ReservedStateErrorTaskExecutor.java | 2 +- .../service/ReservedStateUpdateTask.java | 6 +- .../service/ReservedStateVersion.java | 4 +- ... => ReservedClusterStateServiceTests.java} | 113 ++++-------------- ...> ReservedLifecycleStateServiceTests.java} | 12 +- 13 files changed, 102 insertions(+), 223 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java rename server/src/main/java/org/elasticsearch/reservedstate/service/{ReservedClusterStateController.java => ReservedClusterStateService.java} (81%) rename server/src/test/java/org/elasticsearch/reservedstate/service/{ReservedClusterStateControllerTests.java => ReservedClusterStateServiceTests.java} (81%) rename x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/{ReservedLifecycleStateControllerTests.java => ReservedLifecycleStateServiceTests.java} (98%) diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 1f9dcf2008753..7e22e1797b527 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -34,7 +34,6 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -263,26 +262,6 @@ public static void maybeDieOnAnotherThread(final Throwable throwable) { }); } - /** - * Unwraps a throwable and calls an error listener with every unwrapped throwable - * @param t the starting throwable - * @param errorListener a listener function that will be called with every unwrapped throwable we find - * @param limit after how many encountered throwables should we stop unwrapping. Prevents stack overflows. 10 is reasonable max. - */ - public static void unwrap(Throwable t, Consumer errorListener, int limit) { - int counter = 0; - Throwable cause; - Throwable prev = t; - errorListener.accept(prev); - while ((cause = prev.getCause()) != null && (prev != cause)) { - prev = cause; - errorListener.accept(prev); - if (counter++ > limit) { - return; - } - } - } - /** * Deduplicate the failures by exception message and index. */ diff --git a/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java b/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java index 042e43891957f..47c412e0ea2f9 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/ReservedClusterStateHandler.java @@ -68,9 +68,9 @@ public interface ReservedClusterStateHandler { *

* Sometimes certain parts of the cluster state cannot be created/updated without previously * setting other cluster state components, e.g. composable templates. Since the reserved cluster state handlers - * are processed in random order by the ReservedClusterStateController, this method gives an opportunity + * are processed in random order by the ReservedClusterStateService, this method gives an opportunity * to any reserved handler to declare other state handlers it depends on. Given dependencies exist, - * the ReservedClusterStateController will order those handlers such that the handlers that are dependent + * the ReservedClusterStateService will order those handlers such that the handlers that are dependent * on are processed first. * * @return a collection of reserved state handler names diff --git a/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java b/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java index 33e01d8b675c6..d91a92e3b2158 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/TransformState.java @@ -13,7 +13,7 @@ import java.util.Set; /** - * A {@link ClusterState} wrapper used by the ReservedClusterStateController to pass the + * A {@link ClusterState} wrapper used by the ReservedClusterStateService to pass the * current state as well as previous keys set by an {@link ReservedClusterStateHandler} to each transform * step of the cluster state update. * diff --git a/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java b/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java index c0805cb95b030..ad0a0aefa1230 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/action/ReservedClusterSettingsAction.java @@ -27,7 +27,7 @@ /** * This Action is the reserved state save version of RestClusterUpdateSettingsAction *

- * It is used by the ReservedClusterStateController to update the persistent cluster settings. + * It is used by the ReservedClusterStateService to update the persistent cluster settings. * Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings. */ public class ReservedClusterSettingsAction implements ReservedClusterStateHandler> { diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java index 3a09862922172..401281a8274e0 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ErrorState.java @@ -8,23 +8,15 @@ package org.elasticsearch.reservedstate.service; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata; import java.util.List; +import static org.elasticsearch.ExceptionsHelper.stackTrace; + record ErrorState(String namespace, Long version, List errors, ReservedStateErrorMetadata.ErrorKind errorKind) { ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) { - this(namespace, version, List.of(unwrapException(e)), errorKind); - } - - public static String unwrapException(Exception e) { - StringBuilder stringBuilder = new StringBuilder(); - ExceptionsHelper.unwrap(e, (t) -> stringBuilder.append(t.getMessage()).append(", "), 10); - if (stringBuilder.length() > 2) { - stringBuilder.setLength(stringBuilder.length() - 2); - } - return stringBuilder.toString(); + this(namespace, version, List.of(stackTrace(e)), errorKind); } public String toString() { diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java b/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java deleted file mode 100644 index 8660aa0a1262c..0000000000000 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/HandlerDependencyManager.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.reservedstate.service; - -import org.elasticsearch.reservedstate.ReservedClusterStateHandler; - -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - -/** - * Package private class used by the {@link ReservedClusterStateController} to order the reserved - * cluster state handlers, by their declared dependencies. See also {@link ReservedClusterStateHandler}. - */ -class HandlerDependencyManager { - /** - * Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to - * execute for a given list of handler names supplied through the {@link ReservedStateChunk}. - * @param handlers All possible handlers that are registered with the {@link ReservedClusterStateController} - * @param handlerNames Names of handlers found in the {@link ReservedStateChunk} - * @return - */ - static LinkedHashSet orderedStateHandlers(Map> handlers, Set handlerNames) { - LinkedHashSet orderedHandlers = new LinkedHashSet<>(); - LinkedHashSet dependencyStack = new LinkedHashSet<>(); - - for (String key : handlerNames) { - addStateHandler(handlers, key, handlerNames, orderedHandlers, dependencyStack); - } - - return orderedHandlers; - } - - private static void addStateHandler( - Map> handlers, - String key, - Set keys, - LinkedHashSet ordered, - LinkedHashSet visited - ) { - if (visited.contains(key)) { - StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); - visited.forEach(s -> { - msg.append(s); - msg.append(" -> "); - }); - msg.append(key); - throw new IllegalStateException(msg.toString()); - } - - if (ordered.contains(key)) { - // already added by another dependent handler - return; - } - - visited.add(key); - ReservedClusterStateHandler handler = handlers.get(key); - - if (handler == null) { - throw new IllegalStateException("Unknown handler type: " + key); - } - - for (String dependency : handler.dependencies()) { - if (keys.contains(dependency) == false) { - throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency); - } - addStateHandler(handlers, dependency, keys, ordered, visited); - } - - visited.remove(key); - ordered.add(key); - } -} diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java similarity index 81% rename from server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java rename to server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index 8149da0afebf7..bceb8bec143d7 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateController.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -29,6 +29,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -40,10 +41,10 @@ *

* This class contains the logic about validation, ordering and applying of * the cluster state specified in a file or through plugins/modules. Reserved cluster state - * cannot be modified throught the REST APIs, only through this controller class. + * cannot be modified through the REST APIs, only through this controller class. */ -public class ReservedClusterStateController { - private static final Logger logger = LogManager.getLogger(ReservedClusterStateController.class); +public class ReservedClusterStateService { + private static final Logger logger = LogManager.getLogger(ReservedClusterStateService.class); public static final ParseField STATE_FIELD = new ParseField("state"); public static final ParseField METADATA_FIELD = new ParseField("metadata"); @@ -72,7 +73,7 @@ public class ReservedClusterStateController { * @param clusterService for fetching and saving the modified state * @param handlerList a list of reserved state handlers, which we use to transform the state */ - public ReservedClusterStateController(ClusterService clusterService, List> handlerList) { + public ReservedClusterStateService(ClusterService clusterService, List> handlerList) { this.clusterService = clusterService; this.updateStateTaskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService()); this.errorStateTaskExecutor = new ReservedStateErrorTaskExecutor(); @@ -84,7 +85,7 @@ public ReservedClusterStateController(ClusterService clusterService, List(name, handlers.get(name).fromXContent(p)); }, STATE_FIELD); - stateChunkParser.declareObject(ConstructingObjectParser.constructorArg(), ReservedStateVersion::parse, METADATA_FIELD); + stateChunkParser.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReservedStateVersion.parse(p), METADATA_FIELD); } /** @@ -128,7 +129,7 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con LinkedHashSet orderedHandlers; try { - orderedHandlers = HandlerDependencyManager.orderedStateHandlers(handlers, reservedState.keySet()); + orderedHandlers = orderedStateHandlers(reservedState.keySet()); } catch (Exception e) { ErrorState errorState = new ErrorState( namespace, @@ -229,4 +230,55 @@ public void onFailure(Exception e) { errorStateTaskExecutor ); } + + /** + * Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to + * execute for a given list of handler names supplied through the {@link ReservedStateChunk}. + * @param handlerNames Names of handlers found in the {@link ReservedStateChunk} + * @return + */ + LinkedHashSet orderedStateHandlers(Set handlerNames) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : handlerNames) { + addStateHandler(key, handlerNames, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + ReservedClusterStateHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown handler type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency); + } + addStateHandler(dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java index ee7e0892e2aba..b5b04b509dad5 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateChunk.java @@ -11,7 +11,7 @@ import java.util.Map; /** - * Tuple class containing the cluster state to be saved and reserved and the version info + * A holder for the cluster state to be saved and reserved and the version info *

* Apart from the cluster state we want to store and reserve, the chunk requires that * you supply the version metadata. This version metadata (see {@link ReservedStateVersion}) is checked to ensure diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java index bdb11c849934f..9e0cf82818f70 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java @@ -37,6 +37,6 @@ public ClusterState execute(ClusterState currentState, List - * Reserved cluster state can only be modified by using the {@link ReservedClusterStateController}. Updating + * Reserved cluster state can only be modified by using the {@link ReservedClusterStateService}. Updating * the reserved cluster state through REST APIs is not permitted. */ public class ReservedStateUpdateTask implements ClusterStateTaskListener { @@ -90,7 +90,7 @@ protected ClusterState execute(ClusterState state) { state = transformState.state(); reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(handlerName, transformState.keys())); } catch (Exception e) { - errors.add(format("Error processing %s state change: %s", handler.name(), unwrapException(e))); + errors.add(format("Error processing %s state change: %s", handler.name(), stackTrace(e))); } } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java index 43c755d96a14d..6ae5048d02550 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java @@ -37,8 +37,8 @@ public record ReservedStateVersion(Long version, Version compatibleWith) { PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); } - public static ReservedStateVersion parse(XContentParser parser, Void v) { - return PARSER.apply(parser, v); + public static ReservedStateVersion parse(XContentParser parser) { + return PARSER.apply(parser, null); } public Version minCompatibleVersion() { diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java similarity index 81% rename from server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java rename to server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index c6dcd0f37267d..c131a5918ca09 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateControllerTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -53,7 +53,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class ReservedClusterStateControllerTests extends ESTestCase { +public class ReservedClusterStateServiceTests extends ESTestCase { public void testOperatorController() throws IOException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -63,7 +63,7 @@ public void testOperatorController() throws IOException { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ReservedClusterStateController controller = new ReservedClusterStateController( + ReservedClusterStateService controller = new ReservedClusterStateService( clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings)) ); @@ -322,17 +322,13 @@ public void testCheckMetadataVersion() { ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build(); assertTrue( - ReservedClusterStateController.checkMetadataVersion( - operatorMetadata, - new ReservedStateVersion(124L, Version.CURRENT), - (e) -> {} - ) + ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT), (e) -> {}) ); AtomicReference x = new AtomicReference<>(); assertFalse( - ReservedClusterStateController.checkMetadataVersion( + ReservedClusterStateService.checkMetadataVersion( operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT), (e) -> x.set(e) @@ -343,7 +339,7 @@ public void testCheckMetadataVersion() { assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); assertFalse( - ReservedClusterStateController.checkMetadataVersion( + ReservedClusterStateService.checkMetadataVersion( operatorMetadata, new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)), (e) -> x.set(e) @@ -354,11 +350,11 @@ public void testCheckMetadataVersion() { assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); } - public void testHandlerOrdering() { - ReservedClusterStateHandler> oh1 = new ReservedClusterStateHandler<>() { + private ReservedClusterStateHandler> makeHandlerHelper(final String name, final List deps) { + return new ReservedClusterStateHandler<>() { @Override public String name() { - return "one"; + return name; } @Override @@ -368,24 +364,7 @@ public TransformState transform(Object source, TransformState prevState) throws @Override public Collection dependencies() { - return List.of("two", "three"); - } - - @Override - public Map fromXContent(XContentParser parser) throws IOException { - return parser.map(); - } - }; - - ReservedClusterStateHandler> oh2 = new ReservedClusterStateHandler<>() { - @Override - public String name() { - return "two"; - } - - @Override - public TransformState transform(Object source, TransformState prevState) throws Exception { - return null; + return deps; } @Override @@ -393,82 +372,38 @@ public Map fromXContent(XContentParser parser) throws IOExceptio return parser.map(); } }; + } - ReservedClusterStateHandler> oh3 = new ReservedClusterStateHandler<>() { - @Override - public String name() { - return "three"; - } - - @Override - public TransformState transform(Object source, TransformState prevState) throws Exception { - return null; - } - - @Override - public Collection dependencies() { - return List.of("two"); - } - - @Override - public Map fromXContent(XContentParser parser) throws IOException { - return parser.map(); - } - }; + public void testHandlerOrdering() { + ReservedClusterStateHandler> oh1 = makeHandlerHelper("one", List.of("two", "three")); + ReservedClusterStateHandler> oh2 = makeHandlerHelper("two", Collections.emptyList()); + ReservedClusterStateHandler> oh3 = makeHandlerHelper("three", List.of("two")); ClusterService clusterService = mock(ClusterService.class); - final var controller = new ReservedClusterStateController(clusterService, List.of(oh1, oh2, oh3)); - Collection ordered = HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two", "three")); + final var controller = new ReservedClusterStateService(clusterService, List.of(oh1, oh2, oh3)); + Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); assertThat(ordered, contains("two", "three", "one")); // assure that we bail on unknown handler assertEquals( "Unknown handler type: four", - expectThrows( - IllegalStateException.class, - () -> HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two", "three", "four")) - ).getMessage() + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) + .getMessage() ); // assure that we bail on missing dependency link assertEquals( "Missing handler dependency definition: one -> three", - expectThrows( - IllegalStateException.class, - () -> HandlerDependencyManager.orderedStateHandlers(controller.handlers, Set.of("one", "two")) - ).getMessage() + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() ); // Change the second handler so that we create cycle - oh2 = new ReservedClusterStateHandler<>() { - @Override - public String name() { - return "two"; - } + oh2 = makeHandlerHelper("two", List.of("one")); - @Override - public TransformState transform(Object source, TransformState prevState) throws Exception { - return null; - } - - @Override - public Collection dependencies() { - return List.of("one"); - } - - @Override - public Map fromXContent(XContentParser parser) throws IOException { - return parser.map(); - } - }; - - final var controller1 = new ReservedClusterStateController(clusterService, List.of(oh1, oh2)); + final var controller1 = new ReservedClusterStateService(clusterService, List.of(oh1, oh2)); assertThat( - expectThrows( - IllegalStateException.class, - () -> HandlerDependencyManager.orderedStateHandlers(controller1.handlers, Set.of("one", "two")) - ).getMessage(), + expectThrows(IllegalStateException.class, () -> controller1.orderedStateHandlers(Set.of("one", "two"))).getMessage(), anyOf( is("Cycle found in settings dependencies: one -> two -> one"), is("Cycle found in settings dependencies: two -> one -> two") @@ -487,7 +422,7 @@ public void testDuplicateHandlerNames() { assertTrue( expectThrows( IllegalStateException.class, - () -> new ReservedClusterStateController( + () -> new ReservedClusterStateService( clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings), new TestHandler()) ) @@ -503,7 +438,7 @@ public String name() { } @Override - public TransformState transform(Object source, TransformState prevState) throws Exception { + public TransformState transform(Object source, TransformState prevState) { return prevState; } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java similarity index 98% rename from x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java rename to x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java index af023ff8a9d45..8fa8abcdd2aa7 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateControllerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleStateServiceTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.reservedstate.TransformState; import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; -import org.elasticsearch.reservedstate.service.ReservedClusterStateController; +import org.elasticsearch.reservedstate.service.ReservedClusterStateService; import org.elasticsearch.reservedstate.service.ReservedStateChunk; import org.elasticsearch.reservedstate.service.ReservedStateUpdateTask; import org.elasticsearch.reservedstate.service.ReservedStateUpdateTaskExecutor; @@ -71,7 +71,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ReservedLifecycleStateControllerTests extends ESTestCase { +public class ReservedLifecycleStateServiceTests extends ESTestCase { protected NamedXContentRegistry xContentRegistry() { List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); @@ -281,7 +281,7 @@ public void testOperatorControllerFromJSONContent() throws IOException { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ReservedClusterStateController controller = new ReservedClusterStateController( + ReservedClusterStateService controller = new ReservedClusterStateService( clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings)) ); @@ -353,7 +353,7 @@ public void testOperatorControllerFromJSONContent() throws IOException { XPackLicenseState licenseState = mock(XPackLicenseState.class); - controller = new ReservedClusterStateController( + controller = new ReservedClusterStateService( clusterService, List.of( new ReservedClusterSettingsAction(clusterSettings), @@ -380,7 +380,7 @@ public void testOperatorControllerWithPluginPackage() { ClusterState state = ClusterState.builder(clusterName).build(); when(clusterService.state()).thenReturn(state); - ReservedClusterStateController controller = new ReservedClusterStateController( + ReservedClusterStateService controller = new ReservedClusterStateService( clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings)) ); @@ -417,7 +417,7 @@ public void testOperatorControllerWithPluginPackage() { XPackLicenseState licenseState = mock(XPackLicenseState.class); - controller = new ReservedClusterStateController( + controller = new ReservedClusterStateService( clusterService, List.of( new ReservedClusterSettingsAction(clusterSettings), From a568668948ad7621f4787fca3bd3720fd92647c4 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Thu, 14 Jul 2022 16:48:10 -0400 Subject: [PATCH 4/5] Apply more review feedback --- .../service/ReservedClusterStateService.java | 36 ++++----- .../ReservedStateErrorTaskExecutor.java | 2 +- .../service/ReservedStateUpdateTask.java | 14 ++-- .../service/ReservedStateVersion.java | 11 --- .../ReservedClusterStateServiceTests.java | 78 +++++++++---------- 5 files changed, 61 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index bceb8bec143d7..cd4c3a6058975 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -94,8 +94,7 @@ public ReservedClusterStateService(ClusterService clusterService, List errorListener) { ReservedStateChunk stateChunk; @@ -120,8 +119,7 @@ public void process(String namespace, XContentParser parser, Consumer * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata * @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the - * cluster state cannot be correctly applied, {@link ReservedStateVersion.IncompatibleVersionException} if the content is stale - * or incompatible with this node {@link Version}, null if successful. + * cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version. */ public void process(String namespace, ReservedStateChunk reservedStateChunk, Consumer errorListener) { Map reservedState = reservedStateChunk.state(); @@ -147,7 +145,7 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con ClusterState state = clusterService.state(); ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); - if (checkMetadataVersion(existingMetadata, reservedStateVersion, errorListener) == false) { + if (checkMetadataVersion(existingMetadata, reservedStateVersion) == false) { return; } @@ -179,31 +177,23 @@ public void onFailure(Exception e) { } // package private for testing - static boolean checkMetadataVersion( - ReservedStateMetadata existingMetadata, - ReservedStateVersion reservedStateVersion, - Consumer errorListener - ) { + static boolean checkMetadataVersion(ReservedStateMetadata existingMetadata, ReservedStateVersion reservedStateVersion) { if (Version.CURRENT.before(reservedStateVersion.minCompatibleVersion())) { - errorListener.accept( - new ReservedStateVersion.IncompatibleVersionException( - format( - "Cluster state version [%s] is not compatible with this Elasticsearch node", - reservedStateVersion.minCompatibleVersion() - ) + logger.warn( + () -> format( + "Cluster state version [%s] is not compatible with this Elasticsearch node", + reservedStateVersion.minCompatibleVersion() ) ); return false; } if (existingMetadata != null && existingMetadata.version() >= reservedStateVersion.version()) { - errorListener.accept( - new ReservedStateVersion.IncompatibleVersionException( - format( - "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", - reservedStateVersion.version(), - existingMetadata.version() - ) + logger.warn( + () -> format( + "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + reservedStateVersion.version(), + existingMetadata.version() ) ); return false; diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java index 9e0cf82818f70..5a3d70668855b 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java @@ -37,6 +37,6 @@ public ClusterState execute(ClusterState currentState, List listener() { return listener; } - protected ClusterState execute(ClusterState state) { - ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); + protected ClusterState execute(final ClusterState currentState) { + ReservedStateMetadata existingMetadata = currentState.metadata().reservedStateMetadata().get(namespace); Map reservedState = stateChunk.state(); ReservedStateVersion reservedStateVersion = stateChunk.metadata(); var reservedMetadataBuilder = new ReservedStateMetadata.Builder(namespace).version(reservedStateVersion.version()); List errors = new ArrayList<>(); + ClusterState state = currentState; for (var handlerName : orderedHandlers) { ReservedClusterStateHandler handler = handlers.get(handlerName); try { @@ -97,24 +98,25 @@ protected ClusterState execute(ClusterState state) { if (errors.isEmpty() == false) { // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the // version hasn't been updated. + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); if (existingMetadata != null && existingMetadata.errorMetadata() != null && existingMetadata.errorMetadata().version() >= reservedStateVersion.version()) { - logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); - throw new ReservedStateVersion.IncompatibleVersionException( - format( + logger.info( + () -> format( "Not updating error state because version [%s] is less or equal to the last state error version [%s]", reservedStateVersion.version(), existingMetadata.errorMetadata().version() ) ); + + return currentState; } errorReporter.accept( new ErrorState(namespace, reservedStateVersion.version(), errors, ReservedStateErrorMetadata.ErrorKind.VALIDATION) ); - logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); throw new IllegalStateException("Error processing state change request for " + namespace); } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java index 6ae5048d02550..c7c21b458482f 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateVersion.java @@ -44,15 +44,4 @@ public static ReservedStateVersion parse(XContentParser parser) { public Version minCompatibleVersion() { return compatibleWith; } - - /** - * {@link IncompatibleVersionException} is thrown when we try to update the cluster state - * without changing the update version id, or if we try to update cluster state on - * an incompatible Elasticsearch version in mixed cluster mode. - */ - public static class IncompatibleVersionException extends RuntimeException { - public IncompatibleVersionException(String message) { - super(message); - } - } } diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index c131a5918ca09..04a6b8ac91126 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -243,7 +243,25 @@ public void onFailure(Exception failure) {} } public void testUpdateTaskDuplicateError() { - ReservedClusterStateHandler> dummy = new ReservedClusterStateHandler<>() { + ReservedClusterStateHandler> newStateMaker = new ReservedClusterStateHandler<>() { + @Override + public String name() { + return "maker"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + ClusterState newState = new ClusterState.Builder(prevState.state()).build(); + return new TransformState(newState, prevState.keys()); + } + + @Override + public Map fromXContent(XContentParser parser) throws IOException { + return parser.map(); + } + }; + + ReservedClusterStateHandler> exceptionThrower = new ReservedClusterStateHandler<>() { @Override public String name() { return "one"; @@ -260,21 +278,22 @@ public Map fromXContent(XContentParser parser) throws IOExceptio } }; - ReservedStateUpdateTask task = spy( - new ReservedStateUpdateTask( - "namespace_one", - new ReservedStateChunk(Map.of("one", "two"), new ReservedStateVersion(1L, Version.CURRENT)), - Map.of("one", dummy), - List.of(dummy.name()), - (errorState) -> {}, - new ActionListener<>() { - @Override - public void onResponse(ActionResponse.Empty empty) {} + // We submit a task with two handler, one will cause an exception, the other will create a new state. + // When we fail to update the metadata because of version, we ensure that the returned state is equal to the + // original state by pointer reference to avoid cluster state update task to run. + ReservedStateUpdateTask task = new ReservedStateUpdateTask( + "namespace_one", + new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(1L, Version.CURRENT)), + Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), + List.of(exceptionThrower.name(), newStateMaker.name()), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} - @Override - public void onFailure(Exception e) {} - } - ) + @Override + public void onFailure(Exception e) {} + } ); ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b")); @@ -294,10 +313,8 @@ public void onFailure(Exception e) {} ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); // We exit on duplicate errors before we update the cluster state error metadata - assertEquals( - "Not updating error state because version [1] is less or equal to the last state error version [1]", - expectThrows(ReservedStateVersion.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() - ); + // The reference == ensures we return the same object as the current state to avoid publishing no-op state update + assertTrue(state == task.execute(state)); emOne = new ReservedStateErrorMetadata( 0L, @@ -321,33 +338,16 @@ public void onFailure(Exception e) {} public void testCheckMetadataVersion() { ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build(); - assertTrue( - ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT), (e) -> {}) - ); - - AtomicReference x = new AtomicReference<>(); - - assertFalse( - ReservedClusterStateService.checkMetadataVersion( - operatorMetadata, - new ReservedStateVersion(123L, Version.CURRENT), - (e) -> x.set(e) - ) - ); + assertTrue(ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT))); - assertTrue(x.get() instanceof ReservedStateVersion.IncompatibleVersionException); - assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); + assertFalse(ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT))); assertFalse( ReservedClusterStateService.checkMetadataVersion( operatorMetadata, - new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)), - (e) -> x.set(e) + new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)) ) ); - - assertEquals(ReservedStateVersion.IncompatibleVersionException.class, x.get().getClass()); - assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); } private ReservedClusterStateHandler> makeHandlerHelper(final String name, final List deps) { From 439c42aa52666c4ebfc81ad37bddc874dddaacc3 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski Date: Thu, 14 Jul 2022 19:50:12 -0400 Subject: [PATCH 5/5] Add namespace to log messages --- .../service/ReservedClusterStateService.java | 17 ++++++++++++----- .../ReservedClusterStateServiceTests.java | 9 +++++++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java index cd4c3a6058975..1c9bffc269de7 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java @@ -145,7 +145,7 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con ClusterState state = clusterService.state(); ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace); - if (checkMetadataVersion(existingMetadata, reservedStateVersion) == false) { + if (checkMetadataVersion(namespace, existingMetadata, reservedStateVersion) == false) { return; } @@ -177,12 +177,17 @@ public void onFailure(Exception e) { } // package private for testing - static boolean checkMetadataVersion(ReservedStateMetadata existingMetadata, ReservedStateVersion reservedStateVersion) { + static boolean checkMetadataVersion( + String namespace, + ReservedStateMetadata existingMetadata, + ReservedStateVersion reservedStateVersion + ) { if (Version.CURRENT.before(reservedStateVersion.minCompatibleVersion())) { logger.warn( () -> format( - "Cluster state version [%s] is not compatible with this Elasticsearch node", - reservedStateVersion.minCompatibleVersion() + "Reserved cluster state version [%s] for namespace [%s] is not compatible with this Elasticsearch node", + reservedStateVersion.minCompatibleVersion(), + namespace ) ); return false; @@ -191,7 +196,9 @@ static boolean checkMetadataVersion(ReservedStateMetadata existingMetadata, Rese if (existingMetadata != null && existingMetadata.version() >= reservedStateVersion.version()) { logger.warn( () -> format( - "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + "Not updating reserved cluster state for namespace [%s], because version [%s] is less or equal" + + " to the current metadata version [%s]", + namespace, reservedStateVersion.version(), existingMetadata.version() ) diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index 04a6b8ac91126..eaf55cdb7c8c3 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -338,12 +338,17 @@ public void onFailure(Exception e) {} public void testCheckMetadataVersion() { ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build(); - assertTrue(ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT))); + assertTrue( + ReservedClusterStateService.checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT)) + ); - assertFalse(ReservedClusterStateService.checkMetadataVersion(operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT))); + assertFalse( + ReservedClusterStateService.checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT)) + ); assertFalse( ReservedClusterStateService.checkMetadataVersion( + "operator", operatorMetadata, new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)) )