From f112a46cddded11c371a9b903107fb28f165faed Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 20 Mar 2025 17:31:59 +1100 Subject: [PATCH 01/10] wip --- .../snapshots/RepositoriesIT.java | 4 +- .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/cluster/ClusterModule.java | 4 +- .../cluster/metadata/Metadata.java | 54 +++++++++++++++++-- .../cluster/metadata/ProjectMetadata.java | 11 +++- .../metadata/RepositoriesMetadata.java | 13 +++-- .../cluster/metadata/MetadataTests.java | 2 - .../metadata/ProjectMetadataTests.java | 2 + ...epositoriesMetadataSerializationTests.java | 14 ++--- .../AbstractSnapshotIntegTestCase.java | 2 +- .../SearchableSnapshots.java | 2 +- 11 files changed, 85 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java index 562f752b82220..55a1aca166fcc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -83,7 +83,7 @@ public void testRepositoryCreation() throws Exception { .setMetadata(true) .get(); Metadata metadata = clusterStateResponse.getState().getMetadata(); - RepositoriesMetadata repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE); + RepositoriesMetadata repositoriesMetadata = metadata.getProject().custom(RepositoriesMetadata.TYPE); assertThat(repositoriesMetadata, notNullValue()); assertThat(repositoriesMetadata.repository("test-repo-1"), notNullValue()); assertThat(repositoriesMetadata.repository("test-repo-1").type(), equalTo("fs")); @@ -94,7 +94,7 @@ public void testRepositoryCreation() throws Exception { logger.info("--> check that both repositories are in cluster state"); clusterStateResponse = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).clear().setMetadata(true).get(); metadata = clusterStateResponse.getState().getMetadata(); - repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE); + repositoriesMetadata = metadata.getProject().custom(RepositoriesMetadata.TYPE); assertThat(repositoriesMetadata, notNullValue()); assertThat(repositoriesMetadata.repositories().size(), equalTo(2)); assertThat(repositoriesMetadata.repository("test-repo-1"), notNullValue()); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 416abedf69809..2fe90d9d3a571 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -193,6 +193,7 @@ static TransportVersion def(int id) { public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00); public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00); public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00); + public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_036_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 1e982ca25e1a4..7ca70b8f262b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -224,7 +224,7 @@ public static List getNamedWriteables() { RepositoryCleanupInProgress::readDiffFrom ); // Metadata - registerMetadataCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom); + registerProjectCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom); registerProjectCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); registerProjectCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom); registerProjectCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom); @@ -279,7 +279,7 @@ public static List getNamedXWriteables() { // Metadata entries.add( new NamedXContentRegistry.Entry( - Metadata.ClusterCustom.class, + Metadata.ProjectCustom.class, new ParseField(RepositoriesMetadata.TYPE), RepositoriesMetadata::fromXContent ) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 33fe2ad01c9dd..8107e764edb3c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -1178,7 +1179,18 @@ public static Metadata readFrom(StreamInput in) throws IOException { builder.put(ReservedStateMetadata.readFrom(in)); } } else { - readClusterCustoms(in, builder); + final boolean beforeRepositoriesMetadataMigration = in.getTransportVersion() + .before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM); + List projectCustoms = List.of(); + if (beforeRepositoriesMetadataMigration) { + projectCustoms = new ArrayList<>(); + readBwcCustoms(in, builder, projectCustoms::add); + assert projectCustoms.size() <= 1 + : "expect only a single custom for repository metadata, but got " + + projectCustoms.stream().map(ProjectCustom::getWriteableName).toList(); + } else { + readClusterCustoms(in, builder); + } int reservedStateSize = in.readVInt(); for (int i = 0; i < reservedStateSize; i++) { @@ -1186,11 +1198,21 @@ public static Metadata readFrom(StreamInput in) throws IOException { } builder.projectMetadata(in.readMap(ProjectId::readFrom, ProjectMetadata::readFrom)); + + if (projectCustoms.isEmpty() == false) { + projectCustoms.forEach( + projectCustom -> builder.getProject(ProjectId.DEFAULT).putCustom(projectCustom.getWriteableName(), projectCustom) + ); + } } return builder.build(); } private static void readBwcCustoms(StreamInput in, Builder builder) throws IOException { + readBwcCustoms(in, builder, projectCustom -> builder.putProjectCustom(projectCustom.getWriteableName(), projectCustom)); + } + + private static void readBwcCustoms(StreamInput in, Builder builder, Consumer projectCustomConsumer) throws IOException { final Set clusterScopedNames = in.namedWriteableRegistry().getReaders(ClusterCustom.class).keySet(); final Set projectScopedNames = in.namedWriteableRegistry().getReaders(ProjectCustom.class).keySet(); final int count = in.readVInt(); @@ -1206,9 +1228,9 @@ private static void readBwcCustoms(StreamInput in, Builder builder) throws IOExc if (custom instanceof PersistentTasksCustomMetadata persistentTasksCustomMetadata) { final var tuple = persistentTasksCustomMetadata.split(); builder.putCustom(tuple.v1().getWriteableName(), tuple.v1()); - builder.putProjectCustom(tuple.v2().getWriteableName(), tuple.v2()); + projectCustomConsumer.accept(custom); } else { - builder.putProjectCustom(custom.getWriteableName(), custom); + projectCustomConsumer.accept(custom); } } else { throw new IllegalArgumentException("Unknown custom name [" + name + "]"); @@ -1275,12 +1297,36 @@ public void writeTo(StreamOutput out) throws IOException { combinedMetadata.addAll(singleProject.reservedStateMetadata().values()); out.writeCollection(combinedMetadata); } else { - VersionedNamedWriteable.writeVersionedWriteables(out, customs.values()); + final boolean beforeRepositoriesMetadataMigration = out.getTransportVersion() + .before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM); + if (beforeRepositoriesMetadataMigration) { + if (isSingleProject() || noRepositoryExceptForDefaultProject(projects().values())) { + final List combinedCustoms = new ArrayList<>(customs.size() + 1); + combinedCustoms.addAll(customs.values()); + final ProjectCustom custom = getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE); + if (custom != null) { + combinedCustoms.add(custom); + } + VersionedNamedWriteable.writeVersionedWriteables(out, combinedCustoms); + } else { + throw new UnsupportedOperationException( + "Cannot serialize metadata with multiple projects to an output of version before repositories metadata migration" + ); + } + } else { + VersionedNamedWriteable.writeVersionedWriteables(out, customs.values()); + } + out.writeCollection(reservedStateMetadata.values()); out.writeMap(projectMetadata, StreamOutput::writeWriteable, StreamOutput::writeWriteable); } } + private static boolean noRepositoryExceptForDefaultProject(Collection projects) { + return projects.stream() + .allMatch(project -> ProjectId.DEFAULT.equals(project.id()) || project.custom(RepositoriesMetadata.TYPE) == null); + } + public static Builder builder() { return new Builder(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index 85d8a81aa126d..b31f763c9b017 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.metadata; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.Diffable; import org.elasticsearch.cluster.DiffableUtils; @@ -2196,7 +2197,15 @@ public void writeTo(StreamOutput out) throws IOException { indexMetadata.writeTo(out, true); } out.writeCollection(templates.values()); - VersionedNamedWriteable.writeVersionedWriteables(out, customs.values()); + Collection filteredCustoms = customs.values(); + if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { + if (custom(RepositoriesMetadata.TYPE) != null) { + assert ProjectId.DEFAULT.equals(id) + : "Only default project can have repositories metadata. Otherwise the code should have thrown before it reaches here"; + filteredCustoms = filteredCustoms.stream().filter(custom -> custom instanceof RepositoriesMetadata == false).toList(); + } + } + VersionedNamedWriteable.writeVersionedWriteables(out, filteredCustoms); out.writeCollection(reservedStateMetadata.values()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java index 21615886ecd09..d8fd04b701d2b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java @@ -37,7 +37,7 @@ /** * Contains metadata about registered snapshot repositories */ -public class RepositoriesMetadata extends AbstractNamedDiffable implements Metadata.ClusterCustom { +public class RepositoriesMetadata extends AbstractNamedDiffable implements Metadata.ProjectCustom { public static final String TYPE = "repositories"; @@ -51,8 +51,13 @@ public class RepositoriesMetadata extends AbstractNamedDiffable repositories; + @Deprecated(forRemoval = true) public static RepositoriesMetadata get(ClusterState state) { - return state.metadata().custom(TYPE, EMPTY); + return get(state.metadata().getProject()); + } + + public static RepositoriesMetadata get(ProjectMetadata project) { + return project.custom(TYPE, EMPTY); } /** @@ -182,8 +187,8 @@ public RepositoriesMetadata(StreamInput in) throws IOException { this.repositories = in.readCollectionAsImmutableList(RepositoryMetadata::new); } - public static NamedDiff readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(Metadata.ClusterCustom.class, TYPE, in); + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ProjectCustom.class, TYPE, in); } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index c885a9d0b6f93..9fba366b2a23a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -2937,8 +2937,6 @@ public static int expectedChunkCount(ToXContent.Params params, Metadata metadata chunkCount += checkChunkSize(custom, params, 1); } else if (custom instanceof NodesShutdownMetadata nodesShutdownMetadata) { chunkCount += checkChunkSize(custom, params, 2 + nodesShutdownMetadata.getAll().size()); - } else if (custom instanceof RepositoriesMetadata repositoriesMetadata) { - chunkCount += checkChunkSize(custom, params, repositoriesMetadata.repositories().size()); } else { // could be anything, we have to just try it chunkCount += count(custom.toXContentChunked(params)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java index ee84c63a0cc37..d692756d5f4c0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java @@ -371,6 +371,8 @@ static int expectedChunkCount(ToXContent.Params params, ProjectMetadata project) chunkCount += checkChunkSize(custom, params, 2 + ingestMetadata.getPipelines().size()); } else if (custom instanceof PersistentTasksCustomMetadata persistentTasksCustomMetadata) { chunkCount += checkChunkSize(custom, params, 3 + persistentTasksCustomMetadata.tasks().size()); + } else if (custom instanceof RepositoriesMetadata repositoriesMetadata) { + chunkCount += checkChunkSize(custom, params, repositoriesMetadata.repositories().size()); } else { // could be anything, we have to just try it chunkCount += count(custom.toXContentChunked(params)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java index d8c871d8c17ad..118849800c977 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java @@ -25,10 +25,10 @@ import java.util.Comparator; import java.util.List; -public class RepositoriesMetadataSerializationTests extends ChunkedToXContentDiffableSerializationTestCase { +public class RepositoriesMetadataSerializationTests extends ChunkedToXContentDiffableSerializationTestCase { @Override - protected Metadata.ClusterCustom createTestInstance() { + protected Metadata.ProjectCustom createTestInstance() { int numberOfRepositories = randomInt(10); List entries = new ArrayList<>(); for (int i = 0; i < numberOfRepositories; i++) { @@ -50,12 +50,12 @@ protected Metadata.ClusterCustom createTestInstance() { } @Override - protected Writeable.Reader instanceReader() { + protected Writeable.Reader instanceReader() { return RepositoriesMetadata::new; } @Override - protected Metadata.ClusterCustom mutateInstance(Metadata.ClusterCustom instance) { + protected Metadata.ProjectCustom mutateInstance(Metadata.ProjectCustom instance) { List entries = new ArrayList<>(((RepositoriesMetadata) instance).repositories()); boolean addEntry = entries.isEmpty() ? true : randomBoolean(); if (addEntry) { @@ -80,7 +80,7 @@ public Settings randomSettings() { } @Override - protected Metadata.ClusterCustom makeTestChanges(Metadata.ClusterCustom testInstance) { + protected Metadata.ProjectCustom makeTestChanges(Metadata.ProjectCustom testInstance) { RepositoriesMetadata repositoriesMetadata = (RepositoriesMetadata) testInstance; List repos = new ArrayList<>(repositoriesMetadata.repositories()); if (randomBoolean() && repos.size() > 1) { @@ -99,7 +99,7 @@ protected Metadata.ClusterCustom makeTestChanges(Metadata.ClusterCustom testInst } @Override - protected Writeable.Reader> diffReader() { + protected Writeable.Reader> diffReader() { return RepositoriesMetadata::readDiffFrom; } @@ -109,7 +109,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { } @Override - protected Metadata.ClusterCustom doParseInstance(XContentParser parser) throws IOException { + protected Metadata.ProjectCustom doParseInstance(XContentParser parser) throws IOException { assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); RepositoriesMetadata repositoriesMetadata = RepositoriesMetadata.fromXContent(parser); assertEquals(XContentParser.Token.END_OBJECT, parser.currentToken()); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index c5f943c529c5d..dc500220850e3 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -513,7 +513,7 @@ protected void assertDocCount(String index, long count) { */ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map metadata) throws Exception { final ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState(); - final RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE); + final RepositoriesMetadata repositoriesMetadata = state.metadata().getProject().custom(RepositoriesMetadata.TYPE); assertNotNull(repositoriesMetadata); final RepositoryMetadata initialRepoMetadata = repositoriesMetadata.repository(repoName); assertNotNull(initialRepoMetadata); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index f6a35fb98203d..0b33b67770048 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -737,7 +737,7 @@ private static final class RepositoryUuidWatcher implements ClusterStateListener @Override public void clusterChanged(ClusterChangedEvent event) { - final RepositoriesMetadata repositoriesMetadata = event.state().metadata().custom(RepositoriesMetadata.TYPE); + final RepositoriesMetadata repositoriesMetadata = event.state().metadata().getProject().custom(RepositoriesMetadata.TYPE); if (repositoriesMetadata == null) { knownUuids.clear(); return; From 3ebe4e563d00b34922b9e0bbab586e4e6482ce34 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 21 Mar 2025 22:10:40 +1100 Subject: [PATCH 02/10] diff and tests --- .../elasticsearch/cluster/DiffableUtils.java | 36 ++ .../cluster/metadata/Metadata.java | 157 +++++++- .../cluster/metadata/ProjectMetadata.java | 11 + .../MetadataRepositoriesMetadataTests.java | 364 ++++++++++++++++++ 4 files changed, 552 insertions(+), 16 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index be748a578256e..cfa281bd96b76 100644 --- a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -98,6 +99,18 @@ public static > MapDiff emptyDiff() { return (MapDiff) EMPTY; } + /** + * Merges two map diffs into one unified diff with write-only value serializer. + */ + @SuppressWarnings("unchecked") + public static , T1 extends T, T2 extends T, M extends Map> MapDiff merge( + MapDiff> diff1, + MapDiff> diff2, + KeySerializer keySerializer + ) { + return merge(diff1, diff2, keySerializer, DiffableValueSerializer.getWriteOnlyInstance()); + } + /** * Merges two map diffs into one unified diff. */ @@ -146,6 +159,29 @@ public static > boolean hasKey(MapDiff diff, return false; } + public static > MapDiff updateDiffsAndUpsertsForKey( + MapDiff diff, + Predicate keyPredicate, + BiFunction, Diff> diffUpdateFunction, + BiFunction upsertUpdateFunction + ) { + final var newDiffs = diff.getDiffs().stream().map(entry -> { + if (keyPredicate.test(entry.getKey()) == false) { + return entry; + } + return Map.entry(entry.getKey(), diffUpdateFunction.apply(entry.getKey(), entry.getValue())); + }).toList(); + + final var newUpserts = diff.getUpserts().stream().map(entry -> { + if (keyPredicate.test(entry.getKey()) == false) { + return entry; + } + return Map.entry(entry.getKey(), upsertUpdateFunction.apply(entry.getKey(), entry.getValue())); + }).toList(); + + return new MapDiff<>(diff.keySerializer, diff.valueSerializer, diff.deletes, newDiffs, newUpserts, diff.builderCtor); + } + /** * Creates a MapDiff that applies a single entry diff to a map */ diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 8107e764edb3c..6cf6349237759 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.Diff; @@ -953,11 +954,10 @@ private MetadataDiff(StreamInput in) throws IOException { multiProject = null; } else { fromNodeBeforeMultiProjectsSupport = false; - clusterCustoms = DiffableUtils.readImmutableOpenMapDiff( - in, - DiffableUtils.getStringKeySerializer(), - CLUSTER_CUSTOM_VALUE_SERIALIZER - ); + final var bwcCustoms = maybeReadBwcCustoms(in); + clusterCustoms = bwcCustoms.v1(); + final var defaultProjectCustoms = bwcCustoms.v2(); + reservedStateMetadata = DiffableUtils.readImmutableOpenMapDiff( in, DiffableUtils.getStringKeySerializer(), @@ -965,15 +965,59 @@ private MetadataDiff(StreamInput in) throws IOException { ); singleProject = null; - multiProject = DiffableUtils.readJdkMapDiff( - in, - PROJECT_ID_SERIALIZER, - ProjectMetadata::readFrom, - ProjectMetadata.ProjectMetadataDiff::new + multiProject = readMultiProjectDiffs(in, defaultProjectCustoms); + } + } + + private static + Tuple< + MapDiff>, + MapDiff>> + maybeReadBwcCustoms(StreamInput in) throws IOException { + if (in.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { + return readBwcCustoms(in); + } else { + return new Tuple<>( + DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CLUSTER_CUSTOM_VALUE_SERIALIZER), + null ); } } + @SuppressWarnings("unchecked") + private static MapDiff> readMultiProjectDiffs( + StreamInput in, + MapDiff> defaultProjectCustoms + ) throws IOException { + final var multiProject = DiffableUtils.readJdkMapDiff( + in, + PROJECT_ID_SERIALIZER, + ProjectMetadata::readFrom, + ProjectMetadata.ProjectMetadataDiff::new + ); + + if (defaultProjectCustoms != null && defaultProjectCustoms.isEmpty() == false) { + return DiffableUtils.updateDiffsAndUpsertsForKey(multiProject, ProjectId.DEFAULT::equals, (k, v) -> { + assert ProjectId.DEFAULT.equals(k) : k; + assert v instanceof ProjectMetadata.ProjectMetadataDiff : v; + final var projectMetadataDiff = (ProjectMetadata.ProjectMetadataDiff) v; + return projectMetadataDiff.withCustoms( + DiffableUtils.merge( + projectMetadataDiff.customs(), + defaultProjectCustoms, + DiffableUtils.getStringKeySerializer(), + BWC_CUSTOM_VALUE_SERIALIZER + ) + ); + }, (k, v) -> { + assert ProjectId.DEFAULT.equals(k) : k; + return ProjectMetadata.builder(v).clearCustoms().customs(defaultProjectCustoms.apply(v.customs())).build(); + }); + } else { + return multiProject; + } + } + @SuppressWarnings("unchecked") private static Tuple< @@ -1017,17 +1061,98 @@ public void writeTo(StreamOutput out) throws IOException { buildUnifiedCustomDiff().writeTo(out); buildUnifiedReservedStateMetadataDiff().writeTo(out); } else { - clusterCustoms.writeTo(out); - reservedStateMetadata.writeTo(out); - if (multiProject != null) { - multiProject.writeTo(out); + final var multiProjectForWrite = multiProject != null + ? multiProject + : DiffableUtils.singleEntryDiff(DEFAULT_PROJECT_ID, singleProject, PROJECT_ID_SERIALIZER); + + if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { + writeDiffBeforeRepositoriesMetadataMigration(out, clusterCustoms, multiProjectForWrite, reservedStateMetadata); } else { - // construct the MapDiff to write out this single project - DiffableUtils.singleEntryDiff(DEFAULT_PROJECT_ID, singleProject, PROJECT_ID_SERIALIZER).writeTo(out); + clusterCustoms.writeTo(out); + reservedStateMetadata.writeTo(out); + multiProjectForWrite.writeTo(out); } } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static void writeDiffBeforeRepositoriesMetadataMigration( + StreamOutput out, + MapDiff> clusterCustoms, + MapDiff> multiProject, + MapDiff> reservedStateMetadata + ) throws IOException { + assert out.getTransportVersion().onOrAfter(TransportVersions.MULTI_PROJECT) + && out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) : out.getTransportVersion(); + + final var combineClustersCustoms = new SetOnce>>(); + + final var updatedMultiProject = DiffableUtils.updateDiffsAndUpsertsForKey(multiProject, ignore -> true, (k, v) -> { + assert v instanceof ProjectMetadata.ProjectMetadataDiff : v; + final var projectMetadataDiff = (ProjectMetadata.ProjectMetadataDiff) v; + final var bwcCustoms = DiffableUtils.split( + projectMetadataDiff.customs(), + RepositoriesMetadata.TYPE::equals, + PROJECT_CUSTOM_VALUE_SERIALIZER, + type -> RepositoriesMetadata.TYPE.equals(type) == false, + PROJECT_CUSTOM_VALUE_SERIALIZER + ); + if (bwcCustoms.v1().isEmpty()) { + return projectMetadataDiff; + } + + if (ProjectId.DEFAULT.equals(k) == false) { + throwForVersionBeforeRepositoriesMetadataMigration(out); + } + + combineClustersCustoms.set( + DiffableUtils.>merge( + clusterCustoms, + bwcCustoms.v1(), + DiffableUtils.getStringKeySerializer() + ) + ); + return projectMetadataDiff.withCustoms(bwcCustoms.v2()); + }, (k, v) -> { + final ProjectCustom projectCustom = v.customs().get(RepositoriesMetadata.TYPE); + if (projectCustom == null) { + return v; + } + + if (ProjectId.DEFAULT.equals(k) == false) { + throwForVersionBeforeRepositoriesMetadataMigration(out); + } + + combineClustersCustoms.set( + DiffableUtils.>merge( + clusterCustoms, + DiffableUtils.singleUpsertDiff(RepositoriesMetadata.TYPE, projectCustom, DiffableUtils.getStringKeySerializer()), + DiffableUtils.getStringKeySerializer() + ) + ); + return ProjectMetadata.builder(v).removeCustom(RepositoriesMetadata.TYPE).build(); + }); + + if (combineClustersCustoms.get() != null) { + combineClustersCustoms.get().writeTo(out); + } else { + clusterCustoms.writeTo(out); + } + + reservedStateMetadata.writeTo(out); + updatedMultiProject.writeTo(out); + } + + private static void throwForVersionBeforeRepositoriesMetadataMigration(StreamOutput out) { + throw new UnsupportedOperationException( + "Serialize a diff containing per-project repository requires version on or after [" + + TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM + + "], but got [" + + out.getTransportVersion() + + "]" + ); + } + @SuppressWarnings("unchecked") private Diff> buildUnifiedCustomDiff() { assert multiProject == null : "should only be used for single project metadata"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index b31f763c9b017..38029dbfdb8b7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -1481,6 +1481,11 @@ public Builder removeCustomIf(BiPredicate customs) { customs.forEach((key, value) -> Objects.requireNonNull(value, key)); this.customs.putAllFromMap(customs); @@ -2316,6 +2321,12 @@ public ProjectMetadata apply(ProjectMetadata part) { } return builder.build(true); } + + ProjectMetadataDiff withCustoms( + DiffableUtils.MapDiff> customs + ) { + return new ProjectMetadataDiff(indices, templates, customs, reservedStateMetadata); + } } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java new file mode 100644 index 0000000000000..a83337cd02ba9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java @@ -0,0 +1,364 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; +import org.elasticsearch.xcontent.ToXContent; +import org.junit.Before; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class MetadataRepositoriesMetadataTests extends ESTestCase { + + private NamedWriteableRegistry namedWriteableRegistry; + private NamedWriteableRegistry namedWriteableRegistryBwc; + + @Before + public void initializeRegistries() { + namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + namedWriteableRegistryBwc = new NamedWriteableRegistry( + Stream.concat( + ClusterModule.getNamedWriteables().stream().filter(entry -> entry.name.equals(RepositoriesMetadata.TYPE) == false), + Stream.of( + new NamedWriteableRegistry.Entry( + Metadata.ClusterCustom.class, + RepositoriesMetadata.TYPE, + TestBwcRepositoryMetadata::new + ), + new NamedWriteableRegistry.Entry(NamedDiff.class, RepositoriesMetadata.TYPE, TestBwcRepositoryMetadata::readDiffFrom) + ) + ).toList() + ); + } + + public void testRepositoriesMetadataSerialization() throws IOException { + final Metadata orig = randomMetadata(between(0, 5)); + + final BytesStreamOutput out = new BytesStreamOutput(); + orig.writeTo(out); + + final Metadata fromStream = Metadata.readFrom( + new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry) + ); + + assertTrue(Metadata.isGlobalStateEquals(orig, fromStream)); + } + + public void testRepositoriesMetadataDiffSerialization() throws IOException { + final Tuple tuple = randomMetadataAndUpdate(between(0, 5)); + final Metadata before = tuple.v1(); + final Metadata after = tuple.v2(); + + final Diff diff = after.diff(before); + + final BytesStreamOutput out = new BytesStreamOutput(); + diff.writeTo(out); + + final Diff diffFromStream = Metadata.readDiffFrom( + new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry) + ); + final Metadata metadataFromDiff = diffFromStream.apply(before); + + assertTrue(Metadata.isGlobalStateEquals(after, metadataFromDiff)); + } + + public void testRepositoriesMetadataSerializationBwc() throws IOException { + final Metadata orig = randomMetadata(between(0, 5), -1); + + final BytesStreamOutput out = new BytesStreamOutput(); + final var oldVersion = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersions.MULTI_PROJECT, + TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) + ); + out.setTransportVersion(oldVersion); + orig.writeTo(out); + + // Round-trip from new-node writes to old-stream and new-node reads from old-stream + final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry); + in.setTransportVersion(oldVersion); + final Metadata fromStream = Metadata.readFrom(in); + assertTrue(Metadata.isGlobalStateEquals(orig, fromStream)); + + // Simulate new-node writes to old-stream and old-node reads from old-stream + simulateReadOnOldNode(out.bytes(), oldVersion, orig); + } + + private void simulateReadOnOldNode(BytesReference bytesReference, TransportVersion oldVersion, Metadata orig) throws IOException { + final var in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistryBwc); + in.setTransportVersion(oldVersion); + final Metadata fromStream = Metadata.readFrom(in); + assertMetadataBwcEquals(fromStream, orig); + } + + public void testRepositoriesMetadataDiffSerializationBwc() throws IOException { + final Tuple tuple = randomMetadataAndUpdate(between(0, 5), -1); + final Metadata before = tuple.v1(); + final Metadata after = tuple.v2(); + final Diff diff = after.diff(before); + + final BytesStreamOutput out = new BytesStreamOutput(); + final var oldVersion = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersions.MULTI_PROJECT, + TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) + ); + out.setTransportVersion(oldVersion); + diff.writeTo(out); + + // Round-trip from new-node writes diff to old-stream and new-node reads from old-stream + final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry); + in.setTransportVersion(oldVersion); + final Diff diffFromStream = Metadata.readDiffFrom(in); + final Metadata metadataFromDiff = diffFromStream.apply(before); + assertTrue(Metadata.isGlobalStateEquals(after, metadataFromDiff)); + + // Simulate new-node writes diff to old-stream and old-node reads from old-stream + simulateReadAndApplyDiffOnOldNode(out.bytes(), oldVersion, before, after); + } + + // Simulate the deserialization and application of the diff on an old node + private void simulateReadAndApplyDiffOnOldNode( + BytesReference bytesReference, + TransportVersion oldVersion, + Metadata before, + Metadata after + ) throws IOException { + final var in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistryBwc); + in.setTransportVersion(oldVersion); + final Diff diffFromStream = Metadata.readDiffFrom(in); + + // On the old node, the "before" metadata would have repositories in the Metadata#customs. We simulate + // it by move the repositories from the default project into Metadata#customs as a TestBwcRepositoryMetadata + final Metadata beforeBwc = Metadata.builder(before) + .putCustom( + RepositoriesMetadata.TYPE, + new TestBwcRepositoryMetadata(RepositoriesMetadata.get(before.getProject(ProjectId.DEFAULT)).repositories()) + ) + .put(ProjectMetadata.builder(before.getProject(ProjectId.DEFAULT)).removeCustom(RepositoriesMetadata.TYPE)) + .build(); + // Apply the diff to the BWC "before" metadata and get the "after" metadata on an old node + final Metadata metadataFromDiff = diffFromStream.apply(beforeBwc); + + assertMetadataBwcEquals(metadataFromDiff, after); + } + + private void assertMetadataBwcEquals(Metadata metadataBwc, Metadata metadataNew) { + // BWC metadata has no repositories in project + assertThat(metadataBwc.getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE), nullValue()); + // New metadata has no repositories in Metadata#customs + assertThat(metadataNew.custom(RepositoriesMetadata.TYPE), nullValue()); + + final Metadata.ClusterCustom custom = metadataBwc.custom(RepositoriesMetadata.TYPE); + + if (custom != null) { + assertThat(custom, notNullValue()); + assertThat(custom, instanceOf(TestBwcRepositoryMetadata.class)); + assertThat( + ((TestBwcRepositoryMetadata) custom).repositories, + equalTo(RepositoriesMetadata.get(metadataNew.getProject(ProjectId.DEFAULT)).repositories()) + ); + } else { + // If there is no repositories in the deserialized metadataBwc, there should no repositories in the original metadataNew + assertThat(metadataNew.getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE), nullValue()); + } + + // All other parts excluding repositories of the two metadata are equal + assertTrue( + Metadata.isGlobalStateEquals( + Metadata.builder(metadataNew) + .put(ProjectMetadata.builder(metadataNew.getProject(ProjectId.DEFAULT)).removeCustom(RepositoriesMetadata.TYPE)) + .build(), + Metadata.builder(metadataBwc).removeCustom(RepositoriesMetadata.TYPE).build() + ) + ); + } + + private static Metadata randomMetadata(int extraProjects) { + return randomMetadata(extraProjects, between(0, 5)); + } + + /** + * Randomly create a Metadata object with the default project and extra projects as specified. The default project + * has random RepositoriesMetadata which can also be null. Each extra project also has a random RepositoriesMetadata + * containing the specified number of RepositoryMetadata. If the specified number is -1, extra projects will not have + * any RepositoriesMetadata. + * @param extraProjects Number of extra projects to create + * @param numReposPerExtraProject Number of RepositoryMetadata for each extra project. Or -1 to ensure no + * RepositoriesMetadata for extra projects. + */ + private static Metadata randomMetadata(int extraProjects, int numReposPerExtraProject) { + final Metadata.Builder builder = Metadata.builder().put(randomProject(ProjectId.DEFAULT, between(0, 5))); + IntStream.range(0, extraProjects).forEach(i -> builder.put(randomProject(randomUniqueProjectId(), numReposPerExtraProject))); + return builder.build(); + } + + private static Tuple randomMetadataAndUpdate(int extraProjects) { + return randomMetadataAndUpdate(extraProjects, between(0, 5)); + } + + /** + * Randomly generate a metadata then randomly mutates its RepositoriesMetadata in all projects. + * @param extraProjects Number of extra projects + * @param numReposPerExtraProject Number of RepositoryMetadata the RepositoriesMetadata contains per project or -1 for null. + */ + private static Tuple randomMetadataAndUpdate(int extraProjects, int numReposPerExtraProject) { + final Metadata before = randomMetadata(extraProjects, numReposPerExtraProject); + + final Metadata.Builder builder = Metadata.builder(before); + + builder.forEachProject(b -> { + final RepositoriesMetadata repositoriesMetadata = b.getCustom(RepositoriesMetadata.TYPE); + if (ProjectId.DEFAULT.equals(b.getId()) || numReposPerExtraProject > 0) { + final RepositoriesMetadata mutatedRepositoriesMetadata = mutateRepositoriesMetadata(repositoriesMetadata); + if (mutatedRepositoriesMetadata != null) { + b.putCustom(RepositoriesMetadata.TYPE, mutatedRepositoriesMetadata); + } else { + b.removeCustom(RepositoriesMetadata.TYPE); + } + } + return b; + }); + + return new Tuple<>(before, builder.build()); + } + + /** + * Randomly create a ProjectMetadata object with the given ProjectId and RepositoriesMetadata containing specified + * number of RepositoryMetadata. If the number is -1, no RepositoriesMetadata will be created. + */ + private static ProjectMetadata randomProject(ProjectId projectId, int numRepos) { + final ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId); + if (numRepos < 0) { + return builder.build(); + } + final RepositoriesMetadata repositoriesMetadata = randomRepositoriesMetadata(); + if (repositoriesMetadata == null) { + return builder.build(); + } else { + return builder.putCustom(RepositoriesMetadata.TYPE, repositoriesMetadata).build(); + } + } + + @Nullable + private static RepositoriesMetadata randomRepositoriesMetadata() { + return randomRepositoriesMetadata(between(0, 5)); + } + + private static RepositoriesMetadata randomRepositoriesMetadata(int numRepos) { + if (numRepos == 0) { + return randomBoolean() ? null : RepositoriesMetadata.EMPTY; + } + return new RepositoriesMetadata(randomList(numRepos, numRepos, MetadataRepositoriesMetadataTests::randomRepositoryMetadata)); + } + + private static RepositoryMetadata randomRepositoryMetadata() { + return new RepositoryMetadata(randomIdentifier(), randomUUID(), randomAlphaOfLengthBetween(3, 8), Settings.EMPTY); + } + + private static RepositoriesMetadata mutateRepositoriesMetadata(@Nullable RepositoriesMetadata repositoriesMetadata) { + if ((repositoriesMetadata == null || repositoriesMetadata.repositories().isEmpty()) && randomBoolean()) { + return randomRepositoriesMetadata(); + } + + if (randomBoolean()) { + return randomRepositoriesMetadata(); + } + + if (repositoriesMetadata == null || repositoriesMetadata.repositories().isEmpty()) { + return randomRepositoriesMetadata(between(1, 5)); + } + + return new RepositoriesMetadata(repositoriesMetadata.repositories().stream().map(repositoryMetadata -> switch (randomInt(3)) { + case 0 -> new RepositoryMetadata(randomIdentifier(), repositoryMetadata.uuid(), repositoryMetadata.type(), Settings.EMPTY); + case 1 -> new RepositoryMetadata(repositoryMetadata.name(), randomUUID(), repositoryMetadata.type(), Settings.EMPTY); + case 2 -> new RepositoryMetadata( + repositoryMetadata.name(), + repositoryMetadata.uuid(), + randomAlphaOfLengthBetween(3, 8), + Settings.EMPTY + ); + default -> new RepositoryMetadata( + repositoryMetadata.name(), + repositoryMetadata.uuid(), + repositoryMetadata.type(), + Settings.builder().put("base_path", randomIdentifier()).build() + ); + }).toList()); + } + + public static class TestBwcRepositoryMetadata extends AbstractNamedDiffable implements Metadata.ClusterCustom { + + private final List repositories; + + public TestBwcRepositoryMetadata(List repositories) { + this.repositories = repositories; + } + + public TestBwcRepositoryMetadata(StreamInput in) throws IOException { + this.repositories = in.readCollectionAsImmutableList(RepositoryMetadata::new); + } + + @Override + public EnumSet context() { + return Metadata.API_AND_GATEWAY; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.MINIMUM_COMPATIBLE; + } + + @Override + public String getWriteableName() { + return RepositoriesMetadata.TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(repositories); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.ClusterCustom.class, RepositoriesMetadata.TYPE, in); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return null; + } + } +} From 7cb2212ef39462ccd086b08132650adb71f9b564 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 21 Mar 2025 23:36:12 +1100 Subject: [PATCH 03/10] fix test --- .../LicensesMetadataSerializationTests.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetadataSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetadataSerializationTests.java index e2218dfab1f1c..689568e8b74d3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetadataSerializationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetadataSerializationTests.java @@ -8,8 +8,7 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.RepositoriesMetadata; -import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.ChunkedToXContent; @@ -25,6 +24,7 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; +import java.util.Map; import java.util.UUID; import static org.hamcrest.Matchers.equalTo; @@ -64,14 +64,14 @@ public void testXContentSerializationOneSignedLicenseWithUsedTrial() throws Exce public void testLicenseMetadataParsingDoesNotSwallowOtherMetadata() throws Exception { License license = TestUtils.generateSignedLicense(TimeValue.timeValueHours(2)); LicensesMetadata licensesMetadata = new LicensesMetadata(license, TrialLicenseVersion.CURRENT); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repo", "fs", Settings.EMPTY); - RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(Collections.singletonList(repositoryMetadata)); + NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(Map.of()); + final Metadata.Builder metadataBuilder = Metadata.builder(); if (randomBoolean()) { // random order of insertion metadataBuilder.putCustom(licensesMetadata.getWriteableName(), licensesMetadata); - metadataBuilder.putCustom(repositoriesMetadata.getWriteableName(), repositoriesMetadata); + metadataBuilder.putCustom(nodesShutdownMetadata.getWriteableName(), nodesShutdownMetadata); } else { - metadataBuilder.putCustom(repositoriesMetadata.getWriteableName(), repositoriesMetadata); + metadataBuilder.putCustom(nodesShutdownMetadata.getWriteableName(), nodesShutdownMetadata); metadataBuilder.putCustom(licensesMetadata.getWriteableName(), licensesMetadata); } // serialize metadata @@ -84,7 +84,7 @@ public void testLicenseMetadataParsingDoesNotSwallowOtherMetadata() throws Excep Metadata metadata = Metadata.Builder.fromXContent(createParser(builder)); // check that custom metadata still present assertThat(metadata.custom(licensesMetadata.getWriteableName()), notNullValue()); - assertThat(metadata.custom(repositoriesMetadata.getWriteableName()), notNullValue()); + assertThat(metadata.custom(nodesShutdownMetadata.getWriteableName()), notNullValue()); } public void testXContentSerializationOneTrial() throws Exception { From 7c66247d46574d5f3005bae5b205d68fde8afb1c Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Sat, 22 Mar 2025 00:02:15 +1100 Subject: [PATCH 04/10] fix tests --- .../main/java/org/elasticsearch/cluster/metadata/Metadata.java | 2 +- .../elasticsearch/cluster/metadata/RepositoriesMetadata.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 6cf6349237759..7ec549e420afa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -1353,7 +1353,7 @@ private static void readBwcCustoms(StreamInput in, Builder builder, Consumer Date: Sat, 22 Mar 2025 00:23:01 +1100 Subject: [PATCH 05/10] explicit default project for repo service --- .../elasticsearch/cluster/metadata/Metadata.java | 15 +++++++++------ .../repositories/RepositoriesService.java | 6 +++--- .../blobstore/BlobStoreRepository.java | 4 +++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 7ec549e420afa..09f98efe53dc6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -1304,10 +1304,8 @@ public static Metadata readFrom(StreamInput in) throws IOException { builder.put(ReservedStateMetadata.readFrom(in)); } } else { - final boolean beforeRepositoriesMetadataMigration = in.getTransportVersion() - .before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM); List projectCustoms = List.of(); - if (beforeRepositoriesMetadataMigration) { + if (in.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { projectCustoms = new ArrayList<>(); readBwcCustoms(in, builder, projectCustoms::add); assert projectCustoms.size() <= 1 @@ -1422,9 +1420,7 @@ public void writeTo(StreamOutput out) throws IOException { combinedMetadata.addAll(singleProject.reservedStateMetadata().values()); out.writeCollection(combinedMetadata); } else { - final boolean beforeRepositoriesMetadataMigration = out.getTransportVersion() - .before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM); - if (beforeRepositoriesMetadataMigration) { + if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { if (isSingleProject() || noRepositoryExceptForDefaultProject(projects().values())) { final List combinedCustoms = new ArrayList<>(customs.size() + 1); combinedCustoms.addAll(customs.values()); @@ -1695,6 +1691,13 @@ public Builder putCustom(String type, ProjectCustom custom) { return putProjectCustom(type, custom); } + @Deprecated(forRemoval = true) + public Builder putDefaultProjectCustom(String type, ProjectCustom custom) { + assert projectMetadata.containsKey(ProjectId.DEFAULT) : projectMetadata.keySet(); + getProject(ProjectId.DEFAULT).putCustom(type, custom); + return this; + } + public ClusterCustom getCustom(String type) { return customs.get(type); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index ac13f132d2879..f62c50390475b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -343,7 +343,7 @@ public ClusterState execute(ClusterState currentState) { repositoriesMetadata.add(new RepositoryMetadata(request.name(), request.type(), request.settings())); } repositories = new RepositoriesMetadata(repositoriesMetadata); - mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + mdBuilder.putDefaultProjectCustom(RepositoriesMetadata.TYPE, repositories); changed = true; return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -448,7 +448,7 @@ public ClusterState execute(ClusterState currentState) { } else { final RepositoriesMetadata newReposMetadata = currentReposMetadata.withUuid(repositoryName, repositoryUuid); final Metadata.Builder metadata = Metadata.builder(currentState.metadata()) - .putCustom(RepositoriesMetadata.TYPE, newReposMetadata); + .putDefaultProjectCustom(RepositoriesMetadata.TYPE, newReposMetadata); return ClusterState.builder(currentState).metadata(metadata).build(); } } @@ -531,7 +531,7 @@ public ClusterState execute(ClusterState currentState) { } if (changed) { repositories = new RepositoriesMetadata(repositoriesMetadata); - mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + mdBuilder.putDefaultProjectCustom(RepositoriesMetadata.TYPE, repositories); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d3fae8adb466d..a369124945ae1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2914,7 +2914,9 @@ public ClusterState execute(ClusterState currentState) { : withGenerations.withUuid(metadata.name(), newRepositoryData.getUuid()); final ClusterState newClusterState = stateFilter.apply( ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.getMetadata()).putCustom(RepositoriesMetadata.TYPE, withUuid)) + .metadata( + Metadata.builder(currentState.getMetadata()).putDefaultProjectCustom(RepositoriesMetadata.TYPE, withUuid) + ) .build() ); return updateRepositoryGenerationsIfNecessary(newClusterState, expectedGen, newGen); From 91a97e39e6539e7ed138faba2afcf51c40fdad3a Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Sat, 22 Mar 2025 09:11:38 +1100 Subject: [PATCH 06/10] fix tests --- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../watcher/WatcherMetadataSerializationTests.java | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a369124945ae1..85039f1b61792 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2399,7 +2399,7 @@ private ClusterState getClusterStateWithUpdatedRepositoryGeneration(ClusterState return ClusterState.builder(currentState) .metadata( Metadata.builder(currentState.getMetadata()) - .putCustom( + .putDefaultProjectCustom( RepositoriesMetadata.TYPE, RepositoriesMetadata.get(currentState) .withUpdatedGeneration(repoMetadata.name(), repoData.getGenId(), repoData.getGenId()) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherMetadataSerializationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherMetadataSerializationTests.java index ff4c345bf3f6d..50d50769e30a1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherMetadataSerializationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherMetadataSerializationTests.java @@ -8,8 +8,7 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.RepositoriesMetadata; -import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.test.ESTestCase; @@ -22,6 +21,7 @@ import org.elasticsearch.xpack.core.watcher.WatcherMetadata; import java.util.Collections; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,14 +48,13 @@ public void testWatcherMetadataParsingDoesNotSwallowOtherMetadata() throws Excep new Watcher(settings); // makes sure WatcherMetadata is registered in Custom Metadata boolean manuallyStopped = randomBoolean(); WatcherMetadata watcherMetadata = new WatcherMetadata(manuallyStopped); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repo", "fs", Settings.EMPTY); - RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(Collections.singletonList(repositoryMetadata)); + NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(Map.of()); final Metadata.Builder metadataBuilder = Metadata.builder(); if (randomBoolean()) { // random order of insertion metadataBuilder.putCustom(watcherMetadata.getWriteableName(), watcherMetadata); - metadataBuilder.putCustom(repositoriesMetadata.getWriteableName(), repositoriesMetadata); + metadataBuilder.putCustom(nodesShutdownMetadata.getWriteableName(), nodesShutdownMetadata); } else { - metadataBuilder.putCustom(repositoriesMetadata.getWriteableName(), repositoriesMetadata); + metadataBuilder.putCustom(nodesShutdownMetadata.getWriteableName(), nodesShutdownMetadata); metadataBuilder.putCustom(watcherMetadata.getWriteableName(), watcherMetadata); } // serialize metadata @@ -70,7 +69,7 @@ public void testWatcherMetadataParsingDoesNotSwallowOtherMetadata() throws Excep Metadata metadata = Metadata.Builder.fromXContent(createParser(builder)); // check that custom metadata still present assertThat(metadata.getProject().custom(watcherMetadata.getWriteableName()), notNullValue()); - assertThat(metadata.custom(repositoriesMetadata.getWriteableName()), notNullValue()); + assertThat(metadata.custom(nodesShutdownMetadata.getWriteableName()), notNullValue()); } private static WatcherMetadata getWatcherMetadataFromXContent(XContentParser parser) throws Exception { From ae4f55bb9b6a1411085b4e4ad0d9febb791d609c Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Sun, 23 Mar 2025 13:01:43 +1100 Subject: [PATCH 07/10] comments and renames --- .../elasticsearch/cluster/DiffableUtils.java | 12 +++- .../cluster/metadata/Metadata.java | 69 +++++++++++-------- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index cfa281bd96b76..f256d6013c970 100644 --- a/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -159,7 +159,17 @@ public static > boolean hasKey(MapDiff diff, return false; } - public static > MapDiff updateDiffsAndUpsertsForKey( + /** + * Create a new MapDiff from the specified MapDiff by transforming its diffs with the provided diffUpdateFunction as well as + * transforming its upserts with the provided upsertUpdateFunction. Whether an entry should be transformed is determined by + * the specified keyPredicate. + * @param diff The original MapDiff + * @param keyPredicate Determines whether an entry should be transformed + * @param diffUpdateFunction A function to transform a Diff entry + * @param upsertUpdateFunction A function to transform an upsert entry + * @return A new MapDiff as a result of the transformation + */ + public static > MapDiff updateDiffsAndUpserts( MapDiff diff, Predicate keyPredicate, BiFunction, Diff> diffUpdateFunction, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 09f98efe53dc6..a0d2b9fefdeb8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -954,6 +954,9 @@ private MetadataDiff(StreamInput in) throws IOException { multiProject = null; } else { fromNodeBeforeMultiProjectsSupport = false; + // Repositories metadata is sent as cluster customs diff from old node. We need + // 1. Split it from the cluster customs diff + // 2. Merge it into the default project's ProjectMetadataDiff final var bwcCustoms = maybeReadBwcCustoms(in); clusterCustoms = bwcCustoms.v1(); final var defaultProjectCustoms = bwcCustoms.v2(); @@ -996,8 +999,10 @@ private static MapDiff { + return DiffableUtils.updateDiffsAndUpserts(multiProject, ProjectId.DEFAULT::equals, (k, v) -> { assert ProjectId.DEFAULT.equals(k) : k; assert v instanceof ProjectMetadata.ProjectMetadataDiff : v; final var projectMetadataDiff = (ProjectMetadata.ProjectMetadataDiff) v; @@ -1061,22 +1066,22 @@ public void writeTo(StreamOutput out) throws IOException { buildUnifiedCustomDiff().writeTo(out); buildUnifiedReservedStateMetadataDiff().writeTo(out); } else { - final var multiProjectForWrite = multiProject != null + final var multiProjectToWrite = multiProject != null ? multiProject : DiffableUtils.singleEntryDiff(DEFAULT_PROJECT_ID, singleProject, PROJECT_ID_SERIALIZER); if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { - writeDiffBeforeRepositoriesMetadataMigration(out, clusterCustoms, multiProjectForWrite, reservedStateMetadata); + writeDiffWithRepositoriesMetadataAsClusterCustom(out, clusterCustoms, multiProjectToWrite, reservedStateMetadata); } else { clusterCustoms.writeTo(out); reservedStateMetadata.writeTo(out); - multiProjectForWrite.writeTo(out); + multiProjectToWrite.writeTo(out); } } } @SuppressWarnings({ "rawtypes", "unchecked" }) - private static void writeDiffBeforeRepositoriesMetadataMigration( + private static void writeDiffWithRepositoriesMetadataAsClusterCustom( StreamOutput out, MapDiff> clusterCustoms, MapDiff> multiProject, @@ -1085,9 +1090,13 @@ private static void writeDiffBeforeRepositoriesMetadataMigration( assert out.getTransportVersion().onOrAfter(TransportVersions.MULTI_PROJECT) && out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) : out.getTransportVersion(); + // For old nodes, RepositoriesMetadata needs to be sent as a cluster custom. This is possible only if (a) the repositories + // are defined only for the default project or (b) no repositories at all. What we need to do is: + // 1. Iterate through the multi-project's MapDiff to extract the RepositoriesMetadata of the default project + // 2. Throws if any repositories are found for non-default projects + // 3. Merge default project's RepositoriesMetadata into cluster customs final var combineClustersCustoms = new SetOnce>>(); - - final var updatedMultiProject = DiffableUtils.updateDiffsAndUpsertsForKey(multiProject, ignore -> true, (k, v) -> { + final var updatedMultiProject = DiffableUtils.updateDiffsAndUpserts(multiProject, ignore -> true, (k, v) -> { assert v instanceof ProjectMetadata.ProjectMetadataDiff : v; final var projectMetadataDiff = (ProjectMetadata.ProjectMetadataDiff) v; final var bwcCustoms = DiffableUtils.split( @@ -1097,14 +1106,15 @@ private static void writeDiffBeforeRepositoriesMetadataMigration( type -> RepositoriesMetadata.TYPE.equals(type) == false, PROJECT_CUSTOM_VALUE_SERIALIZER ); + // Simply return if RepositoriesMetadata is not found if (bwcCustoms.v1().isEmpty()) { return projectMetadataDiff; } - + // RepositoriesMetadata can only be defined for the default project. Otherwise throw exception. if (ProjectId.DEFAULT.equals(k) == false) { throwForVersionBeforeRepositoriesMetadataMigration(out); } - + // RepositoriesMetadata is found for the default project as a diff, merge it into the cluster customs combineClustersCustoms.set( DiffableUtils.>merge( clusterCustoms, @@ -1115,14 +1125,15 @@ private static void writeDiffBeforeRepositoriesMetadataMigration( return projectMetadataDiff.withCustoms(bwcCustoms.v2()); }, (k, v) -> { final ProjectCustom projectCustom = v.customs().get(RepositoriesMetadata.TYPE); + // Simply return if RepositoriesMetadata is not found if (projectCustom == null) { return v; } - + // RepositoriesMetadata can only be defined for the default project. Otherwise throw exception. if (ProjectId.DEFAULT.equals(k) == false) { throwForVersionBeforeRepositoriesMetadataMigration(out); } - + // RepositoriesMetadata is found for the default project as an upsert, package it as MapDiff and merge with cluster customs combineClustersCustoms.set( DiffableUtils.>merge( clusterCustoms, @@ -1145,7 +1156,7 @@ private static void writeDiffBeforeRepositoriesMetadataMigration( private static void throwForVersionBeforeRepositoriesMetadataMigration(StreamOutput out) { throw new UnsupportedOperationException( - "Serialize a diff containing per-project repository requires version on or after [" + "Serialize a diff with repositories defined for multiple projects requires version on or after [" + TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM + "], but got [" + out.getTransportVersion() @@ -1304,13 +1315,14 @@ public static Metadata readFrom(StreamInput in) throws IOException { builder.put(ReservedStateMetadata.readFrom(in)); } } else { - List projectCustoms = List.of(); + List defaultProjectCustoms = List.of(); if (in.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { - projectCustoms = new ArrayList<>(); - readBwcCustoms(in, builder, projectCustoms::add); - assert projectCustoms.size() <= 1 - : "expect only a single custom for repository metadata, but got " - + projectCustoms.stream().map(ProjectCustom::getWriteableName).toList(); + // Extract the default project's repositories metadata from an old style cluster customs + defaultProjectCustoms = new ArrayList<>(); + readBwcCustoms(in, builder, defaultProjectCustoms::add); + assert defaultProjectCustoms.size() <= 1 + : "expect only a single default project custom for repository metadata, but got " + + defaultProjectCustoms.stream().map(ProjectCustom::getWriteableName).toList(); } else { readClusterCustoms(in, builder); } @@ -1321,12 +1333,7 @@ public static Metadata readFrom(StreamInput in) throws IOException { } builder.projectMetadata(in.readMap(ProjectId::readFrom, ProjectMetadata::readFrom)); - - if (projectCustoms.isEmpty() == false) { - projectCustoms.forEach( - projectCustom -> builder.getProject(ProjectId.DEFAULT).putCustom(projectCustom.getWriteableName(), projectCustom) - ); - } + defaultProjectCustoms.forEach(c -> builder.getProject(ProjectId.DEFAULT).putCustom(c.getWriteableName(), c)); } return builder.build(); } @@ -1421,7 +1428,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(combinedMetadata); } else { if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { - if (isSingleProject() || noRepositoryExceptForDefaultProject(projects().values())) { + if (isSingleProject() || hasNoNonDefaultProjectRepositories(projects().values())) { + // Repositories metadata must be sent as cluster customs for old nodes final List combinedCustoms = new ArrayList<>(customs.size() + 1); combinedCustoms.addAll(customs.values()); final ProjectCustom custom = getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE); @@ -1431,7 +1439,11 @@ public void writeTo(StreamOutput out) throws IOException { VersionedNamedWriteable.writeVersionedWriteables(out, combinedCustoms); } else { throw new UnsupportedOperationException( - "Cannot serialize metadata with multiple projects to an output of version before repositories metadata migration" + "Serialize metadata with repositories defined for multiple projects requires version on or after [" + + TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM + + "], but got [" + + out.getTransportVersion() + + "]" ); } } else { @@ -1443,7 +1455,10 @@ public void writeTo(StreamOutput out) throws IOException { } } - private static boolean noRepositoryExceptForDefaultProject(Collection projects) { + /** + * @return {@code true} iff no repositories are defined for non-default-projects. + */ + private static boolean hasNoNonDefaultProjectRepositories(Collection projects) { return projects.stream() .allMatch(project -> ProjectId.DEFAULT.equals(project.id()) || project.custom(RepositoriesMetadata.TYPE) == null); } From 9ba0c417f3f3efd7663bd9094e1d305b5b18d03d Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Sun, 23 Mar 2025 14:13:26 +1100 Subject: [PATCH 08/10] toxcontent tests and fixes --- .../cluster/metadata/Metadata.java | 27 +++- .../cluster/metadata/MetadataTests.java | 138 +++++++++++++++++- 2 files changed, 159 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index a0d2b9fefdeb8..73091d19ca6c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -954,7 +954,7 @@ private MetadataDiff(StreamInput in) throws IOException { multiProject = null; } else { fromNodeBeforeMultiProjectsSupport = false; - // Repositories metadata is sent as cluster customs diff from old node. We need + // Repositories metadata is sent as cluster customs diff from old node. We need to // 1. Split it from the cluster customs diff // 2. Merge it into the default project's ProjectMetadataDiff final var bwcCustoms = maybeReadBwcCustoms(in); @@ -1090,8 +1090,8 @@ private static void writeDiffWithRepositoriesMetadataAsClusterCustom( assert out.getTransportVersion().onOrAfter(TransportVersions.MULTI_PROJECT) && out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) : out.getTransportVersion(); - // For old nodes, RepositoriesMetadata needs to be sent as a cluster custom. This is possible only if (a) the repositories - // are defined only for the default project or (b) no repositories at all. What we need to do is: + // For old nodes, RepositoriesMetadata needs to be sent as a cluster custom. This is possible when (a) the repositories + // are defined only for the default project or (b) no repositories at all. What we need to do are: // 1. Iterate through the multi-project's MapDiff to extract the RepositoriesMetadata of the default project // 2. Throws if any repositories are found for non-default projects // 3. Merge default project's RepositoriesMetadata into cluster customs @@ -1961,7 +1961,10 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { currentFieldName = parser.currentName(); } else if (token == XContentParser.Token.START_ARRAY) { switch (currentFieldName) { - case "projects" -> readProjects(parser, builder); + case "projects" -> { + assert builder.projectMetadata.isEmpty() : "expect empty projectMetadata, but got " + builder.projectMetadata; + readProjects(parser, builder); + } default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } } else if (token == XContentParser.Token.START_OBJECT) { @@ -2003,7 +2006,21 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { builder.putProjectCustom(PersistentTasksCustomMetadata.TYPE, tuple.v2()); builder.putCustom(ClusterPersistentTasksCustomMetadata.TYPE, tuple.v1()); } else { - builder.putProjectCustom(name, projectCustom); + if (projectCustom instanceof RepositoriesMetadata repositoriesMetadata) { + // Repositories at the top level means it is either + // 1. Serialization from a single project for which we need to create the default project + // 2. Serialization from repositories metadata migration. In this case, the metadata may + // contain multiple projects, including the default project, which should be deserialized + // already with readProjects. + final ProjectMetadata.Builder defaultProjectBuilder = builder.getProject(ProjectId.DEFAULT); + if (defaultProjectBuilder == null) { + builder.putProjectCustom(name, projectCustom); + } else { + defaultProjectBuilder.putCustom(RepositoriesMetadata.TYPE, repositoriesMetadata); + } + } else { + builder.putProjectCustom(name, projectCustom); + } } }); } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 9fba366b2a23a..6402111e331fc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -812,6 +812,17 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { } } }, + "repositories": { + "my-repo": { + "type": "fs", + "uuid": "_my-repo-uuid_", + "settings": { + "location": "backup" + }, + "generation": 42, + "pending_generation": 42 + } + }, "reserved_state":{ } } } @@ -830,7 +841,7 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { ); assertThat( metadata.getProject().customs().keySet(), - containsInAnyOrder("persistent_tasks", "index-graveyard", "component_template") + containsInAnyOrder("persistent_tasks", "index-graveyard", "component_template", "repositories") ); final var projectTasks = PersistentTasksCustomMetadata.get(metadata.getProject()); assertThat( @@ -838,6 +849,105 @@ public void testParseXContentFormatBeforeMultiProject() throws IOException { containsInAnyOrder("upgrade-system-indices") ); assertThat(clusterTasks.getLastAllocationId(), equalTo(projectTasks.getLastAllocationId())); + assertThat(metadata.customs(), not(hasKey("repositories"))); + final var repositoriesMetadata = RepositoriesMetadata.get(metadata.getProject(ProjectId.DEFAULT)); + assertThat( + repositoriesMetadata.repositories(), + equalTo( + List.of( + new RepositoryMetadata("my-repo", "_my-repo-uuid_", "fs", Settings.builder().put("location", "backup").build(), 42, 42) + ) + ) + ); + } + + public void testParseXContentFormatBeforeRepositoriesMetadataMigration() throws IOException { + final String json = org.elasticsearch.core.Strings.format(""" + { + "meta-data": { + "version": 54321, + "cluster_uuid":"aba1aa1ababbbaabaabaab", + "cluster_uuid_committed":false, + "cluster_coordination":{ + "term":1, + "last_committed_config":[], + "last_accepted_config":[], + "voting_config_exclusions":[] + }, + "projects" : [ + { + "id" : "default", + "templates" : { + "template" : { + "order" : 0, + "index_patterns" : [ + "pattern1", + "pattern2" + ], + "mappings" : { + "key1" : { } + }, + "aliases" : { } + } + }, + "index-graveyard" : { + "tombstones" : [ ] + }, + "reserved_state" : { } + }, + { + "id" : "another_project", + "templates" : { + "template" : { + "order" : 0, + "index_patterns" : [ + "pattern1", + "pattern2" + ], + "mappings" : { + "key1" : { } + }, + "aliases" : { } + } + }, + "index-graveyard" : { + "tombstones" : [ ] + }, + "reserved_state" : { } + } + ], + "repositories": { + "my-repo": { + "type": "fs", + "uuid": "_my-repo-uuid_", + "settings": { + "location": "backup" + }, + "generation": 42, + "pending_generation": 42 + } + }, + "reserved_state":{ } + } + } + """, IndexVersion.current(), IndexVersion.current()); + + final Metadata metadata = fromJsonXContentStringWithPersistentTasks(json); + assertThat(metadata, notNullValue()); + assertThat(metadata.clusterUUID(), is("aba1aa1ababbbaabaabaab")); + + assertThat(metadata.projects().keySet(), containsInAnyOrder(ProjectId.fromId("default"), ProjectId.fromId("another_project"))); + assertThat(metadata.customs(), not(hasKey("repositories"))); + final var repositoriesMetadata = RepositoriesMetadata.get(metadata.getProject(ProjectId.DEFAULT)); + assertThat( + repositoriesMetadata.repositories(), + equalTo( + List.of( + new RepositoryMetadata("my-repo", "_my-repo-uuid_", "fs", Settings.builder().put("location", "backup").build(), 42, 42) + ) + ) + ); + assertThat(metadata.getProject(ProjectId.fromId("another_project")).customs(), not(hasKey("repositories"))); } private Metadata fromJsonXContentStringWithPersistentTasks(String json) throws IOException { @@ -2615,6 +2725,19 @@ public void testMultiProjectXContent() throws IOException { ) ) ) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata( + "backup", + "uuid-" + project.id().id(), + "fs", + Settings.builder().put("location", project.id().id()).build() + ) + ) + ) + ) .build() ) .toList(); @@ -2677,6 +2800,19 @@ public void testMultiProjectXContent() throws IOException { final var projectTasks = PersistentTasksCustomMetadata.get(project); assertThat(projectTasks.getLastAllocationId(), equalTo(lastAllocationId)); assertThat(projectTasks.taskMap().keySet(), equalTo(Set.of(SystemIndexMigrationTaskParams.SYSTEM_INDEX_UPGRADE_TASK_NAME))); + assertThat( + RepositoriesMetadata.get(project).repositories(), + equalTo( + List.of( + new RepositoryMetadata( + "backup", + "uuid-" + project.id().id(), + "fs", + Settings.builder().put("location", project.id().id()).build() + ) + ) + ) + ); } final var clusterTasks = ClusterPersistentTasksCustomMetadata.get(fromXContentMeta); assertThat(clusterTasks.getLastAllocationId(), equalTo(lastAllocationId + 1)); From ce1b0a41ffc1c18767078717e30226fe6c5804a1 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Sun, 23 Mar 2025 19:51:01 +1100 Subject: [PATCH 09/10] bwc tests with version before multi-project --- .../MetadataRepositoriesMetadataTests.java | 65 ++++++++++++++----- 1 file changed, 49 insertions(+), 16 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java index a83337cd02ba9..bfffd99efb338 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataRepositoriesMetadataTests.java @@ -96,14 +96,26 @@ public void testRepositoriesMetadataDiffSerialization() throws IOException { } public void testRepositoriesMetadataSerializationBwc() throws IOException { - final Metadata orig = randomMetadata(between(0, 5), -1); + { + final var oldVersion = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersions.MULTI_PROJECT, + TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) + ); + final Metadata orig = randomMetadata(between(0, 5), -1); + doTestRepositoriesMetadataSerializationBwc(orig, oldVersion); + } + + { + final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.MULTI_PROJECT); + // Before multi-project, BWC is possible for a single project + final Metadata orig = randomMetadata(0, -1); + doTestRepositoriesMetadataSerializationBwc(orig, oldVersion); + } + } + private void doTestRepositoriesMetadataSerializationBwc(Metadata orig, TransportVersion oldVersion) throws IOException { final BytesStreamOutput out = new BytesStreamOutput(); - final var oldVersion = TransportVersionUtils.randomVersionBetween( - random(), - TransportVersions.MULTI_PROJECT, - TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) - ); out.setTransportVersion(oldVersion); orig.writeTo(out); @@ -114,10 +126,11 @@ public void testRepositoriesMetadataSerializationBwc() throws IOException { assertTrue(Metadata.isGlobalStateEquals(orig, fromStream)); // Simulate new-node writes to old-stream and old-node reads from old-stream - simulateReadOnOldNode(out.bytes(), oldVersion, orig); + simulateReadOnOldNodeAndAssert(out.bytes(), oldVersion, orig); } - private void simulateReadOnOldNode(BytesReference bytesReference, TransportVersion oldVersion, Metadata orig) throws IOException { + private void simulateReadOnOldNodeAndAssert(BytesReference bytesReference, TransportVersion oldVersion, Metadata orig) + throws IOException { final var in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistryBwc); in.setTransportVersion(oldVersion); final Metadata fromStream = Metadata.readFrom(in); @@ -125,17 +138,32 @@ private void simulateReadOnOldNode(BytesReference bytesReference, TransportVersi } public void testRepositoriesMetadataDiffSerializationBwc() throws IOException { - final Tuple tuple = randomMetadataAndUpdate(between(0, 5), -1); + { + final var oldVersion = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersions.MULTI_PROJECT, + TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) + ); + final Tuple tuple = randomMetadataAndUpdate(between(0, 5), -1); + doTestRepositoriesMetadataDiffSerializationBwc(tuple, oldVersion); + } + + { + final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.MULTI_PROJECT); + // Before multi-project, BWC is possible for a single project + final Tuple tuple = randomMetadataAndUpdate(0, -1); + doTestRepositoriesMetadataDiffSerializationBwc(tuple, oldVersion); + } + } + + private void doTestRepositoriesMetadataDiffSerializationBwc(Tuple tuple, TransportVersion oldVersion) + throws IOException { final Metadata before = tuple.v1(); final Metadata after = tuple.v2(); final Diff diff = after.diff(before); final BytesStreamOutput out = new BytesStreamOutput(); - final var oldVersion = TransportVersionUtils.randomVersionBetween( - random(), - TransportVersions.MULTI_PROJECT, - TransportVersionUtils.getPreviousVersion(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) - ); + out.setTransportVersion(oldVersion); diff.writeTo(out); @@ -147,11 +175,11 @@ public void testRepositoriesMetadataDiffSerializationBwc() throws IOException { assertTrue(Metadata.isGlobalStateEquals(after, metadataFromDiff)); // Simulate new-node writes diff to old-stream and old-node reads from old-stream - simulateReadAndApplyDiffOnOldNode(out.bytes(), oldVersion, before, after); + simulateReadAndApplyDiffOnOldNodeAndAssert(out.bytes(), oldVersion, before, after); } // Simulate the deserialization and application of the diff on an old node - private void simulateReadAndApplyDiffOnOldNode( + private void simulateReadAndApplyDiffOnOldNodeAndAssert( BytesReference bytesReference, TransportVersion oldVersion, Metadata before, @@ -176,6 +204,11 @@ private void simulateReadAndApplyDiffOnOldNode( assertMetadataBwcEquals(metadataFromDiff, after); } + /** + * Check equality of the two metadata by handling the different types of RepositoriesMetadata + * @param metadataBwc The old style metadata with RepositoriesMetadata as Metadata#ClusterCustom + * @param metadataNew The new style metadata with RepositoriesMetadata as Metadata#ProjectCustom + */ private void assertMetadataBwcEquals(Metadata metadataBwc, Metadata metadataNew) { // BWC metadata has no repositories in project assertThat(metadataBwc.getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE), nullValue()); From 2d0b90a8c6a922ca371181be948b5764b77cb7d3 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 24 Mar 2025 09:21:37 +1100 Subject: [PATCH 10/10] rephrase --- .../cluster/metadata/Metadata.java | 19 ++++++++++--------- .../cluster/metadata/ProjectMetadata.java | 2 ++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 73091d19ca6c7..3155604088fae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -954,8 +954,8 @@ private MetadataDiff(StreamInput in) throws IOException { multiProject = null; } else { fromNodeBeforeMultiProjectsSupport = false; - // Repositories metadata is sent as cluster customs diff from old node. We need to - // 1. Split it from the cluster customs diff + // Repositories metadata is sent as Metadata#customs diff from old node. We need to + // 1. Split it from the Metadata#customs diff // 2. Merge it into the default project's ProjectMetadataDiff final var bwcCustoms = maybeReadBwcCustoms(in); clusterCustoms = bwcCustoms.v1(); @@ -1094,7 +1094,7 @@ private static void writeDiffWithRepositoriesMetadataAsClusterCustom( // are defined only for the default project or (b) no repositories at all. What we need to do are: // 1. Iterate through the multi-project's MapDiff to extract the RepositoriesMetadata of the default project // 2. Throws if any repositories are found for non-default projects - // 3. Merge default project's RepositoriesMetadata into cluster customs + // 3. Merge default project's RepositoriesMetadata into Metadata#customs final var combineClustersCustoms = new SetOnce>>(); final var updatedMultiProject = DiffableUtils.updateDiffsAndUpserts(multiProject, ignore -> true, (k, v) -> { assert v instanceof ProjectMetadata.ProjectMetadataDiff : v; @@ -1114,7 +1114,7 @@ private static void writeDiffWithRepositoriesMetadataAsClusterCustom( if (ProjectId.DEFAULT.equals(k) == false) { throwForVersionBeforeRepositoriesMetadataMigration(out); } - // RepositoriesMetadata is found for the default project as a diff, merge it into the cluster customs + // RepositoriesMetadata is found for the default project as a diff, merge it into the Metadata#customs combineClustersCustoms.set( DiffableUtils.>merge( clusterCustoms, @@ -1133,7 +1133,7 @@ private static void writeDiffWithRepositoriesMetadataAsClusterCustom( if (ProjectId.DEFAULT.equals(k) == false) { throwForVersionBeforeRepositoriesMetadataMigration(out); } - // RepositoriesMetadata is found for the default project as an upsert, package it as MapDiff and merge with cluster customs + // RepositoriesMetadata found for the default project as an upsert, package it as MapDiff and merge into Metadata#customs combineClustersCustoms.set( DiffableUtils.>merge( clusterCustoms, @@ -1155,6 +1155,7 @@ private static void writeDiffWithRepositoriesMetadataAsClusterCustom( } private static void throwForVersionBeforeRepositoriesMetadataMigration(StreamOutput out) { + assert out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM) : out.getTransportVersion(); throw new UnsupportedOperationException( "Serialize a diff with repositories defined for multiple projects requires version on or after [" + TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM @@ -1317,7 +1318,7 @@ public static Metadata readFrom(StreamInput in) throws IOException { } else { List defaultProjectCustoms = List.of(); if (in.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { - // Extract the default project's repositories metadata from an old style cluster customs + // Extract the default project's repositories metadata from the Metadata#customs from an old node defaultProjectCustoms = new ArrayList<>(); readBwcCustoms(in, builder, defaultProjectCustoms::add); assert defaultProjectCustoms.size() <= 1 @@ -1429,7 +1430,7 @@ public void writeTo(StreamOutput out) throws IOException { } else { if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { if (isSingleProject() || hasNoNonDefaultProjectRepositories(projects().values())) { - // Repositories metadata must be sent as cluster customs for old nodes + // Repositories metadata must be sent as Metadata#customs for old nodes final List combinedCustoms = new ArrayList<>(customs.size() + 1); combinedCustoms.addAll(customs.values()); final ProjectCustom custom = getProject(ProjectId.DEFAULT).custom(RepositoriesMetadata.TYPE); @@ -2009,9 +2010,9 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { if (projectCustom instanceof RepositoriesMetadata repositoriesMetadata) { // Repositories at the top level means it is either // 1. Serialization from a single project for which we need to create the default project - // 2. Serialization from repositories metadata migration. In this case, the metadata may + // 2. Serialization before repositories metadata migration. In this case, the metadata may // contain multiple projects, including the default project, which should be deserialized - // already with readProjects. + // already with readProjects, i.e. no need to create the default project. final ProjectMetadata.Builder defaultProjectBuilder = builder.getProject(ProjectId.DEFAULT); if (defaultProjectBuilder == null) { builder.putProjectCustom(name, projectCustom); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java index 38029dbfdb8b7..4a7bc9a1c012d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java @@ -2204,6 +2204,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(templates.values()); Collection filteredCustoms = customs.values(); if (out.getTransportVersion().before(TransportVersions.REPOSITORIES_METADATA_AS_PROJECT_CUSTOM)) { + // RepositoriesMetadata is sent as part of Metadata#customs for version before RepositoriesMetadata migration + // So we exclude it from the project level customs if (custom(RepositoriesMetadata.TYPE) != null) { assert ProjectId.DEFAULT.equals(id) : "Only default project can have repositories metadata. Otherwise the code should have thrown before it reaches here";