diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java index 56002554cb140..9daa5c24f3bd4 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java @@ -145,7 +145,8 @@ public void setUp() throws Exception { for (int i = 1; i <= numNodes; i++) { String id = "node" + i; nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags)))); - compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current())); + // system index mappings versions not needed here, so we use Map.of() + compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current(), Map.of())); } initialClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(metadata) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index f561cc50b4f19..373213be479a7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.version.CompatibilityVersions; +import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -238,7 +239,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { versions.remove(nodeId); if (randomBoolean()) { nodes.add(randomNode(nodeId)); - versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()))); + versions.put(nodeId, CompatibilityVersionsUtils.fakeSystemIndicesRandom()); } } } @@ -246,7 +247,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { for (int i = 0; i < additionalNodeCount; i++) { String id = "node-" + randomAlphaOfLength(10); nodes.add(randomNode(id)); - versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()))); + versions.put(id, CompatibilityVersionsUtils.fakeSystemIndicesRandom()); } return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 6fb02a3ca05d2..3bc2bd7ce8499 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -136,6 +136,7 @@ static TransportVersion def(int id, String uniqueId) { public static final TransportVersion V_8_500_070 = def(8_500_070, "6BADC9CD-3C9D-4381-8BD9-B305CAA93F86"); public static final TransportVersion V_8_500_071 = def(8_500_071, "a86dfc08-3026-4f01-90ef-6d6de003e217"); public static final TransportVersion V_8_500_072 = def(8_500_072, "e2df7d80-7b74-4afd-9734-aee0fc256025"); + public static final TransportVersion V_8_500_073 = def(8_500_073, "9128e16a-e4f7-41c4-b04f-842955bfc1b4"); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 3f9e7e4d8d9ae..603a93ab11f79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContent; @@ -133,12 +134,12 @@ default boolean isPrivate() { new DiffableUtils.NonDiffableValueSerializer<>() { @Override public void write(CompatibilityVersions value, StreamOutput out) throws IOException { - TransportVersion.writeVersion(value.transportVersion(), out); + value.writeTo(out); } @Override public CompatibilityVersions read(StreamInput in, String key) throws IOException { - return new CompatibilityVersions(TransportVersion.readVersion(in)); + return CompatibilityVersions.readVersion(in); } }; @@ -222,8 +223,8 @@ public ClusterState( this.routingNodes = routingNodes; assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes); this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) - ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE) - : CompatibilityVersions.minimumVersions(compatibilityVersions); + ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) // empty map because cluster state is unknown + : CompatibilityVersions.minimumVersions(compatibilityVersions.values()); } private static boolean assertConsistentRoutingNodes( @@ -287,6 +288,10 @@ public TransportVersion getMinTransportVersion() { return this.minVersions.transportVersion(); } + public Map getMinSystemIndexMappingVersions() { + return this.minVersions.systemIndexMappingsVersion(); + } + public Metadata metadata() { return this.metadata; } @@ -773,7 +778,8 @@ public DiscoveryNodes nodes() { } public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) { - compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId))); + // TODO[wrb]: system index mappings versions will be added in a followup + compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of())); return this; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java index 3d70fca6723af..d8958f75c7aa5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -16,6 +16,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -66,7 +67,8 @@ public JoinRequest(StreamInput in) throws IOException { compatibilityVersions = CompatibilityVersions.readVersion(in); } else { // there's a 1-1 mapping from Version to TransportVersion before 8.8.0 - compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id)); + // no known mapping versions here + compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id), Map.of()); } minimumTerm = in.readLong(); optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java index 55cf6ea8a398d..dd52f20c7355a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java @@ -143,7 +143,7 @@ public ClusterState execute(BatchExecutionContext batchExecutionContex CompatibilityVersions compatibilityVersions = nodeJoinTask.compatibilityVersions(); if (enforceVersionBarrier) { ensureVersionBarrier(node.getVersion(), minClusterNodeVersion); - ensureTransportVersionBarrier(compatibilityVersions, compatibilityVersionsMap.values()); + CompatibilityVersions.ensureVersionsCompatibility(compatibilityVersions, compatibilityVersionsMap.values()); } blockForbiddenVersions(compatibilityVersions.transportVersion()); ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index 5e7692e645d6a..a9c5298a4325e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -13,24 +13,34 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Wraps component version numbers for cluster state * *

