Skip to content

Commit

Permalink
Wrap transport version in cluster state (#99114)
Browse files Browse the repository at this point in the history
Cluster state currently holds a cluster minimum transport version and a map of nodes to transport versions. However, to determine node compatibility, we will need to account for more types of versions in cluster state than just the transport version (see #99076). Here we introduce a wrapper class to cluster state and update accessors and builders to use the new method. (I would have liked to re-use org.elasticsearch.cluster.node.VersionInformation, but that one holds IndexVersion rather than TransportVersion.

* Introduce CompatibilityVersions to cluster state class
  • Loading branch information
williamrandolph committed Sep 6, 2023
1 parent 8752d80 commit d32902c
Show file tree
Hide file tree
Showing 19 changed files with 321 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexVersion;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -140,17 +141,17 @@ public void setUp() throws Exception {
}
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
Map<String, TransportVersion> transportVersions = new HashMap<>();
Map<String, CompatibilityVersions> compatibilityVersions = new HashMap<>();
for (int i = 1; i <= numNodes; i++) {
String id = "node" + i;
nb.add(Allocators.newNode(id, Collections.singletonMap("tag", "tag_" + (i % numTags))));
transportVersions.put(id, TransportVersion.current());
compatibilityVersions.put(id, new CompatibilityVersions(TransportVersion.current()));
}
initialClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.routingTable(routingTable)
.nodes(nb)
.transportVersions(transportVersions)
.compatibilityVersions(compatibilityVersions)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,5 @@ org.elasticsearch.cluster.service.ClusterService#submitUnbatchedStateUpdateTask(
org.elasticsearch.cluster.ClusterStateTaskExecutor$TaskContext#success(java.util.function.Consumer)
org.elasticsearch.cluster.ClusterStateTaskExecutor$TaskContext#success(java.util.function.Consumer, org.elasticsearch.cluster.ClusterStateAckListener)

@defaultMessage ClusterState#transportVersions are for internal use only. Use ClusterState#getMinTransportVersion or a different version. See TransportVersion javadocs for more info.
org.elasticsearch.cluster.ClusterState#transportVersions()
@defaultMessage ClusterState#compatibilityVersions are for internal use only. Use ClusterState#getMinVersions or a different version. See TransportVersion javadocs for more info.
org.elasticsearch.cluster.ClusterState#compatibilityVersions()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.cluster;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -31,6 +30,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -227,29 +227,29 @@ private DiscoveryNode randomNode(String nodeId) {
*/
private ClusterState.Builder randomNodes(ClusterState clusterState) {
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
Map<String, TransportVersion> transports = new HashMap<>(clusterState.transportVersions());
Map<String, CompatibilityVersions> versions = new HashMap<>(clusterState.compatibilityVersions());
List<String> nodeIds = randomSubsetOf(
randomInt(clusterState.nodes().getNodes().size() - 1),
clusterState.nodes().getNodes().keySet().toArray(new String[0])
);
for (String nodeId : nodeIds) {
if (nodeId.startsWith("node-")) {
nodes.remove(nodeId);
transports.remove(nodeId);
versions.remove(nodeId);
if (randomBoolean()) {
nodes.add(randomNode(nodeId));
transports.put(nodeId, TransportVersionUtils.randomVersion(random()));
versions.put(nodeId, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
}
}
}
int additionalNodeCount = randomIntBetween(1, 20);
for (int i = 0; i < additionalNodeCount; i++) {
String id = "node-" + randomAlphaOfLength(10);
nodes.add(randomNode(id));
transports.put(id, TransportVersionUtils.randomVersion(random()));
versions.put(id, new CompatibilityVersions(TransportVersionUtils.randomVersion(random())));
}

return ClusterState.builder(clusterState).nodes(nodes).transportVersions(transports);
return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions);
}

/**
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
exports org.elasticsearch.cluster.routing.allocation.command;
exports org.elasticsearch.cluster.routing.allocation.decider;
exports org.elasticsearch.cluster.service;
exports org.elasticsearch.cluster.version;
exports org.elasticsearch.common;
exports org.elasticsearch.common.blobstore;
exports org.elasticsearch.common.blobstore.fs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
Expand All @@ -26,6 +25,7 @@
import org.elasticsearch.cluster.metadata.Metadata.Custom;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -138,9 +138,9 @@ public void onTimeout(TimeValue timeout) {
}
}

@SuppressForbidden(reason = "exposing ClusterState#transportVersions requires reading them")
private static Map<String, TransportVersion> getTransportVersions(ClusterState clusterState) {
return clusterState.transportVersions();
@SuppressForbidden(reason = "exposing ClusterState#compatibilityVersions requires reading them")
private static Map<String, CompatibilityVersions> getCompatibilityVersions(ClusterState clusterState) {
return clusterState.compatibilityVersions();
}

private ClusterStateResponse buildResponse(final ClusterStateRequest request, final ClusterState currentState) {
Expand All @@ -151,7 +151,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

if (request.nodes()) {
builder.nodes(currentState.nodes());
builder.transportVersions(getTransportVersions(currentState));
builder.compatibilityVersions(getCompatibilityVersions(currentState));
}
if (request.routingTable()) {
if (request.indices().length > 0) {
Expand Down

0 comments on commit d32902c

Please sign in to comment.