Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void setUp() throws Exception {
.metadata(metadata)
.routingTable(routingTable)
.nodes(nb)
.compatibilityVersions(compatibilityVersions)
.nodeIdsToCompatibilityVersions(compatibilityVersions)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.elasticsearch.snapshots.SnapshotInfoTestUtils;
import org.elasticsearch.snapshots.SnapshotsInProgressSerializationTests;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.test.VersionUtils;

import java.util.Collections;
Expand Down Expand Up @@ -80,8 +79,8 @@ public void testClusterStateDiffSerialization() throws Exception {
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(masterNode).add(otherNode).localNodeId(masterNode.getId()).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
.nodes(discoveryNodes)
.putTransportVersion("master", TransportVersionUtils.randomVersion(random()))
.putTransportVersion("other", TransportVersionUtils.randomVersion(random()))
.putCompatibilityVersions("master", CompatibilityVersionsUtils.fakeSystemIndicesRandom())
.putCompatibilityVersions("other", CompatibilityVersionsUtils.fakeSystemIndicesRandom())
.build();
ClusterState clusterStateFromDiffs = ClusterState.Builder.fromBytes(
ClusterState.Builder.toBytes(clusterState),
Expand Down Expand Up @@ -250,7 +249,7 @@ private ClusterState.Builder randomNodes(ClusterState clusterState) {
versions.put(id, CompatibilityVersionsUtils.fakeSystemIndicesRandom());
}

return ClusterState.builder(clusterState).nodes(nodes).compatibilityVersions(versions);
return ClusterState.builder(clusterState).nodes(nodes).nodeIdsToCompatibilityVersions(versions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

if (request.nodes()) {
builder.nodes(currentState.nodes());
builder.compatibilityVersions(getCompatibilityVersions(currentState));
builder.nodeIdsToCompatibilityVersions(getCompatibilityVersions(currentState));
}
if (request.routingTable()) {
if (request.indices().length > 0) {
Expand Down
42 changes: 36 additions & 6 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -777,13 +777,35 @@ public DiscoveryNodes nodes() {
return nodes;
}

// Deprecate to keep downstream projects compiling
@Deprecated(forRemoval = true)
public Builder putTransportVersion(String nodeId, TransportVersion transportVersion) {
// TODO[wrb]: system index mappings versions will be added in a followup
compatibilityVersions.put(nodeId, new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), Map.of()));
return putCompatibilityVersions(nodeId, transportVersion, Map.of());
}

public Builder putCompatibilityVersions(
String nodeId,
TransportVersion transportVersion,
Map<String, SystemIndexDescriptor.MappingsVersion> systemIndexMappingsVersions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: it seems to me that this method is always called with an empty map. Maybe we could remove the parameter and give it a clearer name, like putCompatibilityVersionsWithNoSystemIndexMapping (I know, it's long, maybe there is something better). But something that makes the caller aware they are skipping something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API I'm aiming for here is:

  1. a method that puts a CompatibilityVersions object to the map, keyed to node ID
  2. a method that sets the entire map of CompatibilityVersions for the builder
  3. a convenience method that takes all the kinds of versions and constructs a CompatibilityVersions object. As we add new kinds of versions, we'll add arguments to this method.

Since "CompatibilityVersions" is already plural, it's a little awkward to name the method that takes the whole map. I think "nodeIdToCompatibityVersionsMap" might be a more informative name.

But I think I want to keep the three-arg compatibility versions method as-is. Eventually we'll add more things to CompatibilityVersions, and I don't want to add methods to handle subsets of available types of versions.

) {
return putCompatibilityVersions(
nodeId,
new CompatibilityVersions(Objects.requireNonNull(transportVersion, nodeId), systemIndexMappingsVersions)
);
}

public Builder putCompatibilityVersions(String nodeId, CompatibilityVersions versions) {
compatibilityVersions.put(nodeId, versions);
return this;
}

// Deprecate to keep downstream projects compiling
@Deprecated(forRemoval = true)
public Builder compatibilityVersions(Map<String, CompatibilityVersions> versions) {
return nodeIdsToCompatibilityVersions(versions);
}

public Builder nodeIdsToCompatibilityVersions(Map<String, CompatibilityVersions> versions) {
versions.forEach((key, value) -> Objects.requireNonNull(value, key));
// remove all versions not present in the new map
this.compatibilityVersions.keySet().retainAll(versions.keySet());
Expand Down Expand Up @@ -923,11 +945,15 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
builder.routingTable = RoutingTable.readFrom(in);
builder.nodes = DiscoveryNodes.readFrom(in, localNode);
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
builder.compatibilityVersions(in.readMap(CompatibilityVersions::readVersion));
builder.nodeIdsToCompatibilityVersions(in.readMap(CompatibilityVersions::readVersion));
} else {
// this clusterstate is from a pre-8.8.0 node
// infer the versions from discoverynodes for now
builder.nodes().getNodes().values().forEach(n -> builder.putTransportVersion(n.getId(), inferTransportVersion(n)));
// leave mappings versions empty
builder.nodes()
.getNodes()
.values()
.forEach(n -> builder.putCompatibilityVersions(n.getId(), inferTransportVersion(n), Map.of()));
}
builder.blocks = ClusterBlocks.readFrom(in);
int customSize = in.readVInt();
Expand Down Expand Up @@ -1076,10 +1102,14 @@ public ClusterState apply(ClusterState state) {
builder.routingTable(routingTable.apply(state.routingTable));
builder.nodes(nodes.apply(state.nodes));
if (versions != null) {
builder.compatibilityVersions(this.versions.apply(state.compatibilityVersions));
builder.nodeIdsToCompatibilityVersions(this.versions.apply(state.compatibilityVersions));
} else {
// infer the versions from discoverynodes for now
builder.nodes().getNodes().values().forEach(n -> builder.putTransportVersion(n.getId(), inferTransportVersion(n)));
// leave mappings versions empty
builder.nodes()
.getNodes()
.values()
.forEach(n -> builder.putCompatibilityVersions(n.getId(), inferTransportVersion(n), Map.of()));
}
builder.metadata(metadata.apply(state.metadata));
builder.blocks(blocks.apply(state.blocks));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
Expand Down Expand Up @@ -175,6 +174,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private final LagDetector lagDetector;
private final ClusterFormationFailureHelper clusterFormationFailureHelper;
private final JoinReasonService joinReasonService;
private final CompatibilityVersions compatibilityVersions;

private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
Expand Down Expand Up @@ -317,6 +317,7 @@ public Coordinator(
this.peerFinderListeners = new CopyOnWriteArrayList<>();
this.peerFinderListeners.add(clusterBootstrapService);
this.leaderHeartbeatService = leaderHeartbeatService;
this.compatibilityVersions = compatibilityVersions;
}

/**
Expand Down Expand Up @@ -1064,7 +1065,7 @@ protected void doStart() {
.addGlobalBlock(noMasterBlockService.getNoMasterBlock())
)
.nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
.putTransportVersion(getLocalNode().getId(), TransportVersion.current())
.putCompatibilityVersions(getLocalNode().getId(), compatibilityVersions)
.metadata(metadata)
.build();
applierState = initialState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
}

final ClusterState clusterStateWithNewNodesAndDesiredNodes = DesiredNodes.updateDesiredNodesStatusIfNeeded(
newState.nodes(nodesBuilder).compatibilityVersions(compatibilityVersionsMap).build()
newState.nodes(nodesBuilder).nodeIdsToCompatibilityVersions(compatibilityVersionsMap).build()
);
final ClusterState updatedState = allocationService.adaptAutoExpandReplicas(clusterStateWithNewNodesAndDesiredNodes);
assert enforceVersionBarrier == false
Expand Down Expand Up @@ -295,7 +295,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(
// or removed by us above
ClusterState tmpState = ClusterState.builder(currentState)
.nodes(nodesBuilder)
.compatibilityVersions(compatibilityVersions)
.nodeIdsToCompatibilityVersions(compatibilityVersions)
.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NoMasterBlockService.NO_MASTER_BLOCK_ID))
.metadata(
Metadata.builder(currentState.metadata())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ protected ClusterState remainingNodesClusterState(
DiscoveryNodes.Builder remainingNodesBuilder,
Map<String, CompatibilityVersions> compatibilityVersions
) {
return ClusterState.builder(currentState).nodes(remainingNodesBuilder).compatibilityVersions(compatibilityVersions).build();
return ClusterState.builder(currentState)
.nodes(remainingNodesBuilder)
.nodeIdsToCompatibilityVersions(compatibilityVersions)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public ClusterState execute(BatchExecutionContext<NodeTransportVersionTask> cont
assert (recordedTv != null) || (context.initialState().nodes().nodeExists(e.getKey()) == false)
: "Node " + e.getKey() + " is in the cluster but does not have an associated transport version recorded";
if (Objects.equals(recordedTv, INFERRED_TRANSPORT_VERSION)) {
builder.putTransportVersion(e.getKey(), e.getValue());
builder.putCompatibilityVersions(e.getKey(), e.getValue(), Map.of()); // unknown mappings versions
modified = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static ClusterState setLocalNode(
) {
return ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build())
.compatibilityVersions(Map.of(localNode.getId(), compatibilityVersions))
.putCompatibilityVersions(localNode.getId(), compatibilityVersions)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ public class SystemIndices {
new Feature(SYNONYMS_FEATURE_NAME, "Manages synonyms", List.of(SYNONYMS_DESCRIPTOR))
).collect(Collectors.toUnmodifiableMap(Feature::getName, Function.identity()));

public static final Map<String, SystemIndexDescriptor.MappingsVersion> SERVER_SYSTEM_MAPPINGS_VERSIONS =
SERVER_SYSTEM_FEATURE_DESCRIPTORS.values()
.stream()
.flatMap(feature -> feature.getIndexDescriptors().stream())
.filter(SystemIndexDescriptor::isAutomaticallyManaged)
.collect(Collectors.toMap(SystemIndexDescriptor::getIndexPattern, SystemIndexDescriptor::getMappingsVersion));

/**
* The node's full list of system features is stored here. The map is keyed
* on the value of {@link Feature#getName()}, and is used for fast lookup of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
Expand Down Expand Up @@ -130,7 +131,12 @@ public void testToXContentWithDeprecatedClusterState() {
{
"node_id": "node0",
"transport_version": "8000099",
"mappings_versions": {}
"mappings_versions": {
".system-index": {
"version": 1,
"hash": 0
}
}
}
],
"metadata": {
Expand Down Expand Up @@ -323,7 +329,11 @@ private static ClusterState createClusterState() {
var node0 = DiscoveryNodeUtils.create("node0", new TransportAddress(TransportAddress.META_ADDRESS, 9000));
return ClusterState.builder(new ClusterName("test"))
.nodes(new DiscoveryNodes.Builder().add(node0).masterNodeId(node0.getId()).build())
.putTransportVersion(node0.getId(), TransportVersions.V_8_0_0)
.putCompatibilityVersions(
node0.getId(),
TransportVersions.V_8_0_0,
Map.of(".system-index", new SystemIndexDescriptor.MappingsVersion(1, 0))
)
.metadata(
Metadata.builder()
.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
Expand Down Expand Up @@ -1049,7 +1050,7 @@ private ClusterState buildClusterState() throws IOException {
.add(DiscoveryNodeUtils.create("nodeId1", new TransportAddress(InetAddress.getByName("127.0.0.1"), 111)))
.build()
)
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Map.of(
"nodeId1",
new CompatibilityVersions(TransportVersion.current(), Map.of(".tasks", new SystemIndexDescriptor.MappingsVersion(1, 1)))
Expand Down Expand Up @@ -1159,7 +1160,7 @@ public void testGetMinTransportVersion() throws IOException {

for (int i = 0; i < numNodes; i++) {
TransportVersion tv = TransportVersionUtils.randomVersion();
builder.putTransportVersion("nodeTv" + i, tv);
builder.putCompatibilityVersions("nodeTv" + i, tv, SystemIndices.SERVER_SYSTEM_MAPPINGS_VERSIONS);
minVersion = Collections.min(List.of(minVersion, tv));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testNothingFixedWhenNothingToInfer() {

ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(Version.V_8_8_0))
.compatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of())))
.nodeIdsToCompatibilityVersions(versions(new CompatibilityVersions(TransportVersions.V_8_8_0, Map.of())))
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
Expand All @@ -128,7 +128,7 @@ public void testNothingFixedWhenOnNextVersion() {

ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION))
.compatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of())))
.nodeIdsToCompatibilityVersions(versions(new CompatibilityVersions(NEXT_TRANSPORT_VERSION, Map.of())))
.build();

TransportVersionsFixupListener listeners = new TransportVersionsFixupListener(taskQueue, client, null, null);
Expand All @@ -143,7 +143,7 @@ public void testNothingFixedWhenOnPreviousVersion() {

ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(Version.V_8_7_0, Version.V_8_8_0))
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(TransportVersions.V_8_7_0, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Expand All @@ -164,7 +164,7 @@ public void testVersionsAreFixed() {

ClusterState testState = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testConcurrentChangesDoNotOverlap() {

ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Expand All @@ -208,7 +208,7 @@ public void testConcurrentChangesDoNotOverlap() {

ClusterState testState2 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(NEXT_TRANSPORT_VERSION, NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Expand All @@ -229,7 +229,7 @@ public void testFailedRequestsAreRetried() {

ClusterState testState1 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(node(NEXT_VERSION, NEXT_VERSION, NEXT_VERSION))
.compatibilityVersions(
.nodeIdsToCompatibilityVersions(
Maps.transformValues(
versions(NEXT_TRANSPORT_VERSION, TransportVersions.V_8_8_0, TransportVersions.V_8_8_0),
transportVersion -> new CompatibilityVersions(transportVersion, Map.of())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;

import java.util.ArrayList;
Expand Down Expand Up @@ -535,7 +536,7 @@ public static ClusterState state(
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
for (DiscoveryNode node : allNodes) {
state.putTransportVersion(node.getId(), transportVersion);
state.putCompatibilityVersions(node.getId(), transportVersion, SystemIndices.SERVER_SYSTEM_MAPPINGS_VERSIONS);
}

Metadata.Builder metadataBuilder = Metadata.builder().generateClusterUuidIfNeeded();
Expand Down
Loading