From cc75cba8524268c90f3272ec04292e63f3f53963 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 5 Sep 2023 15:48:52 -0400 Subject: [PATCH 01/12] Add system indices to compatibility versions --- .../allocation/AllocationBenchmark.java | 2 +- .../cluster/ClusterStateDiffIT.java | 4 +- .../elasticsearch/cluster/ClusterState.java | 6 +- .../cluster/coordination/JoinRequest.java | 3 +- .../version/CompatibilityVersions.java | 45 ++++++++---- .../java/org/elasticsearch/node/Node.java | 5 +- .../cluster/coordination/MessagesTests.java | 6 +- .../coordination/NodeJoinExecutorTests.java | 10 ++- .../TransportVersionsFixupListenerTests.java | 17 +++-- .../version/CompatibilityVersionsTests.java | 70 +++++++++++++++++-- .../indices/cluster/ClusterStateChanges.java | 2 +- .../version/CompatibilityVersionsUtils.java | 6 +- .../ml/utils/TransportVersionUtilsTests.java | 13 ++-- 13 files changed, 143 insertions(+), 46 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java index 56002554cb140..b9b644a1376bb 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java @@ -145,7 +145,7 @@ public void setUp() throws Exception { for (int i = 1; i <= numNodes; i++) { String id = "node" + i; nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags)))); - compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current())); + compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current(), Map.of())); } initialClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(metadata) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index f561cc50b4f19..1990dbab4384c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -238,7 +238,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { versions.remove(nodeId); if (randomBoolean()) { nodes.add(randomNode(nodeId)); - versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()))); + versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()), Map.of())); } } } @@ -246,7 +246,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { for (int i = 0; i < additionalNodeCount; i++) { String id = "node-" + randomAlphaOfLength(10); nodes.add(randomNode(id)); - versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()))); + versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()), Map.of())); } return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 6ca8fe26edd54..d9dc7a0221704 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -138,7 +138,7 @@ public void write(CompatibilityVersions value, StreamOutput out) throws IOExcept @Override public CompatibilityVersions read(StreamInput in, String key) throws IOException { - return new CompatibilityVersions(TransportVersion.readVersion(in)); + return new CompatibilityVersions(TransportVersion.readVersion(in), Map.of()); } }; @@ -222,7 +222,7 @@ public ClusterState( this.routingNodes = routingNodes; assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes); this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) - ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE) + ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) : CompatibilityVersions.minimumVersions(compatibilityVersions); } @@ -788,7 +788,7 @@ public DiscoveryNodes nodes() { } public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) { - compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId))); + compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of())); return this; } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java index 3d70fca6723af..8e86f1eba2951 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -16,6 +16,7 @@ import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -66,7 +67,7 @@ public JoinRequest(StreamInput in) throws IOException { compatibilityVersions = CompatibilityVersions.readVersion(in); } else { // there's a 1-1 mapping from Version to TransportVersion before 8.8.0 - compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id)); + compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id), Map.of()); } minimumTerm = in.readLong(); optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index 5e7692e645d6a..4ccb7f5c05d56 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -13,24 +13,30 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; /** * Wraps component version numbers for cluster state * *

Cluster state will need to carry version information for different independently versioned components. - * This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. - * It's similar to {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to - * be constructed during node startup and hold values from plugins as well. + * This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. It's similar to + * {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to be constructed during node startup and hold values + * from plugins as well. * - * @param transportVersion A transport version, usually a minimum compatible one for a node. + * @param transportVersion A transport version, usually a minimum compatible one for a node. + * @param systemIndexMappingsVersion A map of system index names to versions for their mappings. */ -public record CompatibilityVersions(TransportVersion transportVersion) implements Writeable, ToXContentFragment { +public record CompatibilityVersions( + TransportVersion transportVersion, + Map systemIndexMappingsVersion +) implements Writeable, ToXContentFragment { /** * Constructs a VersionWrapper collecting all the minimum versions from the values of the map. @@ -39,22 +45,32 @@ public record CompatibilityVersions(TransportVersion transportVersion) implement * @return Minimum versions for the cluster */ public static CompatibilityVersions minimumVersions(Map compatibilityVersions) { - return new CompatibilityVersions( - compatibilityVersions.values() - .stream() - .map(CompatibilityVersions::transportVersion) - .min(Comparator.naturalOrder()) - // In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway: - .orElse(TransportVersions.MINIMUM_COMPATIBLE) - ); + TransportVersion minimumTransport = compatibilityVersions.values() + .stream() + .map(CompatibilityVersions::transportVersion) + .min(Comparator.naturalOrder()) + // In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway: + .orElse(TransportVersions.MINIMUM_COMPATIBLE); + + Map minimumMappingsVersions = new HashMap<>(); + compatibilityVersions.values() + .stream() + .flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream()) + .forEach( + entry -> minimumMappingsVersions.merge(entry.getKey(), entry.getValue(), (v1, v2) -> v1.version() < v2.version() ? v1 : v2) + ); + + return new CompatibilityVersions(minimumTransport, minimumMappingsVersions); } public static CompatibilityVersions readVersion(StreamInput in) throws IOException { - return new CompatibilityVersions(TransportVersion.readVersion(in)); + // TODO[wrb]: transport version change + return new CompatibilityVersions(TransportVersion.readVersion(in), Map.of()); } @Override public void writeTo(StreamOutput out) throws IOException { + // TODO[wrb]: transport version change TransportVersion.writeVersion(this.transportVersion(), out); } @@ -68,6 +84,7 @@ public void writeTo(StreamOutput out) throws IOException { */ @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // TODO[wrb]: transport version change builder.field("transport_version", this.transportVersion().toString()); return builder; } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 37f18e36f0a6e..d21db8e016b9e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -927,7 +927,10 @@ protected Node( ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); - CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current()); + CompatibilityVersions compatibilityVersions = new CompatibilityVersions( + TransportVersion.current(), + systemIndices.getMappingsVersions() + ); final DiscoveryModule discoveryModule = new DiscoveryModule( settings, transportService, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 676914ec0bed2..974070a72c552 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -243,7 +244,7 @@ public void testJoinRequestEqualsHashCodeSerialization() { ); JoinRequest initialJoinRequest = new JoinRequest( initialJoin.getSourceNode(), - new CompatibilityVersions(TransportVersionUtils.randomVersion()), + new CompatibilityVersions(TransportVersionUtils.randomVersion(), Map.of()), randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin) ); @@ -263,7 +264,8 @@ public void testJoinRequestEqualsHashCodeSerialization() { return new JoinRequest( joinRequest.getSourceNode(), new CompatibilityVersions( - TransportVersionUtils.randomVersion(Set.of(joinRequest.getCompatibilityVersions().transportVersion())) + TransportVersionUtils.randomVersion(Set.of(joinRequest.getCompatibilityVersions().transportVersion())), + Map.of() ), joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin() diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java index 4807d5ee984ca..f9dc00e6c1e93 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java @@ -46,6 +46,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -162,11 +163,13 @@ public void testPreventJoinClusterWithUnsupportedTransportVersion() { .mapToObj(i -> TransportVersionUtils.randomCompatibleVersion(random())) .toList(); TransportVersion min = Collections.min(versions); - List compatibilityVersions = versions.stream().map(CompatibilityVersions::new).toList(); + List compatibilityVersions = versions.stream() + .map(transportVersion -> new CompatibilityVersions(transportVersion, Map.of())) + .toList(); // should not throw NodeJoinExecutor.ensureTransportVersionBarrier( - new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current())), + new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current()), Map.of()), compatibilityVersions ); expectThrows( @@ -177,7 +180,8 @@ public void testPreventJoinClusterWithUnsupportedTransportVersion() { random(), TransportVersionUtils.getFirstVersion(), TransportVersionUtils.getPreviousVersion(min) - ) + ), + Map.of() ), compatibilityVersions ) diff --git a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java index 8dd8925cd86de..b1c23573fc96f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/TransportVersionsFixupListenerTests.java @@ -110,7 +110,7 @@ public void testNothingFixedWhenNothingToInfer() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(Version.V_8_8_0)) - .compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0))) + .compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of()))) .build(); TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); @@ -125,7 +125,7 @@ public void testNothingFixedWhenOnNextVersion() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(NEXT_VERSION)) - .compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION))) + .compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of()))) .build(); TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null); @@ -141,7 +141,10 @@ public void testNothingFixedWhenOnPreviousVersion() { ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE) .nodes(node(Version.V_8_7_0, Version.V_8_8_0)) .compatibilityVersions( - Maps.transformValues(versions(TransportVersions.V_8_7_0, TransportVersions.V_8_8_0), CompatibilityVersions::new) + Maps.transformValues( + versions(TransportVersions.V_8_7_0, TransportVersions.V_8_8_0), + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) + ) ) .build(); @@ -161,7 +164,7 @@ public void testVersionsAreFixed() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -190,7 +193,7 @@ public void testConcurrentChangesDoNotOverlap() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -205,7 +208,7 @@ public void testConcurrentChangesDoNotOverlap() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); @@ -225,7 +228,7 @@ public void testFailedRequestsAreRetried() { .compatibilityVersions( Maps.transformValues( versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0), - CompatibilityVersions::new + transportVersion -> new CompatibilityVersions(transportVersion, Map.of()) ) ) .build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index 0391cbf83608c..1eb2bffc1a867 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -19,12 +20,14 @@ public class CompatibilityVersionsTests extends ESTestCase { - public void testMinimumVersions() { + public void testEmptyVersionsMap() { assertThat( CompatibilityVersions.minimumVersions(Map.of()), - equalTo(new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE)) + equalTo(new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of())) ); + } + public void testMinimumTransportVersions() { TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersions.MINIMUM_COMPATIBLE, true); TransportVersion version2 = TransportVersionUtils.randomVersionBetween( random(), @@ -32,11 +35,70 @@ public void testMinimumVersions() { TransportVersion.current() ); - CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1); - CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2); + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, Map.of()); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, Map.of()); Map versionsMap = Map.of("node1", compatibilityVersions1, "node2", compatibilityVersions2); assertThat(CompatibilityVersions.minimumVersions(versionsMap), equalTo(compatibilityVersions1)); } + + public void testMinimumMappingsVersions() { + SystemIndexDescriptor.MappingsVersion v1 = new SystemIndexDescriptor.MappingsVersion(1, 1); + SystemIndexDescriptor.MappingsVersion v2 = new SystemIndexDescriptor.MappingsVersion(2, 2); + SystemIndexDescriptor.MappingsVersion v3 = new SystemIndexDescriptor.MappingsVersion(3, 3); + Map mappings1 = Map.of(".system-index-1", v3, ".system-index-2", v1); + Map mappings2 = Map.of(".system-index-1", v2, ".system-index-2", v2); + Map mappings3 = Map.of(".system-index-3", v1); + + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(TransportVersion.current(), mappings1); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(TransportVersion.current(), mappings2); + CompatibilityVersions compatibilityVersions3 = new CompatibilityVersions(TransportVersion.current(), mappings3); + + Map versionsMap = Map.of( + "node1", + compatibilityVersions1, + "node2", + compatibilityVersions2, + "node3", + compatibilityVersions3 + ); + + assertThat( + CompatibilityVersions.minimumVersions(versionsMap), + equalTo( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index-1", v2, ".system-index-2", v1, ".system-index-3", v1) + ) + ) + ); + } + + /** + * By design, all versions should increase monotonically through releases, so we shouldn't have a situation + * where the minimum transport version is in one CompatibilityVersions object and a minimum system + * index is in another. However, the minimumVersions method we're testing will handle that situation without + * complaint. + */ + public void testMinimumsAreMerged() { + TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersion.MINIMUM_COMPATIBLE, true); + TransportVersion version2 = TransportVersionUtils.randomVersionBetween( + random(), + TransportVersionUtils.getNextVersion(version1, true), + TransportVersion.current() + ); + + SystemIndexDescriptor.MappingsVersion v1 = new SystemIndexDescriptor.MappingsVersion(1, 1); + SystemIndexDescriptor.MappingsVersion v2 = new SystemIndexDescriptor.MappingsVersion(2, 2); + Map mappings1 = Map.of(".system-index-1", v2); + Map mappings2 = Map.of(".system-index-1", v1); + + CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, mappings1); + CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, mappings2); + + Map versionsMap = Map.of("node1", compatibilityVersions1, "node2", compatibilityVersions2); + + assertThat(CompatibilityVersions.minimumVersions(versionsMap), equalTo(new CompatibilityVersions(version1, mappings2))); + } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index b0ad7d333d172..e547a736df034 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -422,7 +422,7 @@ public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List new JoinTask.NodeJoinTask( node, - new CompatibilityVersions(transportVersion), + new CompatibilityVersions(transportVersion, Map.of()), DUMMY_REASON, ActionListener.running(() -> { throw new AssertionError("should not complete publication"); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java index 80815dffa4a30..0ba87897f285e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java @@ -11,6 +11,8 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Map; + public class CompatibilityVersionsUtils { /** @@ -23,7 +25,7 @@ public class CompatibilityVersionsUtils { * @return Compatibility versions known at compile time. */ public static CompatibilityVersions staticCurrent() { - return new CompatibilityVersions(TransportVersion.current()); + return new CompatibilityVersions(TransportVersion.current(), Map.of()); } /** @@ -34,6 +36,6 @@ public static CompatibilityVersions staticCurrent() { * @return Random valid compatibility versions */ public static CompatibilityVersions staticRandom() { - return new CompatibilityVersions(TransportVersionUtils.randomVersion()); + return new CompatibilityVersions(TransportVersionUtils.randomVersion(), Map.of()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java index 87743158995d4..c85fda5eee4b3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/TransportVersionUtilsTests.java @@ -23,13 +23,13 @@ public class TransportVersionUtilsTests extends ESTestCase { private static final Map transportVersions = Map.of( "Alfredo", - new CompatibilityVersions(TransportVersions.V_7_0_0), + new CompatibilityVersions(TransportVersions.V_7_0_0, Map.of()), "Bertram", - new CompatibilityVersions(TransportVersions.V_7_0_1), + new CompatibilityVersions(TransportVersions.V_7_0_1, Map.of()), "Charles", - new CompatibilityVersions(TransportVersions.V_8_500_020), + new CompatibilityVersions(TransportVersions.V_8_500_020, Map.of()), "Dominic", - new CompatibilityVersions(TransportVersions.V_8_0_0) + new CompatibilityVersions(TransportVersions.V_8_0_0, Map.of()) ); private static final ClusterState state = new ClusterState( @@ -53,7 +53,10 @@ public void testGetMinTransportVersion() { public void testIsMinTransformVersionSameAsCurrent() { assertThat(TransportVersionUtils.isMinTransportVersionSameAsCurrent(state), equalTo(false)); - Map transportVersions1 = Map.of("Eugene", new CompatibilityVersions(TransportVersion.current())); + Map transportVersions1 = Map.of( + "Eugene", + new CompatibilityVersions(TransportVersion.current(), Map.of()) + ); ClusterState state1 = new ClusterState( new ClusterName("harry"), From fa03eccfe81239608ba59f2e51166f6ab97d6dfd Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 5 Sep 2023 16:33:55 -0400 Subject: [PATCH 02/12] Serialization --- .../org/elasticsearch/TransportVersions.java | 1 + .../version/CompatibilityVersions.java | 17 +++++++--- .../indices/SystemIndexDescriptor.java | 27 ++++++++++++++- .../cluster/ClusterStateTests.java | 33 ++++++++++++++++--- .../version/CompatibilityVersionsTests.java | 2 +- 5 files changed, 70 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index a5b456236a94d..9bdba8b45d60c 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -135,6 +135,7 @@ static TransportVersion def(int id, String uniqueId) { public static final TransportVersion V_8_500_069 = def(8_500_069, "5b804027-d8a0-421b-9970-1f53d766854b"); public static final TransportVersion V_8_500_070 = def(8_500_070, "6BADC9CD-3C9D-4381-8BD9-B305CAA93F86"); public static final TransportVersion V_8_500_071 = def(8_500_071, "a86dfc08-3026-4f01-90ef-6d6de003e217"); + public static final TransportVersion V_8_500_072 = def(8_500_072, "9128e16a-e4f7-41c4-b04f-842955bfc1b4"); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index 4ccb7f5c05d56..a94415c74fd9d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -64,14 +64,23 @@ public static CompatibilityVersions minimumVersions(Map mappingsVersions = Map.of(); + if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_072)) { + mappingsVersions = in.readMap(SystemIndexDescriptor.MappingsVersion::new); + } + + return new CompatibilityVersions(transportVersion, mappingsVersions); } @Override public void writeTo(StreamOutput out) throws IOException { - // TODO[wrb]: transport version change TransportVersion.writeVersion(this.transportVersion(), out); + + if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_072)) { + out.writeMap(this.systemIndexMappingsVersion(), (o, v) -> v.writeTo(o)); + } } /** @@ -84,8 +93,8 @@ public void writeTo(StreamOutput out) throws IOException { */ @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - // TODO[wrb]: transport version change builder.field("transport_version", this.transportVersion().toString()); + builder.field("mappings_versions", this.systemIndexMappingsVersion); return builder; } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index 4d37cea8bcf98..4b8269d1ce8a2 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -18,15 +18,20 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.RegExp; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -649,7 +654,27 @@ public boolean isInternal() { * The hash is a hash of the system index descriptor's mappings so that we can warn * in case of inconsistencies across nodes. */ - public record MappingsVersion(int version, int hash) {}; + public record MappingsVersion(int version, int hash) implements Writeable, ToXContent { + + public MappingsVersion(StreamInput in) throws IOException { + this(in.readVInt(), in.readInt()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(version); + out.writeInt(hash); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("version", version); + builder.field("hash", hash); + builder.endObject(); + return builder; + } + } /** * Provides a fluent API for building a {@link SystemIndexDescriptor}. Validation still happens in that class. diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index e3c5865333b94..461582d28d8f5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Iterators; @@ -44,6 +45,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; @@ -217,7 +219,13 @@ public void testToXContent() throws IOException { "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata": { @@ -479,7 +487,13 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata" : { @@ -737,7 +751,13 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti "nodes_versions" : [ { "node_id" : "nodeId1", - "transport_version" : "%s" + "transport_version" : "%s", + "mappings_versions" : { + ".tasks" : { + "version" : 1, + "hash" : 1 + } + } } ], "metadata" : { @@ -1051,7 +1071,12 @@ private ClusterState buildClusterState() throws IOException { .add(DiscoveryNodeUtils.create("nodeId1", new TransportAddress(InetAddress.getByName("127.0.0.1"), 111))) .build() ) - .putTransportVersion("nodeId1", TransportVersion.current()) + .compatibilityVersions( + Map.of( + "nodeId1", + new CompatibilityVersions(TransportVersion.current(), Map.of(".tasks", new SystemIndexDescriptor.MappingsVersion(1, 1))) + ) + ) .blocks( ClusterBlocks.builder() .addGlobalBlock( diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index 1eb2bffc1a867..8c29a6ead5c14 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -82,7 +82,7 @@ public void testMinimumMappingsVersions() { * complaint. */ public void testMinimumsAreMerged() { - TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersion.MINIMUM_COMPATIBLE, true); + TransportVersion version1 = TransportVersionUtils.getNextVersion(TransportVersions.MINIMUM_COMPATIBLE, true); TransportVersion version2 = TransportVersionUtils.randomVersionBetween( random(), TransportVersionUtils.getNextVersion(version1, true), From 789cf99a209bef2669d6867606980347e15fdd64 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 5 Sep 2023 16:35:37 -0400 Subject: [PATCH 03/12] TODO --- .../org/elasticsearch/cluster/coordination/NodeJoinExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java index 55cf6ea8a398d..030c66102168d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java @@ -143,6 +143,7 @@ public ClusterState execute(BatchExecutionContext batchExecutionContex CompatibilityVersions compatibilityVersions = nodeJoinTask.compatibilityVersions(); if (enforceVersionBarrier) { ensureVersionBarrier(node.getVersion(), minClusterNodeVersion); + // TODO[wrb]: add system indices version barrier and refactor this logic into CompatibilityVersions ensureTransportVersionBarrier(compatibilityVersions, compatibilityVersionsMap.values()); } blockForbiddenVersions(compatibilityVersions.transportVersion()); From 832f402b808a71dbb72a466f0d172836df4745ed Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 5 Sep 2023 17:19:51 -0400 Subject: [PATCH 04/12] Add version barrier for system indices --- .../elasticsearch/cluster/ClusterState.java | 2 +- .../coordination/NodeJoinExecutor.java | 3 +- .../version/CompatibilityVersions.java | 43 +++++++-- .../coordination/NodeJoinExecutorTests.java | 35 -------- .../version/CompatibilityVersionsTests.java | 87 +++++++++++++++---- 5 files changed, 112 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index d9dc7a0221704..cdd0a58bb0739 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -223,7 +223,7 @@ public ClusterState( assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes); this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) - : CompatibilityVersions.minimumVersions(compatibilityVersions); + : CompatibilityVersions.minimumVersions(compatibilityVersions.values()); } private static boolean assertConsistentRoutingNodes( diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java index 030c66102168d..dd52f20c7355a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java @@ -143,8 +143,7 @@ public ClusterState execute(BatchExecutionContext batchExecutionContex CompatibilityVersions compatibilityVersions = nodeJoinTask.compatibilityVersions(); if (enforceVersionBarrier) { ensureVersionBarrier(node.getVersion(), minClusterNodeVersion); - // TODO[wrb]: add system indices version barrier and refactor this logic into CompatibilityVersions - ensureTransportVersionBarrier(compatibilityVersions, compatibilityVersionsMap.values()); + CompatibilityVersions.ensureVersionsCompatibility(compatibilityVersions, compatibilityVersionsMap.values()); } blockForbiddenVersions(compatibilityVersions.transportVersion()); ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index a94415c74fd9d..64133416cd97a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -44,17 +45,15 @@ public record CompatibilityVersions( * @param compatibilityVersions A map of strings (typically node identifiers) and versions wrappers * @return Minimum versions for the cluster */ - public static CompatibilityVersions minimumVersions(Map compatibilityVersions) { - TransportVersion minimumTransport = compatibilityVersions.values() - .stream() + public static CompatibilityVersions minimumVersions(Collection compatibilityVersions) { + TransportVersion minimumTransport = compatibilityVersions.stream() .map(CompatibilityVersions::transportVersion) .min(Comparator.naturalOrder()) // In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway: .orElse(TransportVersions.MINIMUM_COMPATIBLE); Map minimumMappingsVersions = new HashMap<>(); - compatibilityVersions.values() - .stream() + compatibilityVersions.stream() .flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream()) .forEach( entry -> minimumMappingsVersions.merge(entry.getKey(), entry.getValue(), (v1, v2) -> v1.version() < v2.version() ? v1 : v2) @@ -63,6 +62,40 @@ public static CompatibilityVersions minimumVersions(Map existing) { + CompatibilityVersions minimumClusterVersions = minimumVersions(existing); + + if (candidate.transportVersion().before(minimumClusterVersions.transportVersion())) { + throw new IllegalStateException( + "node with transport version [" + + candidate.transportVersion() + + "] may not join a cluster with minimum transport version [" + + minimumClusterVersions.transportVersion() + + "]" + ); + } + + Map candidateInvalid = new HashMap<>(); + Map existingInvalid = new HashMap<>(); + for (Map.Entry candidates : candidate.systemIndexMappingsVersion().entrySet()) { + if (minimumClusterVersions.systemIndexMappingsVersion().containsKey(candidates.getKey()) + && minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()).version() > candidates.getValue() + .version()) { + candidateInvalid.put(candidates.getKey(), candidates.getValue()); + existingInvalid.put(candidates.getKey(), minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey())); + } + } + if (candidateInvalid.isEmpty() == false) { + throw new IllegalStateException( + "node with system index mappings versions [" + + candidateInvalid + + "] may not join a cluster with minimum system index mappings versions [" + + existingInvalid + + "]" + ); + } + } + public static CompatibilityVersions readVersion(StreamInput in) throws IOException { TransportVersion transportVersion = TransportVersion.readVersion(in); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java index f9dc00e6c1e93..19aa035817a01 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.Level; -import org.elasticsearch.TransportVersion; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; @@ -28,7 +27,6 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; -import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ReferenceDocs; @@ -38,7 +36,6 @@ import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; -import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -46,10 +43,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.assertDesiredNodesStatusIsCorrect; @@ -158,36 +153,6 @@ public void testPreventJoinClusterWithUnsupportedNodeVersions() { } } - public void testPreventJoinClusterWithUnsupportedTransportVersion() { - List versions = IntStream.range(0, randomIntBetween(2, 10)) - .mapToObj(i -> TransportVersionUtils.randomCompatibleVersion(random())) - .toList(); - TransportVersion min = Collections.min(versions); - List compatibilityVersions = versions.stream() - .map(transportVersion -> new CompatibilityVersions(transportVersion, Map.of())) - .toList(); - - // should not throw - NodeJoinExecutor.ensureTransportVersionBarrier( - new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current()), Map.of()), - compatibilityVersions - ); - expectThrows( - IllegalStateException.class, - () -> NodeJoinExecutor.ensureTransportVersionBarrier( - new CompatibilityVersions( - TransportVersionUtils.randomVersionBetween( - random(), - TransportVersionUtils.getFirstVersion(), - TransportVersionUtils.getPreviousVersion(min) - ), - Map.of() - ), - compatibilityVersions - ) - ); - } - public void testSuccess() { Settings.builder().build(); Metadata.Builder metaBuilder = Metadata.builder(); diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index 8c29a6ead5c14..02251eff8d3e6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -14,15 +14,18 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; public class CompatibilityVersionsTests extends ESTestCase { - public void testEmptyVersionsMap() { + public void testEmptyVersionsList() { assertThat( - CompatibilityVersions.minimumVersions(Map.of()), + CompatibilityVersions.minimumVersions(List.of()), equalTo(new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of())) ); } @@ -38,9 +41,9 @@ public void testMinimumTransportVersions() { CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, Map.of()); CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, Map.of()); - Map versionsMap = Map.of("node1", compatibilityVersions1, "node2", compatibilityVersions2); + List versions = List.of(compatibilityVersions1, compatibilityVersions2); - assertThat(CompatibilityVersions.minimumVersions(versionsMap), equalTo(compatibilityVersions1)); + assertThat(CompatibilityVersions.minimumVersions(versions), equalTo(compatibilityVersions1)); } public void testMinimumMappingsVersions() { @@ -55,17 +58,10 @@ public void testMinimumMappingsVersions() { CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(TransportVersion.current(), mappings2); CompatibilityVersions compatibilityVersions3 = new CompatibilityVersions(TransportVersion.current(), mappings3); - Map versionsMap = Map.of( - "node1", - compatibilityVersions1, - "node2", - compatibilityVersions2, - "node3", - compatibilityVersions3 - ); + List versions = List.of(compatibilityVersions1, compatibilityVersions2, compatibilityVersions3); assertThat( - CompatibilityVersions.minimumVersions(versionsMap), + CompatibilityVersions.minimumVersions(versions), equalTo( new CompatibilityVersions( TransportVersion.current(), @@ -97,8 +93,69 @@ public void testMinimumsAreMerged() { CompatibilityVersions compatibilityVersions1 = new CompatibilityVersions(version1, mappings1); CompatibilityVersions compatibilityVersions2 = new CompatibilityVersions(version2, mappings2); - Map versionsMap = Map.of("node1", compatibilityVersions1, "node2", compatibilityVersions2); + List versions = List.of(compatibilityVersions1, compatibilityVersions2); + + assertThat(CompatibilityVersions.minimumVersions(versions), equalTo(new CompatibilityVersions(version1, mappings2))); + } + + public void testPreventJoinClusterWithUnsupportedTransportVersion() { + List transportVersions = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj(i -> TransportVersionUtils.randomCompatibleVersion(random())) + .toList(); + TransportVersion min = Collections.min(transportVersions); + List compatibilityVersions = transportVersions.stream() + .map(transportVersion -> new CompatibilityVersions(transportVersion, Map.of())) + .toList(); + + // should not throw + CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current()), Map.of()), + compatibilityVersions + ); + expectThrows( + IllegalStateException.class, + () -> CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersionUtils.randomVersionBetween( + random(), + TransportVersionUtils.getFirstVersion(), + TransportVersionUtils.getPreviousVersion(min) + ), + Map.of() + ), + compatibilityVersions + ) + ); + } - assertThat(CompatibilityVersions.minimumVersions(versionsMap), equalTo(new CompatibilityVersions(version1, mappings2))); + public void testPreventJoinClusterWithUnsupportedMappingsVersion() { + List compatibilityVersions = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj( + i -> new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(randomIntBetween(2, 10), -1)) + ) + ) + .toList(); + int min = compatibilityVersions.stream().mapToInt(v -> v.systemIndexMappingsVersion().get(".system-index").version()).min().orElse(2); + + // should not throw + CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(min, -1)) + ), + compatibilityVersions + ); + expectThrows( + IllegalStateException.class, + () -> CompatibilityVersions.ensureVersionsCompatibility( + new CompatibilityVersions( + TransportVersion.current(), + Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(randomIntBetween(1, min - 1), -1)) + ), + compatibilityVersions + ) + ); } } From ef827ec660855f24b21a18685beffdca70b23312 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Wed, 6 Sep 2023 10:13:23 -0400 Subject: [PATCH 05/12] Add get min mappings versions method to clusterstate --- .../main/java/org/elasticsearch/cluster/ClusterState.java | 5 +++++ .../cluster/version/CompatibilityVersionsTests.java | 5 ++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index cdd0a58bb0739..aedf8d8530318 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContent; @@ -287,6 +288,10 @@ public TransportVersion getMinTransportVersion() { return this.minVersions.transportVersion(); } + public Map getMinSystemIndexMappingVersions() { + return this.minVersions.systemIndexMappingsVersion(); + } + public Metadata metadata() { return this.metadata; } diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index 02251eff8d3e6..a3a81377a0705 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -137,7 +137,10 @@ public void testPreventJoinClusterWithUnsupportedMappingsVersion() { ) ) .toList(); - int min = compatibilityVersions.stream().mapToInt(v -> v.systemIndexMappingsVersion().get(".system-index").version()).min().orElse(2); + int min = compatibilityVersions.stream() + .mapToInt(v -> v.systemIndexMappingsVersion().get(".system-index").version()) + .min() + .orElse(2); // should not throw CompatibilityVersions.ensureVersionsCompatibility( From ed470c815178eabe8cbfc1b7d6146a67b783b0b8 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Thu, 7 Sep 2023 09:56:14 -0400 Subject: [PATCH 06/12] Delegate cluster state diff value writer --- .../src/main/java/org/elasticsearch/cluster/ClusterState.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index aedf8d8530318..ff0dd62e18e83 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -134,12 +134,12 @@ default boolean isPrivate() { new DiffableUtils.NonDiffableValueSerializer<>() { @Override public void write(CompatibilityVersions value, StreamOutput out) throws IOException { - TransportVersion.writeVersion(value.transportVersion(), out); + value.writeTo(out); } @Override public CompatibilityVersions read(StreamInput in, String key) throws IOException { - return new CompatibilityVersions(TransportVersion.readVersion(in), Map.of()); + return CompatibilityVersions.readVersion(in); } }; From d69494a27ee8835d57967eac3ecc2d1d178eb84d Mon Sep 17 00:00:00 2001 From: William Brafford Date: Thu, 7 Sep 2023 13:38:24 -0400 Subject: [PATCH 07/12] Add convenience method for fake data to CompatibilityVersionsUtils --- .../cluster/ClusterStateDiffIT.java | 5 +++-- .../org/elasticsearch/cluster/ClusterState.java | 1 + .../cluster/coordination/JoinRequest.java | 1 + .../cluster/coordination/MessagesTests.java | 3 ++- .../version/CompatibilityVersionsUtils.java | 17 +++++++++++++++++ 5 files changed, 24 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index 1990dbab4384c..373213be479a7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.version.CompatibilityVersions; +import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -238,7 +239,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { versions.remove(nodeId); if (randomBoolean()) { nodes.add(randomNode(nodeId)); - versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()), Map.of())); + versions.put(nodeId, CompatibilityVersionsUtils.fakeSystemIndicesRandom()); } } } @@ -246,7 +247,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) { for (int i = 0; i < additionalNodeCount; i++) { String id = "node-" + randomAlphaOfLength(10); nodes.add(randomNode(id)); - versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random()), Map.of())); + versions.put(id, CompatibilityVersionsUtils.fakeSystemIndicesRandom()); } return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index ff0dd62e18e83..0cc04c28f7b9a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -792,6 +792,7 @@ public DiscoveryNodes nodes() { return nodes; } + // TODO[wrb]: Add Map argument public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) { compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of())); return this; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java index 8e86f1eba2951..d8958f75c7aa5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -67,6 +67,7 @@ public JoinRequest(StreamInput in) throws IOException { compatibilityVersions = CompatibilityVersions.readVersion(in); } else { // there's a 1-1 mapping from Version to TransportVersion before 8.8.0 + // no known mapping versions here compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id), Map.of()); } minimumTerm = in.readLong(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 974070a72c552..f779d5ea56dfa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.version.CompatibilityVersions; +import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; @@ -244,7 +245,7 @@ public void testJoinRequestEqualsHashCodeSerialization() { ); JoinRequest initialJoinRequest = new JoinRequest( initialJoin.getSourceNode(), - new CompatibilityVersions(TransportVersionUtils.randomVersion(), Map.of()), + CompatibilityVersionsUtils.fakeSystemIndicesRandom(), randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin) ); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java index 0ba87897f285e..b8949cb92da71 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersionsUtils.java @@ -9,6 +9,9 @@ package org.elasticsearch.cluster.version; import org.elasticsearch.TransportVersion; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; import java.util.Map; @@ -38,4 +41,18 @@ public static CompatibilityVersions staticCurrent() { public static CompatibilityVersions staticRandom() { return new CompatibilityVersions(TransportVersionUtils.randomVersion(), Map.of()); } + + public static CompatibilityVersions fakeSystemIndicesRandom() { + return new CompatibilityVersions( + TransportVersionUtils.randomVersion(), + ESTestCase.randomMap( + 0, + 3, + () -> Tuple.tuple( + "." + ESTestCase.randomAlphaOfLength(5), + new SystemIndexDescriptor.MappingsVersion(ESTestCase.randomInt(20), ESTestCase.randomInt()) + ) + ) + ); + } } From 2e0e99ded21f3c0cbe2b1dfa87c40e25cb7fd854 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Thu, 7 Sep 2023 11:32:56 -0400 Subject: [PATCH 08/12] Fix ClusterRerouteResponseTests --- .../admin/cluster/reroute/ClusterRerouteResponseTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index d2a482fa58b0e..51c0c9ca6d87f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -135,7 +135,8 @@ public void testToXContentWithDeprecatedClusterState() { "nodes_versions": [ { "node_id": "node0", - "transport_version": "8000099" + "transport_version": "8000099", + "mappings_versions": {} } ], "metadata": { From f4fe77c5a93bff28d49c4f2ffa7ecdb23f98fac4 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 11 Sep 2023 09:53:38 -0400 Subject: [PATCH 09/12] Add comments explaining uses of empty mappings versions --- .../benchmark/routing/allocation/AllocationBenchmark.java | 1 + .../src/main/java/org/elasticsearch/cluster/ClusterState.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java index b9b644a1376bb..9daa5c24f3bd4 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java @@ -145,6 +145,7 @@ public void setUp() throws Exception { for (int i = 1; i <= numNodes; i++) { String id = "node" + i; nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags)))); + // system index mappings versions not needed here, so we use Map.of() compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current(), Map.of())); } initialClusterState = ClusterState.builder(ClusterName.DEFAULT) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index b55a293c2b967..603a93ab11f79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -223,7 +223,7 @@ public ClusterState( this.routingNodes = routingNodes; assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes); this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) - ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) + ? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) // empty map because cluster state is unknown : CompatibilityVersions.minimumVersions(compatibilityVersions.values()); } @@ -777,8 +777,8 @@ public DiscoveryNodes nodes() { return nodes; } - // TODO[wrb]: Add Map argument public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) { + // TODO[wrb]: system index mappings versions will be added in a followup compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of())); return this; } From 5a584fa9264d7789dd97b5fd3e8346c595f1dce1 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 11 Sep 2023 09:55:38 -0400 Subject: [PATCH 10/12] bump transport version --- server/src/main/java/org/elasticsearch/TransportVersions.java | 2 +- .../elasticsearch/cluster/version/CompatibilityVersions.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 9bdba8b45d60c..3c46a09bc9885 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -135,7 +135,7 @@ static TransportVersion def(int id, String uniqueId) { public static final TransportVersion V_8_500_069 = def(8_500_069, "5b804027-d8a0-421b-9970-1f53d766854b"); public static final TransportVersion V_8_500_070 = def(8_500_070, "6BADC9CD-3C9D-4381-8BD9-B305CAA93F86"); public static final TransportVersion V_8_500_071 = def(8_500_071, "a86dfc08-3026-4f01-90ef-6d6de003e217"); - public static final TransportVersion V_8_500_072 = def(8_500_072, "9128e16a-e4f7-41c4-b04f-842955bfc1b4"); + public static final TransportVersion V_8_500_073 = def(8_500_073, "9128e16a-e4f7-41c4-b04f-842955bfc1b4"); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index 64133416cd97a..a88af617dae2f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -100,7 +100,7 @@ public static CompatibilityVersions readVersion(StreamInput in) throws IOExcepti TransportVersion transportVersion = TransportVersion.readVersion(in); Map mappingsVersions = Map.of(); - if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_072)) { + if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) { mappingsVersions = in.readMap(SystemIndexDescriptor.MappingsVersion::new); } @@ -111,7 +111,7 @@ public static CompatibilityVersions readVersion(StreamInput in) throws IOExcepti public void writeTo(StreamOutput out) throws IOException { TransportVersion.writeVersion(this.transportVersion(), out); - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_072)) { + if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) { out.writeMap(this.systemIndexMappingsVersion(), (o, v) -> v.writeTo(o)); } } From 1ed2b1ae32a43215a0f1a3048917b179b169e756 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 12 Sep 2023 10:12:15 -0400 Subject: [PATCH 11/12] Make MappingsVersion Comparable --- .../cluster/version/CompatibilityVersions.java | 9 +++++---- .../elasticsearch/indices/SystemIndexDescriptor.java | 8 +++++++- .../indices/SystemIndexDescriptorTests.java | 12 ++++++++++++ 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index a88af617dae2f..5130cd4f327c9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -22,6 +22,8 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Wraps component version numbers for cluster state @@ -52,11 +54,10 @@ public static CompatibilityVersions minimumVersions(Collection minimumMappingsVersions = new HashMap<>(); - compatibilityVersions.stream() + Map minimumMappingsVersions = compatibilityVersions.stream() .flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream()) - .forEach( - entry -> minimumMappingsVersions.merge(entry.getKey(), entry.getValue(), (v1, v2) -> v1.version() < v2.version() ? v1 : v2) + .collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> Stream.of(v1, v2).min(Comparator.naturalOrder()).get()) ); return new CompatibilityVersions(minimumTransport, minimumMappingsVersions); diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index 4b8269d1ce8a2..d6441a2920f43 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -654,7 +654,7 @@ public boolean isInternal() { * The hash is a hash of the system index descriptor's mappings so that we can warn * in case of inconsistencies across nodes. */ - public record MappingsVersion(int version, int hash) implements Writeable, ToXContent { + public record MappingsVersion(int version, int hash) implements Writeable, ToXContent, Comparable { public MappingsVersion(StreamInput in) throws IOException { this(in.readVInt(), in.readInt()); @@ -674,6 +674,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + @Override + public int compareTo(MappingsVersion o) { + Objects.requireNonNull(o, "Cannot compare null MappingsVersion"); + return Integer.compare(this.version, o.version); + } } /** diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java index 7a651dea0c878..1a372de6129a3 100644 --- a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java @@ -386,6 +386,18 @@ public void testNegativeMappingsVersion() { assertThat(e.getMessage(), equalTo("The mappings version must not be negative")); } + public void testMappingsVersionCompareTo() { + SystemIndexDescriptor.MappingsVersion mv1 = new SystemIndexDescriptor.MappingsVersion(1, randomInt(20)); + SystemIndexDescriptor.MappingsVersion mv2 = new SystemIndexDescriptor.MappingsVersion(2, randomInt(20)); + + NullPointerException e = expectThrows(NullPointerException.class, () -> mv1.compareTo(null)); + assertThat(e.getMessage(), equalTo("Cannot compare null MappingsVersion")); + + assertThat(mv1.compareTo(mv2), equalTo(-1)); + assertThat(mv1.compareTo(mv1), equalTo(0)); + assertThat(mv2.compareTo(mv1), equalTo(1)); + } + public void testHashesIgnoreMappingMetadata() { String mappingFormatString = """ { From 59181b418083acbad17296c126e1120b78947b77 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 12 Sep 2023 10:28:29 -0400 Subject: [PATCH 12/12] Respond to PR feedback --- .../cluster/version/CompatibilityVersions.java | 6 +++--- .../cluster/version/CompatibilityVersionsTests.java | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java index 5130cd4f327c9..a9c5298a4325e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java +++ b/server/src/main/java/org/elasticsearch/cluster/version/CompatibilityVersions.java @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -79,9 +80,8 @@ public static void ensureVersionsCompatibility(CompatibilityVersions candidate, Map candidateInvalid = new HashMap<>(); Map existingInvalid = new HashMap<>(); for (Map.Entry candidates : candidate.systemIndexMappingsVersion().entrySet()) { - if (minimumClusterVersions.systemIndexMappingsVersion().containsKey(candidates.getKey()) - && minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()).version() > candidates.getValue() - .version()) { + var mapping = minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()); + if (Objects.nonNull(mapping) && mapping.version() > candidates.getValue().version()) { candidateInvalid.put(candidates.getKey(), candidates.getValue()); existingInvalid.put(candidates.getKey(), minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey())); } diff --git a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java index a3a81377a0705..b3b598f2bd38c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/version/CompatibilityVersionsTests.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class CompatibilityVersionsTests extends ESTestCase { @@ -112,7 +113,8 @@ public void testPreventJoinClusterWithUnsupportedTransportVersion() { new CompatibilityVersions(TransportVersionUtils.randomVersionBetween(random(), min, TransportVersion.current()), Map.of()), compatibilityVersions ); - expectThrows( + + IllegalStateException e = expectThrows( IllegalStateException.class, () -> CompatibilityVersions.ensureVersionsCompatibility( new CompatibilityVersions( @@ -126,6 +128,7 @@ public void testPreventJoinClusterWithUnsupportedTransportVersion() { compatibilityVersions ) ); + assertThat(e.getMessage(), containsString("may not join a cluster with minimum transport version")); } public void testPreventJoinClusterWithUnsupportedMappingsVersion() { @@ -150,7 +153,8 @@ public void testPreventJoinClusterWithUnsupportedMappingsVersion() { ), compatibilityVersions ); - expectThrows( + + IllegalStateException e = expectThrows( IllegalStateException.class, () -> CompatibilityVersions.ensureVersionsCompatibility( new CompatibilityVersions( @@ -160,5 +164,6 @@ public void testPreventJoinClusterWithUnsupportedMappingsVersion() { compatibilityVersions ) ); + assertThat(e.getMessage(), containsString("may not join a cluster with minimum system index mappings versions")); } }