Skip to content

Commit

Permalink
Keep track of desired nodes cluster membership (#84165)
Browse files Browse the repository at this point in the history
This commit adds tracking for desired nodes cluster membership.

When desired nodes are updated they are matched against the current
cluster members. Additionally when a node joins the cluster the
desired nodes cluster membership is updated.
  • Loading branch information
fcofdez committed May 3, 2022
1 parent 2f14148 commit ce9819f
Show file tree
Hide file tree
Showing 13 changed files with 495 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84165.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84165
summary: Keep track of desired nodes cluster membership
area: Distributed
type: enhancement
issues: []
1 change: 1 addition & 0 deletions docs/reference/indices/shard-stores.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ The API returns the following response:
"name": "node_t0",
"ephemeral_id" : "9NlXRFGCT1m8tkvYCMK-8A",
"transport_address": "local[1]",
"external_id": "node_t0",
"attributes": {},
"roles": [...]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,5 @@ public String toString() {
+ version
+ ']';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@
import static java.lang.String.format;
import static org.elasticsearch.node.Node.NODE_EXTERNAL_ID_SETTING;

public class DesiredNodes implements Writeable, ToXContentObject {
public class DesiredNodes implements Writeable, ToXContentObject, Iterable<DesiredNode> {

private static final ParseField HISTORY_ID_FIELD = new ParseField("history_id");
private static final ParseField VERSION_FIELD = new ParseField("version");
Expand Down Expand Up @@ -160,6 +161,11 @@ public List<DesiredNode> nodes() {
return List.copyOf(nodes.values());
}

@Override
public Iterator<DesiredNode> iterator() {
return nodes.values().iterator();
}

@Nullable
public DesiredNode find(String externalId) {
return nodes.get(externalId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;

import java.util.HashSet;
import java.util.Set;

public class DesiredNodesMembershipService implements ClusterStateListener {
private final Set<DesiredNode> members;
private String latestHistoryId = null;

DesiredNodesMembershipService() {
this.members = new HashSet<>();
}

public static DesiredNodesMembershipService create(ClusterService clusterService) {
var tracker = new DesiredNodesMembershipService();
clusterService.addListener(tracker);
return tracker;
}

@Override
public synchronized void clusterChanged(ClusterChangedEvent event) {
final var clusterState = event.state();
final var desiredNodes = DesiredNodes.latestFromClusterState(clusterState);
if (desiredNodes == null) {
members.clear();
return;
}

if (event.nodesChanged()) {
final var nodesDelta = event.nodesDelta();

for (DiscoveryNode addedNode : nodesDelta.addedNodes()) {
final var desiredNode = desiredNodes.find(addedNode.getExternalId());
if (desiredNode != null) {
members.add(desiredNode);
}
}
} else if (event.changedCustomMetadataSet().contains(DesiredNodesMetadata.TYPE) || latestHistoryId == null) {
if (desiredNodes.historyID().equals(latestHistoryId) == false) {
members.clear();
}
latestHistoryId = desiredNodes.historyID();

final Set<DesiredNode> removedDesiredNodes = new HashSet<>(members);
desiredNodes.nodes().forEach(removedDesiredNodes::remove);

for (DiscoveryNode node : clusterState.nodes()) {
final var desiredNode = desiredNodes.find(node.getExternalId());
if (desiredNode != null) {
members.add(desiredNode);
}
}

members.removeAll(removedDesiredNodes);
}
}

public synchronized boolean isMember(DesiredNode desiredNode) {
return members.contains(desiredNode);
}

// visible for testing
synchronized int trackedMembersCount() {
return members.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

Expand Down Expand Up @@ -68,7 +69,7 @@ public static DesiredNodesMetadata fromXContent(XContentParser parser) throws IO

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(LATEST_FIELD.getPreferredName(), latestDesiredNodes);
builder.field(LATEST_FIELD.getPreferredName(), (ToXContent) latestDesiredNodes);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
Expand All @@ -37,6 +38,7 @@
public class DiscoveryNode implements Writeable, ToXContentFragment {

static final String COORDINATING_ONLY = "coordinating_only";
public static final Version EXTERNAL_ID_VERSION = Version.V_8_3_0;

public static boolean hasRole(final Settings settings, final DiscoveryNodeRole role) {
// this method can be called before the o.e.n.NodeRoleSettings.NODE_ROLES_SETTING is initialized
Expand Down Expand Up @@ -109,6 +111,7 @@ public static boolean isDedicatedFrozenNode(final Settings settings) {
private final Map<String, String> attributes;
private final Version version;
private final SortedSet<DiscoveryNodeRole> roles;
private final String externalId;

/**
* Creates a new {@link DiscoveryNode}
Expand Down Expand Up @@ -189,6 +192,46 @@ public DiscoveryNode(
);
}

/**
* Creates a new {@link DiscoveryNode}
* <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique persistent id. An ephemeral id will be auto generated.
* @param externalId the external id used to identify this node by external systems
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node
*/
public DiscoveryNode(
String nodeName,
String nodeId,
String externalId,
TransportAddress address,
Map<String, String> attributes,
Set<DiscoveryNodeRole> roles,
Version version
) {
this(
nodeName,
nodeId,
UUIDs.randomBase64UUID(),
address.address().getHostString(),
address.getAddress(),
address,
attributes,
roles,
version,
externalId
);
}

/**
* Creates a new {@link DiscoveryNode}.
* <p>
Expand Down Expand Up @@ -217,6 +260,40 @@ public DiscoveryNode(
Map<String, String> attributes,
Set<DiscoveryNodeRole> roles,
Version version
) {
this(nodeName, nodeId, ephemeralId, hostName, hostAddress, address, attributes, roles, version, null);
}

/**
* Creates a new {@link DiscoveryNode}.
* <p>
* <b>Note:</b> if the version of the node is unknown {@link Version#minimumCompatibilityVersion()} should be used for the current
* version. it corresponds to the minimum version this elasticsearch version can communicate with. If a higher version is used
* the node might not be able to communicate with the remote node. After initial handshakes node versions will be discovered
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique persistent id
* @param ephemeralId the nodes unique ephemeral id
* @param hostAddress the nodes host address
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node
* @param externalId the external id used to identify this node by external systems
*/
public DiscoveryNode(
String nodeName,
String nodeId,
String ephemeralId,
String hostName,
String hostAddress,
TransportAddress address,
Map<String, String> attributes,
Set<DiscoveryNodeRole> roles,
Version version,
String externalId
) {
if (nodeName != null) {
this.nodeName = nodeStringDeduplicator.deduplicate(nodeName);
Expand All @@ -238,13 +315,22 @@ public DiscoveryNode(
assert DiscoveryNodeRole.roleNames().stream().noneMatch(attributes::containsKey)
: "Node roles must not be provided as attributes but saw attributes " + attributes;
this.roles = Collections.unmodifiableSortedSet(new TreeSet<>(roles));
this.externalId = Objects.requireNonNullElse(externalId, this.nodeName);
}

/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) {
Map<String, String> attributes = Node.NODE_ATTRIBUTES.getAsMap(settings);
Set<DiscoveryNodeRole> roles = getRolesFromSettings(settings);
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
return new DiscoveryNode(
Node.NODE_NAME_SETTING.get(settings),
nodeId,
Node.NODE_EXTERNAL_ID_SETTING.get(settings),
publishAddress,
attributes,
roles,
Version.CURRENT
);
}

/** extract node roles from the given settings */
Expand Down Expand Up @@ -286,6 +372,11 @@ public DiscoveryNode(StreamInput in) throws IOException {
}
this.roles = Collections.unmodifiableSortedSet(roles);
this.version = Version.readVersion(in);
if (in.getVersion().onOrAfter(EXTERNAL_ID_VERSION)) {
this.externalId = in.readString();
} else {
this.externalId = nodeName;
}
}

@Override
Expand All @@ -304,6 +395,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(role.canContainData());
}
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(EXTERNAL_ID_VERSION)) {
out.writeString(externalId);
}
}

/**
Expand Down Expand Up @@ -338,6 +432,13 @@ public String getName() {
return this.nodeName;
}

/**
* The external id used to identify this node by external systems
*/
public String getExternalId() {
return externalId;
}

/**
* The node attributes.
*/
Expand Down Expand Up @@ -444,6 +545,9 @@ public void appendDescriptionWithoutAttributes(StringBuilder stringBuilder) {
}
stringBuilder.append('{').append(nodeId).append('}');
stringBuilder.append('{').append(ephemeralId).append('}');
if (externalId.length() > 0) {
stringBuilder.append('{').append(externalId).append('}');
}
stringBuilder.append('{').append(hostName).append('}');
stringBuilder.append('{').append(address).append('}');
if (roles.isEmpty() == false) {
Expand All @@ -465,6 +569,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("name", getName());
builder.field("ephemeral_id", getEphemeralId());
builder.field("transport_address", getAddress().toString());
builder.field("external_id", getExternalId());

builder.startObject("attributes");
for (Map.Entry<String, String> entry : attributes.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testToXContent() throws IOException {
"name": "",
"ephemeral_id": "%s",
"transport_address": "0.0.0.0:9000",
"external_id": "",
"attributes": {},
"roles": [
"data",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public void testToXContent() throws IOException {
"name": "",
"ephemeral_id": "%s",
"transport_address": "127.0.0.1:111",
"external_id": "",
"attributes": {},
"roles": [
"data",
Expand Down Expand Up @@ -390,6 +391,7 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"name" : "",
"ephemeral_id" : "%s",
"transport_address" : "127.0.0.1:111",
"external_id" : "",
"attributes" : { },
"roles" : [
"data",
Expand Down Expand Up @@ -593,6 +595,7 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"name" : "",
"ephemeral_id" : "%s",
"transport_address" : "127.0.0.1:111",
"external_id" : "",
"attributes" : { },
"roles" : [
"data",
Expand Down

0 comments on commit ce9819f

Please sign in to comment.