Skip to content

Commit

Permalink
Add mappings versions to CompatibilityVersions (#99307)
Browse files Browse the repository at this point in the history
CompatibilityVersions now holds a map of system index names to their
mappings versions, alongside the transport version. We also add mapping
versions to the "minimum version barrier": if a node has a system index
whose version is below the cluster mappings version for that system
index, it is not allowed to join the cluster.
  • Loading branch information
williamrandolph committed Sep 12, 2023
1 parent 2072be9 commit b5e06da
Show file tree
Hide file tree
Showing 19 changed files with 354 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public void setUp() throws Exception {
for (int i = 1; i <= numNodes; i++) {
String id = "node" + i;
nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags))));
compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current()));
// system index mappings versions not needed here, so we use Map.of()
compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current(), Map.of()));
}
initialClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.cluster.version.CompatibilityVersionsUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -238,15 +239,15 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) {
versions.remove(nodeId);
if (randomBoolean()) {
nodes.add(randomNode(nodeId));
versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
versions.put(nodeId, CompatibilityVersionsUtils.fakeSystemIndicesRandom());
}
}
}
int additionalNodeCount = randomIntBetween(1, 20);
for (int i = 0; i < additionalNodeCount; i++) {
String id = "node-" + randomAlphaOfLength(10);
nodes.add(randomNode(id));
versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
versions.put(id, CompatibilityVersionsUtils.fakeSystemIndicesRandom());
}

return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ static TransportVersion def(int id, String uniqueId) {
public static final TransportVersion V_8_500_070 = def(8_500_070, "6BADC9CD-3C9D-4381-8BD9-B305CAA93F86");
public static final TransportVersion V_8_500_071 = def(8_500_071, "a86dfc08-3026-4f01-90ef-6d6de003e217");
public static final TransportVersion V_8_500_072 = def(8_500_072, "e2df7d80-7b74-4afd-9734-aee0fc256025");
public static final TransportVersion V_8_500_073 = def(8_500_073, "9128e16a-e4f7-41c4-b04f-842955bfc1b4");
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContent;

Expand Down Expand Up @@ -133,12 +134,12 @@ default boolean isPrivate() {
new DiffableUtils.NonDiffableValueSerializer<>() {
@Override
public void write(CompatibilityVersions value, StreamOutput out) throws IOException {
TransportVersion.writeVersion(value.transportVersion(), out);
value.writeTo(out);
}

@Override
public CompatibilityVersions read(StreamInput in, String key) throws IOException {
return new CompatibilityVersions(TransportVersion.readVersion(in));
return CompatibilityVersions.readVersion(in);
}
};

Expand Down Expand Up @@ -222,8 +223,8 @@ public ClusterState(
this.routingNodes = routingNodes;
assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes);
this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE)
: CompatibilityVersions.minimumVersions(compatibilityVersions);
? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) // empty map because cluster state is unknown
: CompatibilityVersions.minimumVersions(compatibilityVersions.values());
}

