Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public void setUp() throws Exception {
for (int i = 1; i <= numNodes; i++) {
String id = "node" + i;
nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags))));
compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current()));
// system index mappings versions not needed here, so we use Map.of()
compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current(), Map.of()));
}
initialClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.cluster.version.CompatibilityVersionsUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -238,15 +239,15 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) {
versions.remove(nodeId);
if (randomBoolean()) {
nodes.add(randomNode(nodeId));
versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
versions.put(nodeId, CompatibilityVersionsUtils.fakeSystemIndicesRandom());
}
}
}
int additionalNodeCount = randomIntBetween(1, 20);
for (int i = 0; i < additionalNodeCount; i++) {
String id = "node-" + randomAlphaOfLength(10);
nodes.add(randomNode(id));
versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
versions.put(id, CompatibilityVersionsUtils.fakeSystemIndicesRandom());
}

return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ static TransportVersion def(int id, String uniqueId) {
public static final TransportVersion V_8_500_070 = def(8_500_070, "6BADC9CD-3C9D-4381-8BD9-B305CAA93F86");
public static final TransportVersion V_8_500_071 = def(8_500_071, "a86dfc08-3026-4f01-90ef-6d6de003e217");
public static final TransportVersion V_8_500_072 = def(8_500_072, "e2df7d80-7b74-4afd-9734-aee0fc256025");
public static final TransportVersion V_8_500_073 = def(8_500_073, "9128e16a-e4f7-41c4-b04f-842955bfc1b4");
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContent;

Expand Down Expand Up @@ -133,12 +134,12 @@ default boolean isPrivate() {
new DiffableUtils.NonDiffableValueSerializer<>() {
@Override
public void write(CompatibilityVersions value, StreamOutput out) throws IOException {
TransportVersion.writeVersion(value.transportVersion(), out);
value.writeTo(out);
}

@Override
public CompatibilityVersions read(StreamInput in, String key) throws IOException {
return new CompatibilityVersions(TransportVersion.readVersion(in));
return CompatibilityVersions.readVersion(in);
}
};

Expand Down Expand Up @@ -222,8 +223,8 @@ public ClusterState(
this.routingNodes = routingNodes;
assert assertConsistentRoutingNodes(routingTable, nodes, routingNodes);
this.minVersions = blocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE)
: CompatibilityVersions.minimumVersions(compatibilityVersions);
? new CompatibilityVersions(TransportVersions.MINIMUM_COMPATIBLE, Map.of()) // empty map because cluster state is unknown
: CompatibilityVersions.minimumVersions(compatibilityVersions.values());
}

private static boolean assertConsistentRoutingNodes(
Expand Down Expand Up @@ -287,6 +288,10 @@ public TransportVersion getMinTransportVersion() {
return this.minVersions.transportVersion();
}

public Map<String, SystemIndexDescriptor.MappingsVersion> getMinSystemIndexMappingVersions() {
return this.minVersions.systemIndexMappingsVersion();
}

public Metadata metadata() {
return this.metadata;
}
Expand Down Expand Up @@ -773,7 +778,8 @@ public DiscoveryNodes nodes() {
}

public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) {
compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId)));
// TODO[wrb]: system index mappings versions will be added in a followup
compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -66,7 +67,8 @@ public JoinRequest(StreamInput in) throws IOException {
compatibilityVersions = CompatibilityVersions.readVersion(in);
} else {
// there's a 1-1 mapping from Version to TransportVersion before 8.8.0
compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id));
// no known mapping versions here
compatibilityVersions = new CompatibilityVersions(TransportVersion.fromId(sourceNode.getVersion().id), Map.of());
}
minimumTerm = in.readLong();
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
CompatibilityVersions compatibilityVersions = nodeJoinTask.compatibilityVersions();
if (enforceVersionBarrier) {
ensureVersionBarrier(node.getVersion(), minClusterNodeVersion);
ensureTransportVersionBarrier(compatibilityVersions, compatibilityVersionsMap.values());
CompatibilityVersions.ensureVersionsCompatibility(compatibilityVersions, compatibilityVersionsMap.values());
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,108 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Wraps component version numbers for cluster state
*
* <p>Cluster state will need to carry version information for different independently versioned components.
* This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}.
* It's similar to {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to
* be constructed during node startup and hold values from plugins as well.
* This wrapper lets us wrap these versions one level below {@link org.elasticsearch.cluster.ClusterState}. It's similar to
* {@link org.elasticsearch.cluster.node.VersionInformation}, but this class is meant to be constructed during node startup and hold values
* from plugins as well.
*
* @param transportVersion A transport version, usually a minimum compatible one for a node.
* @param transportVersion A transport version, usually a minimum compatible one for a node.
* @param systemIndexMappingsVersion A map of system index names to versions for their mappings.
*/
public record CompatibilityVersions(TransportVersion transportVersion) implements Writeable, ToXContentFragment {
public record CompatibilityVersions(
TransportVersion transportVersion,
Map<String, org.elasticsearch.indices.SystemIndexDescriptor.MappingsVersion> systemIndexMappingsVersion
) implements Writeable, ToXContentFragment {

/**
* Constructs a VersionWrapper collecting all the minimum versions from the values of the map.
*
* @param compatibilityVersions A map of strings (typically node identifiers) and versions wrappers
* @return Minimum versions for the cluster
*/
public static CompatibilityVersions minimumVersions(Map<String, CompatibilityVersions> compatibilityVersions) {
return new CompatibilityVersions(
compatibilityVersions.values()
.stream()
.map(CompatibilityVersions::transportVersion)
.min(Comparator.naturalOrder())
// In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway:
.orElse(TransportVersions.MINIMUM_COMPATIBLE)
);
public static CompatibilityVersions minimumVersions(Collection<CompatibilityVersions> compatibilityVersions) {
TransportVersion minimumTransport = compatibilityVersions.stream()
.map(CompatibilityVersions::transportVersion)
.min(Comparator.naturalOrder())
// In practice transportVersions is always nonempty (except in tests) but use a conservative default anyway:
.orElse(TransportVersions.MINIMUM_COMPATIBLE);

Map<String, SystemIndexDescriptor.MappingsVersion> minimumMappingsVersions = compatibilityVersions.stream()
.flatMap(mv -> mv.systemIndexMappingsVersion().entrySet().stream())
.collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> Stream.of(v1, v2).min(Comparator.naturalOrder()).get())
);

return new CompatibilityVersions(minimumTransport, minimumMappingsVersions);
}

public static void ensureVersionsCompatibility(CompatibilityVersions candidate, Collection<CompatibilityVersions> existing) {
CompatibilityVersions minimumClusterVersions = minimumVersions(existing);

if (candidate.transportVersion().before(minimumClusterVersions.transportVersion())) {
throw new IllegalStateException(
"node with transport version ["
+ candidate.transportVersion()
+ "] may not join a cluster with minimum transport version ["
+ minimumClusterVersions.transportVersion()
+ "]"
);
}

Map<String, SystemIndexDescriptor.MappingsVersion> candidateInvalid = new HashMap<>();
Map<String, SystemIndexDescriptor.MappingsVersion> existingInvalid = new HashMap<>();
for (Map.Entry<String, SystemIndexDescriptor.MappingsVersion> candidates : candidate.systemIndexMappingsVersion().entrySet()) {
var mapping = minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey());
if (Objects.nonNull(mapping) && mapping.version() > candidates.getValue().version()) {
candidateInvalid.put(candidates.getKey(), candidates.getValue());
existingInvalid.put(candidates.getKey(), minimumClusterVersions.systemIndexMappingsVersion().get(candidates.getKey()));
}
}
if (candidateInvalid.isEmpty() == false) {
throw new IllegalStateException(
"node with system index mappings versions ["
+ candidateInvalid
+ "] may not join a cluster with minimum system index mappings versions ["
+ existingInvalid
+ "]"
);
}
}