Cluster state will need to carry version information for different independently versioned components. - * This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. - * It's similar to {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to - * be constructed during node startup and hold values from plugins as well. + * This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. It's similar to + * {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to be constructed during node startup and hold values + * from plugins as well. * - * @param transportVersion A transport version, usually a minimum compatible one for a node. + * @param transportVersion A transport version, usually a minimum compatible one for a node. + * @param systemIndexMappingsVersion A map of system index names to versions for their mappings. */ -public record CompatibilityVersions(TransportVersion transportVersion) implements Writeable, ToXContentFragment { +public record CompatibilityVersions( + TransportVersion transportVersion, + Map systemIndexMappingsVersion +) implements Writeable, ToXContentFragment { /** * Constructs a VersionWrapper collecting all the minimum versions from the values of the map. @@ -38,24 +48,73 @@ public record CompatibilityVersions(TransportVersion transportVersion) implement * @param compatibilityVersions A map of strings (typically node identifiers) and versions wrappers * @return Minimum versions for the cluster */ - public static CompatibilityVersions minimumVersions(Map compatibilityVersions) { - return new CompatibilityVersions( - compatibilityVersions.values() - .stream() - .map(CompatibilityVersions::transportVersion) - .min(Comparator.naturalOrder()) - // In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway: - .orElse(TransportVersions.MINIMUM_COMPATIBLE) - ); + public static CompatibilityVersions minimumVersions(Collection compatibilityVersions) { + TransportVersion minimumTransport = compatibilityVersions.stream() + .map(CompatibilityVersions::transportVersion) + .min(Comparator.naturalOrder()) + // In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway: + .orElse(TransportVersions.MINIMUM_COMPATIBLE); + + Map minimumMappingsVersions = compatibilityVersions.stream() + .flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream()) + .collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> Stream.of(v1, v2).min(Comparator.naturalOrder()).get()) + ); + + return new CompatibilityVersions(minimumTransport, minimumMappingsVersions); + } + + public static void ensureVersionsCompatibility(CompatibilityVersions candidate, Collection existing) { + CompatibilityVersions minimumClusterVersions = minimumVersions(existing); + + if (candidate.transportVersion().before(minimumClusterVersions.transportVersion())) { + throw new IllegalStateException( + "node with transport version [" + + candidate.transportVersion() + + "] may not join a cluster with minimum transport version [" + + minimumClusterVersions.transportVersion() + + "]" + ); + } + + Map candidateInvalid = new HashMap<>(); + Map existingInvalid = new HashMap<>(); + for (Map.Entry candidates : candidate.systemIndexMappingsVersion().entrySet()) { + var mapping = minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()); + if (Objects.nonNull(mapping) && mapping.version() > candidates.getValue().version()) { + candidateInvalid.put(candidates.getKey(), candidates.getValue()); + existingInvalid.put(candidates.getKey(), minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey())); + } + } + if (candidateInvalid.isEmpty() == false) { + throw new IllegalStateException( + "node with system index mappings versions [" + + candidateInvalid + + "] may not join a cluster with minimum system index mappings versions [" + + existingInvalid + + "]" + ); + } } public static CompatibilityVersions readVersion(StreamInput in) throws IOException { - return new CompatibilityVersions(TransportVersion.readVersion(in)); + TransportVersion transportVersion = TransportVersion.readVersion(in); + + Map mappingsVersions = Map.of(); + if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) { + mappingsVersions = in.readMap(SystemIndexDescriptor.MappingsVersion::new); + } + + return new CompatibilityVersions(transportVersion, mappingsVersions); } @Override public void writeTo(StreamOutput out) throws IOException { TransportVersion.writeVersion(this.transportVersion(), out); + + if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) { + out.writeMap(this.systemIndexMappingsVersion(), (o, v) -> v.writeTo(o)); + } } /** @@ -69,6 +128,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("transport_version", this.transportVersion().toString()); + builder.field("mappings_versions", this.systemIndexMappingsVersion); return builder; } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index 4d37cea8bcf98..d6441a2920f43 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -18,15 +18,20 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.RegExp; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -649,7 +654,33 @@ public boolean isInternal() { * The hash is a hash of the system index descriptor's mappings so that we can warn * in case of inconsistencies across nodes. */ - public record MappingsVersion(int version, int hash) {}; + public record MappingsVersion(int version, int hash) implements Writeable, ToXContent, Comparable { + + public MappingsVersion(StreamInput in) throws IOException { + this(in.readVInt(), in.readInt()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(version); + out.writeInt(hash); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("version", version); + builder.field("hash", hash); + builder.endObject(); + return builder; + } + + @Override + public int compareTo(MappingsVersion o) { + Objects.requireNonNull(o, "Cannot compare null MappingsVersion"); + return Integer.compare(this.version, o.version); + } + } /** * Provides a fluent API for building a {@link SystemIndexDescriptor}. Validation still happens in that class. diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 8a6cb1f82bc05..111dc5ec72165 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -622,7 +622,10 @@ protected Node( resourcesToClose.add(circuitBreakerService); modules.add(new GatewayModule()); - CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current()); + CompatibilityVersions compatibilityVersions = new CompatibilityVersions( + TransportVersion.current(), + systemIndices.getMappingsVersions() + ); PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); modules.add(settingsModule); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 80ff2168f6344..b5ab63140e433 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -129,7 +129,8 @@ public void testToXContentWithDeprecatedClusterState() { "nodes_versions": [ { "node_id": "node0", - "transport_version": "8000099" + "transport_version": "8000099", + "mappings_versions": {} } ], "metadata": { diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index b96a362b2a867..243d30ccf811f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Iterators; @@ -44,6 +45,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -211,7 +213,13 @@ public void testToXContent() throws IOException { "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata": { @@ -466,7 +474,13 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata" : { @@ -717,7 +731,13 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata" : { @@ -1029,7 +1049,12 @@ private ClusterState buildClusterState() throws IOException { .add(DiscoveryNodeUtils.create("nodeId1", new TransportAddress(InetAddress.getByName("127.0.0.1"), 111))) .build() ) - .putTransportVersion("nodeId1", TransportVersion.current()) + .compatibilityVersions( + Map.of( + "nodeId1", + new CompatibilityVersions(TransportVersion.current(), Map.of(".tasks", new SystemIndexDescriptor.MappingsVersion(1, 1))) + ) + ) .blocks( ClusterBlocks.builder() .addGlobalBlock( diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 676914ec0bed2..f779d5ea56dfa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -11,12 +11,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.version.CompatibilityVersions; +import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -243,7 +245,7 @@ public void testJoinRequestEqualsHashCodeSerialization() { ); JoinRequest initialJoinRequest = new JoinRequest( initialJoin.getSourceNode(), - new CompatibilityVersions(TransportVersionUtils.randomVersion()), + CompatibilityVersionsUtils.fakeSystemIndicesRandom(), randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin) ); @@ -263,7 +265,8 @@ public void testJoinRequestEqualsHashCodeSerialization() { return new JoinRequest( joinRequest.getSourceNode(), new CompatibilityVersions( - TransportVersionUtils.randomVersion(Set.of(joinRequest.getCompatibilityVersions().transportVersion())) + TransportVersionUtils.randomVersion(Set.of(joinRequest.getCompatibilityVersions().transportVersion())), + Map.of() ), joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin() diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java index 4807d5ee984ca..19aa035817a01 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.Level; -import org.elasticsearch.TransportVersion; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; @@ -28,7 +27,6 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; -import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ReferenceDocs; @@ -38,7 +36,6 @@ import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; -import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -48,7 +45,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.assertDesiredNodesStatusIsCorrect; @@ -157,33 +153,6 @@ public void testPreventJoinClusterWithUnsupportedNodeVersions() { } } - public void testPreventJoinClusterWithUnsupportedTransportVersion() { - List versions = IntStream.range(0, randomIntBetween(2, 10)) - .mapToObj(i -> TransportVersionUtils.randomCompatibleVersion(random())) - .toList(); - TransportVersion min = Collections.min(versions); - List compatibilityVersions = versions.stream().map(CompatibilityVersions::new).toList(); - - // should not throw - NodeJoinExecutor.ensureTransportVersionBarrier( - new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current())), - compatibilityVersions - ); - expectThrows( - IllegalStateException.class, - () -> NodeJoinExecutor.ensureTransportVersionBarrier( - new CompatibilityVersions( - TransportVersionUtils.randomVersionBetween( - random(), - TransportVersionUtils.getFirstVersion(), - TransportVersionUtils.getPreviousVersion(min) - ) - ), - compatibilityVersions - ) - ); - } - public void testSuccess() { Settings.builder().build(); Metadata.Builder metaBuilder = Metadata.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java index 3d488b6d55bff..f213d7e366ce4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java @@ -112,7 +112,7 @@ public void testNothingFixedWhenNothingToInfer() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(Version.V_8_8_0)) - .compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0))) + .compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of()))) .build(); TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); @@ -127,7 +127,7 @@ public void testNothingFixedWhenOnNextVersion() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(NEXT_VERSION)) - .compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION))) + .compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of()))) .build(); TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null); @@ -143,7 +143,10 @@ public void testNothingFixedWhenOnPreviousVersion() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(Version.V_8_7_0, Version.V_8_8_0)) .compatibilityVersions( - Maps.transformValues(versions(TransportVersions.V_8_7_0, TransportVersions.V_8_8_0), CompatibilityVersions::new) + Maps.transformValues( + versions(TransportVersions.V_8_7_0, TransportVersions.V_8_8_0), + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) + ) ) .build(); @@ -163,7 +166,7 @@ public void testVersionsAreFixed() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -192,7 +195,7 @@ public void testConcurrentChangesDoNotOverlap() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -207,7 +210,7 @@ public void testConcurrentChangesDoNotOverlap() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -228,7 +231,7 @@ public void testFailedRequestsAreRetried() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index 0391cbf83608c..b3b598f2bd38c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -10,21 +10,75 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.IntStream; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class CompatibilityVersionsTests extends ESTestCase { - public void testMinimumVersions() { + public void testEmptyVersionsList() { assertThat( - CompatibilityVersions.minimumVersions(Map.of()), - equalTo(new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE)) + CompatibilityVersions.minimumVersions(List.of()), + equalTo(new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of())) ); + } + + public void testMinimumTransportVersions() { + TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersions.MINIMUM_COMPATIBLE, true); + TransportVersion version2 = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersionUtils.getNextVersion(version1, true), + TransportVersion.current() + ); + + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, Map.of()); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, Map.of()); + + List versions = List.of(compatibilityVersions1, compatibilityVersions2); + + assertThat(CompatibilityVersions.minimumVersions(versions), equalTo(compatibilityVersions1)); + } + + public void testMinimumMappingsVersions() { + SystemIndexDescriptor.MappingsVersion v1 = new SystemIndexDescriptor.MappingsVersion(1, 1); + SystemIndexDescriptor.MappingsVersion v2 = new SystemIndexDescriptor.MappingsVersion(2, 2); + SystemIndexDescriptor.MappingsVersion v3 = new SystemIndexDescriptor.MappingsVersion(3, 3); + Map mappings1 = Map.of(".system-index-1", v3, ".system-index-2", v1); + Map mappings2 = Map.of(".system-index-1", v2, ".system-index-2", v2); + Map mappings3 = Map.of(".system-index-3", v1); + + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(TransportVersion.current(), mappings1); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(TransportVersion.current(), mappings2); + CompatibilityVersions compatibilityVersions3 = new CompatibilityVersions(TransportVersion.current(), mappings3); + + List versions = List.of(compatibilityVersions1, compatibilityVersions2, compatibilityVersions3); + + assertThat( + CompatibilityVersions.minimumVersions(versions), + equalTo( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index-1", v2, ".system-index-2", v1, ".system-index-3", v1) + ) + ) + ); + } + /** + * By design, all versions should increase monotonically through releases, so we shouldn't have a situation + * where the minimum transport version is in one CompatibilityVersions object and a minimum system + * index is in another. However, the minimumVersions method we're testing will handle that situation without + * complaint. + */ + public void testMinimumsAreMerged() { TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersions.MINIMUM_COMPATIBLE, true); TransportVersion version2 = TransportVersionUtils.randomVersionBetween( random(), @@ -32,11 +86,84 @@ public void testMinimumVersions() { TransportVersion.current() ); - CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1); - CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2); + SystemIndexDescriptor.MappingsVersion v1 = new SystemIndexDescriptor.MappingsVersion(1, 1); + SystemIndexDescriptor.MappingsVersion v2 = new SystemIndexDescriptor.MappingsVersion(2, 2); + Map mappings1 = Map.of(".system-index-1", v2); + Map mappings2 = Map.of(".system-index-1", v1); + + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, mappings1); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, mappings2); + + List versions = List.of(compatibilityVersions1, compatibilityVersions2); + + assertThat(CompatibilityVersions.minimumVersions(versions), equalTo(new CompatibilityVersions(version1, mappings2))); + } + + public void testPreventJoinClusterWithUnsupportedTransportVersion() { + List transportVersions = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj(i -> TransportVersionUtils.randomCompatibleVersion(random())) + .toList(); + TransportVersion min = Collections.min(transportVersions); + List compatibilityVersions = transportVersions.stream() + .map(transportVersion -> new CompatibilityVersions(transportVersion, Map.of())) + .toList(); + + // should not throw + CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current()), Map.of()), + compatibilityVersions + ); + + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersionUtils.randomVersionBetween( + random(), + TransportVersionUtils.getFirstVersion(), + TransportVersionUtils.getPreviousVersion(min) + ), + Map.of() + ), + compatibilityVersions + ) + ); + assertThat(e.getMessage(), containsString("may not join a cluster with minimum transport version")); + } + + public void testPreventJoinClusterWithUnsupportedMappingsVersion() { + List compatibilityVersions = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj( + i -> new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(randomIntBetween(2, 10), -1)) + ) + ) + .toList(); + int min = compatibilityVersions.stream() + .mapToInt(v -> v.systemIndexMappingsVersion().get(".system-index").version()) + .min() + .orElse(2); - Map versionsMap = Map.of("node1", compatibilityVersions1, "node2", compatibilityVersions2); + // should not throw + CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(min, -1)) + ), + compatibilityVersions + ); - assertThat(CompatibilityVersions.minimumVersions(versionsMap), equalTo(compatibilityVersions1)); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(randomIntBetween(1, min - 1), -1)) + ), + compatibilityVersions + ) + ); + assertThat(e.getMessage(), containsString("may not join a cluster with minimum system index mappings versions")); } } diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java index 7a651dea0c878..1a372de6129a3 100644 --- a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java @@ -386,6 +386,18 @@ public void testNegativeMappingsVersion() { assertThat(e.getMessage(), equalTo("The mappings version must not be negative")); } + public void testMappingsVersionCompareTo() { + SystemIndexDescriptor.MappingsVersion mv1 = new SystemIndexDescriptor.MappingsVersion(1, randomInt(20)); + SystemIndexDescriptor.MappingsVersion mv2 = new SystemIndexDescriptor.MappingsVersion(2, randomInt(20)); + + NullPointerException e = expectThrows(NullPointerException.class, () -> mv1.compareTo(null)); + assertThat(e.getMessage(), equalTo("Cannot compare null MappingsVersion")); + + assertThat(mv1.compareTo(mv2), equalTo(-1)); + assertThat(mv1.compareTo(mv1), equalTo(0)); + assertThat(mv2.compareTo(mv1), equalTo(1)); + } + public void testHashesIgnoreMappingMetadata() { String mappingFormatString = """ { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index b0ad7d333d172..e547a736df034 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -422,7 +422,7 @@ public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List new JoinTask.NodeJoinTask( node, - new CompatibilityVersions(transportVersion), + new CompatibilityVersions(transportVersion, Map.of()), DUMMY_REASON, ActionListener.running(() -> { throw new AssertionError("should not complete publication"); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java index 80815dffa4a30..b8949cb92da71 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java @@ -9,8 +9,13 @@ package org.elasticsearch.cluster.version; import org.elasticsearch.TransportVersion; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Map; + public class CompatibilityVersionsUtils { /** @@ -23,7 +28,7 @@ public class CompatibilityVersionsUtils { * @return Compatibility versions known at compile time. */ public static CompatibilityVersions staticCurrent() { - return new CompatibilityVersions(TransportVersion.current()); + return new CompatibilityVersions(TransportVersion.current(), Map.of()); } /** @@ -34,6 +39,20 @@ public static CompatibilityVersions staticCurrent() { * @return Random valid compatibility versions */ public static CompatibilityVersions staticRandom() { - return new CompatibilityVersions(TransportVersionUtils.randomVersion()); + return new CompatibilityVersions(TransportVersionUtils.randomVersion(), Map.of()); + } + + public static CompatibilityVersions fakeSystemIndicesRandom() { + return new CompatibilityVersions( + TransportVersionUtils.randomVersion(), + ESTestCase.randomMap( + 0, + 3, + () -> Tuple.tuple( + "." + ESTestCase.randomAlphaOfLength(5), + new SystemIndexDescriptor.MappingsVersion(ESTestCase.randomInt(20), ESTestCase.randomInt()) + ) + ) + ); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java index 87743158995d4..c85fda5eee4b3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java @@ -23,13 +23,13 @@ public class TransportVersionUtilsTests extends ESTestCase { private static final Map transportVersions = Map.of( "Alfredo", - new CompatibilityVersions(TransportVersions.V_7_0_0), + new CompatibilityVersions(TransportVersions.V_7_0_0, Map.of()), "Bertram", - new CompatibilityVersions(TransportVersions.V_7_0_1), + new CompatibilityVersions(TransportVersions.V_7_0_1, Map.of()), "Charles", - new CompatibilityVersions(TransportVersions.V_8_500_020), + new CompatibilityVersions(TransportVersions.V_8_500_020, Map.of()), "Dominic", - new CompatibilityVersions(TransportVersions.V_8_0_0) + new CompatibilityVersions(TransportVersions.V_8_0_0, Map.of()) ); private static final ClusterState state = new ClusterState( @@ -53,7 +53,10 @@ public void testGetMinTransportVersion() { public void testIsMinTransformVersionSameAsCurrent() { assertThat(TransportVersionUtils.isMinTransportVersionSameAsCurrent(state), equalTo(false)); - Map transportVersions1 = Map.of("Eugene", new CompatibilityVersions(TransportVersion.current())); + Map transportVersions1 = Map.of( + "Eugene", + new CompatibilityVersions(TransportVersion.current(), Map.of()) + ); ClusterState state1 = new ClusterState( new ClusterName("harry"),