private static boolean assertConsistentRoutingNodes(
Expand Down Expand Up @@ -287,6 +288,10 @@ public TransportVersion getMinTransportVersion() {
return this.minVersions.transportVersion();
}

public Map<String, SystemIndexDescriptor.MappingsVersion> getMinSystemIndexMappingVersions() {
return this.minVersions.systemIndexMappingsVersion();
}

public Metadata metadata() {
return this.metadata;
}
Expand Down Expand Up @@ -773,7 +778,8 @@ public DiscoveryNodes nodes() {
}

public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) {
compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId)));
// TODO[wrb]: system index mappings versions will be added in a followup
compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -66,7 +67,8 @@ public JoinRequest(StreamInput in) throws IOException {
compatibilityVersions = CompatibilityVersions.readVersion(in);
} else {
// there's a 1-1 mapping from Version to TransportVersion before 8.8.0
compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id));
// no known mapping versions here
compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id), Map.of());
}
minimumTerm = in.readLong();
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
CompatibilityVersions compatibilityVersions = nodeJoinTask.compatibilityVersions();
if (enforceVersionBarrier) {
ensureVersionBarrier(node.getVersion(), minClusterNodeVersion);
ensureTransportVersionBarrier(compatibilityVersions, compatibilityVersionsMap.values());
CompatibilityVersions.ensureVersionsCompatibility(compatibilityVersions, compatibilityVersionsMap.values());
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,108 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Wraps component version numbers for cluster state
*
* <p>Cluster state will need to carry version information for different independently versioned components.
* This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}.
* It's similar to {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to
* be constructed during node startup and hold values from plugins as well.
* This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. It's similar to
* {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to be constructed during node startup and hold values
* from plugins as well.
*
* @param transportVersion A transport version, usually a minimum compatible one for a node.
* @param transportVersion A transport version, usually a minimum compatible one for a node.
* @param systemIndexMappingsVersion A map of system index names to versions for their mappings.
*/
public record CompatibilityVersions(TransportVersion transportVersion) implements Writeable, ToXContentFragment {
public record CompatibilityVersions(
TransportVersion transportVersion,
Map<String, org.elasticsearch.indices.SystemIndexDescriptor.MappingsVersion> systemIndexMappingsVersion
) implements Writeable, ToXContentFragment {

/**
* Constructs a VersionWrapper collecting all the minimum versions from the values of the map.
*
* @param compatibilityVersions A map of strings (typically node identifiers) and versions wrappers
* @return Minimum versions for the cluster
*/
public static CompatibilityVersions minimumVersions(Map<String, CompatibilityVersions> compatibilityVersions) {
return new CompatibilityVersions(
compatibilityVersions.values()
.stream()
.map(CompatibilityVersions::transportVersion)
.min(Comparator.naturalOrder())
// In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway:
.orElse(TransportVersions.MINIMUM_COMPATIBLE)
);
public static CompatibilityVersions minimumVersions(Collection<CompatibilityVersions> compatibilityVersions) {
TransportVersion minimumTransport = compatibilityVersions.stream()
.map(CompatibilityVersions::transportVersion)
.min(Comparator.naturalOrder())
// In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway:
.orElse(TransportVersions.MINIMUM_COMPATIBLE);

Map<String, SystemIndexDescriptor.MappingsVersion> minimumMappingsVersions = compatibilityVersions.stream()
.flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream())
.collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> Stream.of(v1, v2).min(Comparator.naturalOrder()).get())
);

return new CompatibilityVersions(minimumTransport, minimumMappingsVersions);
}

public static void ensureVersionsCompatibility(CompatibilityVersions candidate, Collection<CompatibilityVersions> existing) {
CompatibilityVersions minimumClusterVersions = minimumVersions(existing);

if (candidate.transportVersion().before(minimumClusterVersions.transportVersion())) {
throw new IllegalStateException(
"node with transport version ["
+ candidate.transportVersion()
+ "] may not join a cluster with minimum transport version ["
+ minimumClusterVersions.transportVersion()
+ "]"
);
}

Map<String, SystemIndexDescriptor.MappingsVersion> candidateInvalid = new HashMap<>();
Map<String, SystemIndexDescriptor.MappingsVersion> existingInvalid = new HashMap<>();
for (Map.Entry<String, SystemIndexDescriptor.MappingsVersion> candidates : candidate.systemIndexMappingsVersion().entrySet()) {
var mapping = minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey());
if (Objects.nonNull(mapping) && mapping.version() > candidates.getValue().version()) {
candidateInvalid.put(candidates.getKey(), candidates.getValue());
existingInvalid.put(candidates.getKey(), minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()));
}
}
if (candidateInvalid.isEmpty() == false) {
throw new IllegalStateException(
"node with system index mappings versions ["
+ candidateInvalid
+ "] may not join a cluster with minimum system index mappings versions ["
+ existingInvalid
+ "]"
);
}
}

public static CompatibilityVersions readVersion(StreamInput in) throws IOException {
return new CompatibilityVersions(TransportVersion.readVersion(in));
TransportVersion transportVersion = TransportVersion.readVersion(in);

Map<String, SystemIndexDescriptor.MappingsVersion> mappingsVersions = Map.of();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) {
mappingsVersions = in.readMap(SystemIndexDescriptor.MappingsVersion::new);
}

return new CompatibilityVersions(transportVersion, mappingsVersions);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
TransportVersion.writeVersion(this.transportVersion(), out);

if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) {
out.writeMap(this.systemIndexMappingsVersion(), (o, v) -> v.writeTo(o));
}
}

/**
Expand All @@ -69,6 +128,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("transport_version", this.transportVersion().toString());
builder.field("mappings_versions", this.systemIndexMappingsVersion);
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.RegExp;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -649,7 +654,33 @@ public boolean isInternal() {
* The hash is a hash of the system index descriptor's mappings so that we can warn
* in case of inconsistencies across nodes.
*/
public record MappingsVersion(int version, int hash) {};
public record MappingsVersion(int version, int hash) implements Writeable, ToXContent, Comparable<MappingsVersion> {

public MappingsVersion(StreamInput in) throws IOException {
this(in.readVInt(), in.readInt());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(version);
out.writeInt(hash);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("version", version);
builder.field("hash", hash);
builder.endObject();
return builder;
}

@Override
public int compareTo(MappingsVersion o) {
Objects.requireNonNull(o, "Cannot compare null MappingsVersion");
return Integer.compare(this.version, o.version);
}
}

/**
* Provides a fluent API for building a {@link SystemIndexDescriptor}. Validation still happens in that class.
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,10 @@ protected Node(
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());

CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
TransportVersion.current(),
systemIndices.getMappingsVersions()
);
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void testToXContentWithDeprecatedClusterState() {
"nodes_versions": [
{
"node_id": "node0",
"transport_version": "8000099"
"transport_version": "8000099",
"mappings_versions": {}
}
],
"metadata": {
Expand Down

0 comments on commit b5e06da

Please sign in to comment.