From 336a7e3ebeae1a4d530d75d51e48f98ba29b52c8 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 12 Mar 2025 15:57:47 +1100 Subject: [PATCH 1/4] Combine cluster and project tasks in xcontent for single project This PR combines both cluster and project tasks under persistent_tasks for XContent output of Metadata when it contains only a single project, i.e. there will be no cluster_persistent_tasks in such output. This is to maintain the existing output format when the cluster is not multi-project enabled. Relates: MP-1945 --- .../cluster/metadata/Metadata.java | 28 ++- .../cluster/metadata/ProjectMetadata.java | 17 +- .../cluster/metadata/MetadataTests.java | 175 +++++++++++++++--- 3 files changed, 181 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 55c0c29247b5f..e037ec356a15e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -744,7 +744,26 @@ public Iterator toXContentChunked(ToXContent.Params p) { // and not include it in the project xcontent output (through the lack of multi-project params) clusterReservedState.putAll(project.reservedStateMetadata()); - @FixForMultiProject(description = "consider include cluster-scoped persistent tasks") + // Similarly, combine cluster and project persistent tasks and report them under a single key + Iterator customs = Iterators.flatMap(customs().entrySet().iterator(), entry -> { + if (entry.getValue().context().contains(context) + && ClusterPersistentTasksCustomMetadata.TYPE.equals(entry.getKey()) == false) { + return ChunkedToXContentHelper.object(entry.getKey(), entry.getValue().toXContentChunked(p)); + } else { + return Collections.emptyIterator(); + } + }); + final var combinedTasks = PersistentTasksCustomMetadata.combine( + ClusterPersistentTasksCustomMetadata.get(this), + PersistentTasksCustomMetadata.get(project) + ); + if (combinedTasks != null) { + customs = Iterators.concat( + customs, + ChunkedToXContentHelper.object(PersistentTasksCustomMetadata.TYPE, combinedTasks.toXContentChunked(p)) + ); + } + final var iterators = Iterators.concat(start, Iterators.single((builder, params) -> { builder.field("cluster_uuid", clusterUUID); builder.field("cluster_uuid_committed", clusterUUIDCommitted); @@ -754,12 +773,7 @@ public Iterator toXContentChunked(ToXContent.Params p) { }), persistentSettings, project.toXContentChunked(p), - Iterators.flatMap( - customs.entrySet().iterator(), - entry -> entry.getValue().context().contains(context) - ? ChunkedToXContentHelper.object(entry.getKey(), entry.getValue().toXContentChunked(p)) - : Collections.emptyIterator() - ), + customs, ChunkedToXContentHelper.object("reserved_state", clusterReservedState.values().iterator()), ChunkedToXContentHelper.endObject() ); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index fd7472ebc2b0d..3176becc628e2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.plugins.FieldPredicate; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.transport.Transports; @@ -2111,14 +2112,16 @@ public Iterator toXContentChunked(ToXContent.Params p) { ? ChunkedToXContentHelper.object("indices", indices().values().iterator()) : Collections.emptyIterator(); - Iterator customs = Iterators.flatMap( - customs().entrySet().iterator(), - entry -> entry.getValue().context().contains(context) - ? ChunkedToXContentHelper.object(entry.getKey(), entry.getValue().toXContentChunked(p)) - : Collections.emptyIterator() - ); - final var multiProject = p.paramAsBoolean("multi-project", false); + Iterator customs = Iterators.flatMap(customs().entrySet().iterator(), entry -> { + if (entry.getValue().context().contains(context) + && (multiProject || PersistentTasksCustomMetadata.TYPE.equals(entry.getKey()) == false)) { + return ChunkedToXContentHelper.object(entry.getKey(), entry.getValue().toXContentChunked(p)); + } else { + return Collections.emptyIterator(); + } + }); + return Iterators.concat( ChunkedToXContentHelper.object( "templates", diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 146dd13186b7e..77df14179c817 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; +import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.upgrades.SystemIndexMigrationExecutor; import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams; @@ -104,6 +105,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_API; import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; +import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT; import static org.elasticsearch.cluster.metadata.ProjectMetadata.Builder.assertDataStreams; import static org.elasticsearch.test.LambdaMatchers.transformedItemsMatch; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; @@ -814,6 +816,31 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { } """, IndexVersion.current(), IndexVersion.current()); + final var metadata = fromJsonXContentStringWithPersistentTasks(json); + + assertThat(metadata, notNullValue()); + assertThat(metadata.clusterUUID(), is("aba1aa1ababbbaabaabaab")); + assertThat(metadata.customs().keySet(), containsInAnyOrder("desired_nodes", "cluster_persistent_tasks")); + final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(metadata); + assertThat(clusterTasks.tasks(), hasSize(1)); + assertThat( + clusterTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), + containsInAnyOrder("health-node") + ); + assertThat( + metadata.getProject().customs().keySet(), + containsInAnyOrder("persistent_tasks", "index-graveyard", "component_template") + ); + final var projectTasks = PersistentTasksCustomMetadata.get(metadata.getProject()); + assertThat( + projectTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), + containsInAnyOrder("upgrade-system-indices") + ); + assertThat(clusterTasks.getLastAllocationId(), equalTo(projectTasks.getLastAllocationId())); + + } + + private Metadata fromJsonXContentStringWithPersistentTasks(String json) throws IOException { List registry = new ArrayList<>(); registry.addAll(ClusterModule.getNamedXWriteables()); registry.addAll(IndicesModule.getNamedXContents()); @@ -840,26 +867,7 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { XContentParserConfiguration config = XContentParserConfiguration.EMPTY.withRegistry(new NamedXContentRegistry(registry)); try (XContentParser parser = JsonXContent.jsonXContent.createParser(config, json)) { - final var metatdata = Metadata.fromXContent(parser); - assertThat(metatdata, notNullValue()); - assertThat(metatdata.clusterUUID(), is("aba1aa1ababbbaabaabaab")); - assertThat(metatdata.customs().keySet(), containsInAnyOrder("desired_nodes", "cluster_persistent_tasks")); - final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(metatdata); - assertThat(clusterTasks.tasks(), hasSize(1)); - assertThat( - clusterTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), - containsInAnyOrder("health-node") - ); - assertThat( - metatdata.getProject().customs().keySet(), - containsInAnyOrder("persistent_tasks", "index-graveyard", "component_template") - ); - final var projectTasks = PersistentTasksCustomMetadata.get(metatdata.getProject()); - assertThat( - projectTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getTaskName).toList(), - containsInAnyOrder("upgrade-system-indices") - ); - assertThat(clusterTasks.getLastAllocationId(), equalTo(projectTasks.getLastAllocationId())); + return Metadata.fromXContent(parser); } } @@ -2655,14 +2663,117 @@ public void testMultiProjectXContent() throws IOException { } } + public void testDefaultProjectXContentWithPersistentTasks() throws IOException { + final long lastAllocationId = randomNonNegativeLong(); + final var originalMeta = Metadata.builder() + .clusterUUID(randomUUID()) + .clusterUUIDCommitted(true) + .put( + ProjectMetadata.builder(ProjectId.DEFAULT) + .putCustom( + PersistentTasksCustomMetadata.TYPE, + new PersistentTasksCustomMetadata( + lastAllocationId, + Map.of( + SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME, + new PersistentTasksCustomMetadata.PersistentTask<>( + SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME, + SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME, + new SystemIndexMigrationTaskParams(), + lastAllocationId, + PersistentTasks.INITIAL_ASSIGNMENT + ) + ) + ) + ) + ) + .putCustom( + ClusterPersistentTasksCustomMetadata.TYPE, + new ClusterPersistentTasksCustomMetadata( + lastAllocationId + 1, + Map.of( + HealthNode.TASK_NAME, + new PersistentTasksCustomMetadata.PersistentTask<>( + HealthNode.TASK_NAME, + HealthNode.TASK_NAME, + HealthNodeTaskParams.INSTANCE, + lastAllocationId + 1, + PersistentTasks.INITIAL_ASSIGNMENT + ) + ) + ) + ) + .build(); + + // For single project metadata, XContent output should combine the cluster and project tasks + final ToXContent.Params p = new ToXContent.MapParams( + Map.ofEntries(Map.entry("multi-project", "false"), Map.entry(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_SNAPSHOT)) + ); + final BytesReference bytes = toXContentBytes(originalMeta, p); + + final var objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, bytes); + // No cluster_persistent_tasks for single project output, it is combined with persistent_tasks + assertThat(objectPath.evaluate("meta-data.cluster_persistent_tasks"), nullValue()); + // The combined lastAllocationId is the max between cluster and project tasks + assertThat(objectPath.evaluate("meta-data.persistent_tasks.last_allocation_id"), equalTo(lastAllocationId + 1)); + assertThat(objectPath.evaluate("meta-data.persistent_tasks.tasks"), hasSize(2)); + assertThat( + Set.of( + objectPath.evaluate("meta-data.persistent_tasks.tasks.0.id"), + objectPath.evaluate("meta-data.persistent_tasks.tasks.1.id") + ), + equalTo(Set.of(HealthNode.TASK_NAME, SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME)) + ); + + // Deserialize from the XContent should separate cluster and project tasks + final Metadata fromXContentMeta = fromJsonXContentStringWithPersistentTasks(bytes.utf8ToString()); + assertThat(fromXContentMeta.projects().keySet(), equalTo(Set.of(ProjectId.DEFAULT))); + final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(fromXContentMeta); + assertThat(clusterTasks, notNullValue()); + assertThat(clusterTasks.getLastAllocationId(), equalTo(lastAllocationId + 1)); + assertThat( + clusterTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getId).toList(), + contains(HealthNode.TASK_NAME) + ); + final var projectTasks = PersistentTasksCustomMetadata.get(fromXContentMeta.getProject(ProjectId.DEFAULT)); + assertThat(projectTasks, notNullValue()); + assertThat(projectTasks.getLastAllocationId(), equalTo(lastAllocationId + 1)); + assertThat( + projectTasks.tasks().stream().map(PersistentTasksCustomMetadata.PersistentTask::getId).toList(), + contains(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME) + ); + } + public void testSingleNonDefaultProjectXContent() throws IOException { + final long lastAllocationId = randomNonNegativeLong(); + final var indexVersion = IndexVersion.current(); // When ClusterStateAction acts in a project scope, it returns cluster state metadata that has a single project that may // not have the default project-id. We need to be able to convert this to XContent in the Rest response final ProjectMetadata project = ProjectMetadata.builder(ProjectId.fromId("c8af967d644b3219")) - .put(IndexMetadata.builder("index-1").settings(indexSettings(IndexVersion.current(), 1, 1)).build(), false) - .put(IndexMetadata.builder("index-2").settings(indexSettings(IndexVersion.current(), 2, 2)).build(), false) + .put(IndexMetadata.builder("index-1").settings(indexSettings(indexVersion, 1, 1)).build(), false) + .put(IndexMetadata.builder("index-2").settings(indexSettings(indexVersion, 2, 2)).build(), false) + .build(); + final Metadata metadata = Metadata.builder() + .clusterUUID("afSSOgAAQAC8BuQTAAAAAA") + .clusterUUIDCommitted(true) + .put(project) + .putCustom( + ClusterPersistentTasksCustomMetadata.TYPE, + new ClusterPersistentTasksCustomMetadata( + lastAllocationId, + Map.of( + HealthNode.TASK_NAME, + new PersistentTasksCustomMetadata.PersistentTask<>( + HealthNode.TASK_NAME, + HealthNode.TASK_NAME, + HealthNodeTaskParams.INSTANCE, + lastAllocationId, + PersistentTasks.INITIAL_ASSIGNMENT + ) + ) + ) + ) .build(); - final Metadata metadata = Metadata.builder().clusterUUID("afSSOgAAQAC8BuQTAAAAAA").clusterUUIDCommitted(true).put(project).build(); final ToXContent.Params p = new ToXContent.MapParams( Map.ofEntries(Map.entry("multi-project", "false"), Map.entry(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_API)) ); @@ -2753,10 +2864,24 @@ public void testSingleNonDefaultProjectXContent() throws IOException { "index-graveyard": { "tombstones": [] }, - "reserved_state": {} + "reserved_state": {}, + "persistent_tasks": { + "last_allocation_id": %s, + "tasks": [ + { + "id": "health-node", + "task": { "health-node": {"params":{}} }, + "assignment": { + "explanation": "waiting for initial assignment", + "executor_node": null + }, + "allocation_id": %s + } + ] + } } } - """, IndexVersion.current(), IndexVersion.current(), IndexVersion.current(), IndexVersion.current()); + """, indexVersion, indexVersion, indexVersion, indexVersion, lastAllocationId, lastAllocationId); assertToXContentEquivalent(new BytesArray(expected), toXContentBytes(metadata, p), XContentType.JSON); } From 80cbc1ace1776e1d038feee72f508f1c4d363d5a Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 12 Mar 2025 16:26:15 +1100 Subject: [PATCH 2/4] more tests --- .../cluster/metadata/MetadataTests.java | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 77df14179c817..aa742bf0bd936 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -80,6 +80,7 @@ import org.hamcrest.Matchers; import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; @@ -2643,24 +2644,44 @@ public void testMultiProjectXContent() throws IOException { ); final BytesReference bytes = toXContentBytes(originalMeta, p); - final List registry = new ArrayList<>(); - registry.addAll(ClusterModule.getNamedXWriteables()); - registry.addAll(SystemIndexMigrationExecutor.getNamedXContentParsers()); - registry.addAll(HealthNodeTaskExecutor.getNamedXContentParsers()); - final var config = XContentParserConfiguration.EMPTY.withRegistry(new NamedXContentRegistry(registry)); - - try (XContentParser parser = createParser(config, JsonXContent.jsonXContent, bytes)) { - Metadata fromXContentMeta = Metadata.fromXContent(parser); - assertThat(fromXContentMeta.projects().keySet(), equalTo(originalMeta.projects().keySet())); - for (var project : fromXContentMeta.projects().values()) { - final var projectTasks = PersistentTasksCustomMetadata.get(project); - assertThat(projectTasks.getLastAllocationId(), equalTo(lastAllocationId)); - assertThat(projectTasks.taskMap().keySet(), equalTo(Set.of(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME))); + + // XContent with multi-project=true has separate cluster and project tasks + final var objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, bytes); + assertThat(objectPath.evaluate("meta-data.cluster_persistent_tasks"), notNullValue()); + assertThat(objectPath.evaluate("meta-data.cluster_persistent_tasks.last_allocation_id"), equalTo(lastAllocationId + 1)); + assertThat(objectPath.evaluate("meta-data.cluster_persistent_tasks.tasks"), hasSize(1)); + assertThat(objectPath.evaluate("meta-data.cluster_persistent_tasks.tasks.0.id"), equalTo(HealthNode.TASK_NAME)); + assertThat(objectPath.evaluate("meta-data.projects"), hasSize(projects.size())); + assertThat(IntStream.range(0, projects.size()).mapToObj(i -> { + try { + return (String) objectPath.evaluate("meta-data.projects." + i + ".id"); + } catch (IOException e) { + throw new UncheckedIOException(e); } - final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(fromXContentMeta); - assertThat(clusterTasks.getLastAllocationId(), equalTo(lastAllocationId + 1)); - assertThat(clusterTasks.taskMap().keySet(), equalTo(Set.of(HealthNode.TASK_NAME))); + }).collect(Collectors.toUnmodifiableSet()), + equalTo(projects.stream().map(pp -> pp.id().id()).collect(Collectors.toUnmodifiableSet())) + ); + + for (int i = 0; i < projects.size(); i++) { + assertThat(objectPath.evaluate("meta-data.projects." + i + ".persistent_tasks"), notNullValue()); + assertThat(objectPath.evaluate("meta-data.projects." + i + ".persistent_tasks.last_allocation_id"), equalTo(lastAllocationId)); + assertThat(objectPath.evaluate("meta-data.projects." + i + ".persistent_tasks.tasks"), hasSize(1)); + assertThat( + objectPath.evaluate("meta-data.projects." + i + ".persistent_tasks.tasks.0.id"), + equalTo(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME) + ); + } + + Metadata fromXContentMeta = fromJsonXContentStringWithPersistentTasks(bytes.utf8ToString()); + assertThat(fromXContentMeta.projects().keySet(), equalTo(originalMeta.projects().keySet())); + for (var project : fromXContentMeta.projects().values()) { + final var projectTasks = PersistentTasksCustomMetadata.get(project); + assertThat(projectTasks.getLastAllocationId(), equalTo(lastAllocationId)); + assertThat(projectTasks.taskMap().keySet(), equalTo(Set.of(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME))); } + final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(fromXContentMeta); + assertThat(clusterTasks.getLastAllocationId(), equalTo(lastAllocationId + 1)); + assertThat(clusterTasks.taskMap().keySet(), equalTo(Set.of(HealthNode.TASK_NAME))); } public void testDefaultProjectXContentWithPersistentTasks() throws IOException { From 15276b49b5859a26cdaa815abccf2e614235437f Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 12 Mar 2025 16:27:16 +1100 Subject: [PATCH 3/4] tweak --- .../java/org/elasticsearch/cluster/metadata/MetadataTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index aa742bf0bd936..c885a9d0b6f93 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -838,7 +838,6 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { containsInAnyOrder("upgrade-system-indices") ); assertThat(clusterTasks.getLastAllocationId(), equalTo(projectTasks.getLastAllocationId())); - } private Metadata fromJsonXContentStringWithPersistentTasks(String json) throws IOException { From aa9d54c3a9ed8847a44fcca1b07141d99ca9cd17 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 12 Mar 2025 23:31:49 +1100 Subject: [PATCH 4/4] comment --- .../org/elasticsearch/cluster/metadata/ProjectMetadata.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index 3176becc628e2..42f50bd2e7917 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -2115,6 +2115,8 @@ public Iterator toXContentChunked(ToXContent.Params p) { final var multiProject = p.paramAsBoolean("multi-project", false); Iterator customs = Iterators.flatMap(customs().entrySet().iterator(), entry -> { if (entry.getValue().context().contains(context) + // Include persistent tasks in the output only when multi-project=true. + // In single-project-mode (multi-project=false), we already output them in Metadata. && (multiProject || PersistentTasksCustomMetadata.TYPE.equals(entry.getKey()) == false)) { return ChunkedToXContentHelper.object(entry.getKey(), entry.getValue().toXContentChunked(p)); } else {