Skip to content

Commit

Permalink
Introduce an easy way to get node id by its name (#107392)
Browse files Browse the repository at this point in the history
Our test utility returns the node name when starting a new node.
A lot of APIs (such as routing table or node shutdown) require a node id.
This change introduces a simple way to retrieve the node id based on its name.
  • Loading branch information
idegtiarenko committed Apr 12, 2024
1 parent 7a5f400 commit 32bcb13
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class PrevalidateNodeRemovalRestIT extends HttpSmokeTestCase {

public void testRestStatusCode() throws IOException {
String node1Name = internalCluster().getRandomNodeName();
String node1Id = internalCluster().clusterService(node1Name).localNode().getId();
String node1Id = getNodeId(node1Name);
ensureGreen();
RestClient client = getRestClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testNodeRemovalFromNonRedCluster() throws Exception {
PrevalidateNodeRemovalRequest.Builder req = PrevalidateNodeRemovalRequest.builder();
switch (randomIntBetween(0, 2)) {
case 0 -> req.setNames(nodeName);
case 1 -> req.setIds(internalCluster().clusterService(nodeName).localNode().getId());
case 1 -> req.setIds(getNodeId(nodeName));
case 2 -> req.setExternalIds(internalCluster().clusterService(nodeName).localNode().getExternalId());
default -> throw new IllegalStateException("Unexpected value");
}
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testNodeRemovalFromRedClusterWithLocalShardCopy() throws Exception {
// Prevalidate removal of node1
PrevalidateNodeRemovalRequest req = PrevalidateNodeRemovalRequest.builder().setNames(node1).build();
PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
String node1Id = internalCluster().clusterService(node1).localNode().getId();
String node1Id = getNodeId(node1);
assertFalse(resp.getPrevalidation().isSafe());
assertThat(resp.getPrevalidation().message(), equalTo("removal of the following nodes might not be safe: [" + node1Id + "]"));
assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testNodeRemovalFromRedClusterWithTimeout() throws Exception {
.timeout(TimeValue.timeValueSeconds(1));
PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
assertFalse("prevalidation result should return false", resp.getPrevalidation().isSafe());
String node2Id = internalCluster().clusterService(node2).localNode().getId();
String node2Id = getNodeId(node2);
assertThat(
resp.getPrevalidation().message(),
equalTo("cannot prevalidate removal of nodes with the following IDs: [" + node2Id + "]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public void testCheckShards() throws Exception {
.stream()
.map(ShardRouting::shardId)
.collect(Collectors.toSet());
String node1Id = internalCluster().clusterService(node1).localNode().getId();
String node2Id = internalCluster().clusterService(node2).localNode().getId();
String node1Id = getNodeId(node1);
String node2Id = getNodeId(node2);
Set<ShardId> shardIdsToCheck = new HashSet<>(shardIds);
boolean includeUnknownShardId = randomBoolean();
if (includeUnknownShardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void testSendingShardFailure() throws Exception {
String nonMasterNode = randomFrom(nonMasterNodes);
assertAcked(prepareCreate("test").setSettings(indexSettings(3, 2)));
ensureGreen();
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
String nonMasterNodeId = getNodeId(nonMasterNode);

// fail a random shard
ShardRouting failedShard = randomFrom(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ public void testShardActiveElseWhere() throws Exception {
final String masterNode = internalCluster().getMasterName();
final String nonMasterNode = nodes.get(0).equals(masterNode) ? nodes.get(1) : nodes.get(0);

final String masterId = internalCluster().clusterService(masterNode).localNode().getId();
final String nonMasterId = internalCluster().clusterService(nonMasterNode).localNode().getId();
final String masterId = getNodeId(masterNode);
final String nonMasterId = getNodeId(nonMasterNode);

final int numShards = scaledRandomIntBetween(2, 10);
assertAcked(prepareCreate("test").setSettings(indexSettings(numShards, 0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.monitor.os.OsInfo;
Expand All @@ -29,16 +28,16 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class SimpleNodesInfoIT extends ESIntegTestCase {

public void testNodesInfos() throws Exception {
List<String> nodesIds = internalCluster().startNodes(2);
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
public void testNodesInfos() {
List<String> nodesNames = internalCluster().startNodes(2);
final String node_1 = nodesNames.get(0);
final String node_2 = nodesNames.get(1);

ClusterHealthResponse clusterHealth = clusterAdmin().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());

String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
String server1NodeId = getNodeId(node_1);
String server2NodeId = getNodeId(node_2);
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);

NodesInfoResponse response = clusterAdmin().prepareNodesInfo().get();
Expand Down Expand Up @@ -68,16 +67,16 @@ public void testNodesInfos() throws Exception {
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}

public void testNodesInfosTotalIndexingBuffer() throws Exception {
List<String> nodesIds = internalCluster().startNodes(2);
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
public void testNodesInfosTotalIndexingBuffer() {
List<String> nodesNames = internalCluster().startNodes(2);
final String node_1 = nodesNames.get(0);
final String node_2 = nodesNames.get(1);

ClusterHealthResponse clusterHealth = clusterAdmin().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());

String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
String server1NodeId = getNodeId(node_1);
String server2NodeId = getNodeId(node_2);
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);

NodesInfoResponse response = clusterAdmin().prepareNodesInfo().get();
Expand All @@ -103,19 +102,19 @@ public void testNodesInfosTotalIndexingBuffer() throws Exception {
}

public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().startNodes(
List<String> nodeNames = internalCluster().startNodes(
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 2.9).build(),
Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 5.9).build()
);

final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
final String node_1 = nodeNames.get(0);
final String node_2 = nodeNames.get(1);

ClusterHealthResponse clusterHealth = clusterAdmin().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());

String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
String server1NodeId = getNodeId(node_1);
String server2NodeId = getNodeId(node_2);
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);

NodesInfoResponse response = clusterAdmin().prepareNodesInfo().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {

Settings nodeSettings = Settings.builder().put(nodeSettings(0, Settings.EMPTY)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings);
String newNodeId = internalCluster().clusterService(newNode).localNode().getId();
String newNodeId = getNodeId(newNode);
waitForTaskToStart();

TaskInfo taskInfo = clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
Expand Down Expand Up @@ -135,7 +134,7 @@ protected void checkTransientErrorsDuringRecoveryAreRetried(String recoveryActio
ensureSearchable(indexName);

ClusterStateResponse stateResponse = clusterAdmin().prepareState().get();
final String blueNodeId = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode().getId();
final String blueNodeId = getNodeId(blueNodeName);

assertFalse(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty());

Expand Down Expand Up @@ -231,7 +230,7 @@ public void checkDisconnectsWhileRecovering(String recoveryActionToBlock) throws
ensureSearchable(indexName);

ClusterStateResponse stateResponse = clusterAdmin().prepareState().get();
final String blueNodeId = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode().getId();
final String blueNodeId = getNodeId(blueNodeName);

assertFalse(stateResponse.getState().getRoutingNodes().node(blueNodeId).isEmpty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,10 @@ public static void awaitClusterState(Logger logger, String viaNode, Predicate<Cl
ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
}

public static String getNodeId(String nodeName) {
return internalCluster().getInstance(ClusterService.class, nodeName).localNode().getId();
}

/**
* Waits until at least a give number of document is visible for searchers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,20 @@

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.List;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.notNullValue;

public class ComponentVersionsNodesInfoIT extends ESIntegTestCase {

public void testNodesInfoComponentVersions() {
List<String> nodesIds = internalCluster().startNodes(1);
final String node_1 = nodesIds.get(0);
final String node_1 = internalCluster().startNode();

ClusterHealthResponse clusterHealth = clusterAdmin().prepareHealth().setWaitForGreenStatus().setWaitForNodes("1").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());

String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server1NodeId = getNodeId(node_1);
logger.info("--> started nodes: {}", server1NodeId);

NodesInfoResponse response = clusterAdmin().prepareNodesInfo().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
Expand Down Expand Up @@ -268,7 +267,7 @@ private static void enrich(Map<String, List<String>> keys, String coordinatingNo
EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
.actionGet();
assertThat(statsResponse.getCoordinatorStats().size(), equalTo(internalCluster().size()));
String nodeId = internalCluster().getInstance(ClusterService.class, coordinatingNode).localNode().getId();
String nodeId = getNodeId(coordinatingNode);
CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.getNodeId().equals(nodeId)).findAny().get();
assertThat(stats.getNodeId(), equalTo(nodeId));
assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public void testMultipleNodes() throws Exception {
assertThat(((StringTerms) aggregation).getBuckets().size(), equalTo(nbNodes));

for (String nodeName : internalCluster().getNodeNames()) {
StringTerms.Bucket bucket = ((StringTerms) aggregation).getBucketByKey(
internalCluster().clusterService(nodeName).localNode().getId()
);
StringTerms.Bucket bucket = ((StringTerms) aggregation).getBucketByKey(getNodeId(nodeName));
// At least 1 doc must exist per node, but it can be more than 1
// because the first node may have already collected many node stats documents
// whereas the last node just started to collect node stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testExport() throws Exception {
aggregation.getBuckets().size()
);
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
String nodeId = getNodeId(nodeName);
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testExport() throws Exception {
response -> {
Terms aggregation = response.getAggregations().get("agg_nodes_ids");
for (String nodeName : internalCluster().getNodeNames()) {
String nodeId = internalCluster().clusterService(nodeName).localNode().getId();
String nodeId = getNodeId(nodeName);
Terms.Bucket bucket = aggregation.getBucketByKey(nodeId);
assertTrue("No bucket found for node id [" + nodeId + "]", bucket != null);
assertTrue(bucket.getDocCount() >= 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testNodeRemovalFromClusterWihRedSearchableSnapshotIndex() throws Exc
PrevalidateNodeRemovalRequest.Builder req = PrevalidateNodeRemovalRequest.builder();
switch (randomIntBetween(0, 2)) {
case 0 -> req.setNames(node2);
case 1 -> req.setIds(internalCluster().clusterService(node2).localNode().getId());
case 1 -> req.setIds(getNodeId(node2));
case 2 -> req.setExternalIds(internalCluster().clusterService(node2).localNode().getExternalId());
default -> throw new IllegalStateException("Unexpected value");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -36,7 +35,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
public void testDesiredBalanceWithShutdown() throws Exception {

final var oldNodeName = internalCluster().startNode();
final var oldNodeId = internalCluster().getInstance(ClusterService.class, oldNodeName).localNode().getId();
final var oldNodeId = getNodeId(oldNodeName);

createIndex(
INDEX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ShutdownAwarePlugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -44,21 +41,8 @@ public void testShutdownAwarePlugin() throws Exception {

final String shutdownNode;
final String remainNode;
NodesInfoResponse nodes = clusterAdmin().prepareNodesInfo().clear().get();
final String node1Id = nodes.getNodes()
.stream()
.map(NodeInfo::getNode)
.filter(node -> node.getName().equals(node1))
.map(DiscoveryNode::getId)
.findFirst()
.orElseThrow();
final String node2Id = nodes.getNodes()
.stream()
.map(NodeInfo::getNode)
.filter(node -> node.getName().equals(node2))
.map(DiscoveryNode::getId)
.findFirst()
.orElseThrow();
final String node1Id = getNodeId(node1);
final String node2Id = getNodeId(node2);

if (randomBoolean()) {
shutdownNode = node1Id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -93,17 +90,6 @@ private void deleteNodeShutdown(String nodeId) {
assertAcked(client().execute(DeleteShutdownNodeAction.INSTANCE, new DeleteShutdownNodeAction.Request(nodeId)));
}

private String getNodeId(String nodeName) {
NodesInfoResponse nodes = clusterAdmin().prepareNodesInfo().clear().get();
return nodes.getNodes()
.stream()
.map(NodeInfo::getNode)
.filter(node -> node.getName().equals(nodeName))
.map(DiscoveryNode::getId)
.findFirst()
.orElseThrow();
}

private void assertNoShuttingDownNodes(String nodeId) throws ExecutionException, InterruptedException {
var response = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request(nodeId)).get();
assertThat(response.getShutdownStatuses(), empty());
Expand Down

0 comments on commit 32bcb13

Please sign in to comment.