public static CompatibilityVersions readVersion(StreamInput in) throws IOException {
return new CompatibilityVersions(TransportVersion.readVersion(in));
TransportVersion transportVersion = TransportVersion.readVersion(in);

Map<String, SystemIndexDescriptor.MappingsVersion> mappingsVersions = Map.of();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) {
mappingsVersions = in.readMap(SystemIndexDescriptor.MappingsVersion::new);
}

return new CompatibilityVersions(transportVersion, mappingsVersions);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
TransportVersion.writeVersion(this.transportVersion(), out);

if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_500_073)) {
out.writeMap(this.systemIndexMappingsVersion(), (o, v) -> v.writeTo(o));
}
}

/**
Expand All @@ -69,6 +128,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("transport_version", this.transportVersion().toString());
builder.field("mappings_versions", this.systemIndexMappingsVersion);
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.RegExp;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -649,7 +654,33 @@ public boolean isInternal() {
* The hash is a hash of the system index descriptor's mappings so that we can warn
* in case of inconsistencies across nodes.
*/
public record MappingsVersion(int version, int hash) {};
public record MappingsVersion(int version, int hash) implements Writeable, ToXContent, Comparable<MappingsVersion> {

public MappingsVersion(StreamInput in) throws IOException {
this(in.readVInt(), in.readInt());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(version);
out.writeInt(hash);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("version", version);
builder.field("hash", hash);
builder.endObject();
return builder;
}

@Override
public int compareTo(MappingsVersion o) {
Objects.requireNonNull(o, "Cannot compare null MappingsVersion");
return Integer.compare(this.version, o.version);
}
}

/**
* Provides a fluent API for building a {@link SystemIndexDescriptor}. Validation still happens in that class.
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,10 @@ protected Node(
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());

CompatibilityVersions compatibilityVersions = new CompatibilityVersions(TransportVersion.current());
CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
TransportVersion.current(),
systemIndices.getMappingsVersions()
);
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
modules.add(settingsModule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void testToXContentWithDeprecatedClusterState() {
"nodes_versions": [
{
"node_id": "node0",
"transport_version": "8000099"
"transport_version": "8000099",
"mappings_versions": {}
}
],
"metadata": {
Expand Down
Loading