Skip to content

Commit

Permalink
Health info overview (#89275)
Browse files Browse the repository at this point in the history
This PR enables the nodes of a cluster to push their health info to the health node.
Co-authored-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
gmarouli committed Sep 5, 2022
1 parent 492f5b1 commit e63d70b
Show file tree
Hide file tree
Showing 13 changed files with 1,030 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.health.node.DiskHealthInfo;
import org.elasticsearch.health.node.HealthInfoCache;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 4)
public class UpdateHealthInfoCacheIT extends ESIntegTestCase {

private static final DiskHealthInfo GREEN = new DiskHealthInfo(HealthStatus.GREEN, null);

public void testNodesReportingHealth() throws Exception {
try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
decreasePollingInterval(client);
ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
DiscoveryNode healthNode = waitAndGetHealthNode(client);
assertThat(healthNode, notNullValue());
assertBusy(() -> {
Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
.getDiskHealthInfo();
assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
for (String nodeId : nodeIds) {
assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
}
});
} catch (IOException e) {
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
}
}

public void testNodeLeavingCluster() throws Exception {
try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
decreasePollingInterval(client);
ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
Collection<DiscoveryNode> nodes = state.getNodes().getNodes().values();
DiscoveryNode healthNode = waitAndGetHealthNode(client);
assertThat(healthNode, notNullValue());
DiscoveryNode nodeToLeave = nodes.stream().filter(node -> {
boolean isMaster = node.getName().equals(internalCluster.getMasterName());
boolean isHealthNode = node.getId().equals(healthNode.getId());
// We have dedicated tests for master and health node
return isMaster == false && isHealthNode == false;
}).findAny().orElseThrow();
internalCluster.stopNode(nodeToLeave.getName());
assertBusy(() -> {
Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, healthNode.getName())
.getDiskHealthInfo();
assertThat(healthInfoCache.size(), equalTo(nodes.size() - 1));
for (DiscoveryNode node : nodes) {
if (node.getId().equals(nodeToLeave.getId())) {
assertThat(healthInfoCache.containsKey(node.getId()), equalTo(false));
} else {
assertThat(healthInfoCache.get(node.getId()), equalTo(GREEN));
}
}
});
} catch (IOException e) {
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
}
}

public void testHealthNodeFailOver() throws Exception {
try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
decreasePollingInterval(client);
ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
DiscoveryNode healthNodeToBeShutDown = waitAndGetHealthNode(client);
assertThat(healthNodeToBeShutDown, notNullValue());
internalCluster.restartNode(healthNodeToBeShutDown.getName());
ensureStableCluster(nodeIds.length);
DiscoveryNode newHealthNode = waitAndGetHealthNode(client);
assertThat(newHealthNode, notNullValue());
logger.info("Previous health node {}, new health node {}.", healthNodeToBeShutDown, newHealthNode);
assertBusy(() -> {
Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
.getDiskHealthInfo();
assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
for (String nodeId : nodeIds) {
assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
}
});
} catch (IOException e) {
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
}
}

public void testMasterFailure() throws Exception {
try (InternalTestCluster internalCluster = internalCluster(); Client client = internalCluster.client()) {
decreasePollingInterval(client);
ClusterState state = internalCluster.client().admin().cluster().prepareState().clear().setNodes(true).get().getState();
String[] nodeIds = state.getNodes().getNodes().keySet().toArray(new String[0]);
DiscoveryNode healthNodeBeforeIncident = waitAndGetHealthNode(client);
assertThat(healthNodeBeforeIncident, notNullValue());
String masterName = internalCluster.getMasterName();
logger.info("Restarting elected master node {}.", masterName);
internalCluster.restartNode(masterName);
ensureStableCluster(nodeIds.length);
DiscoveryNode newHealthNode = waitAndGetHealthNode(client);
assertThat(newHealthNode, notNullValue());
assertBusy(() -> {
Map<String, DiskHealthInfo> healthInfoCache = internalCluster.getInstance(HealthInfoCache.class, newHealthNode.getName())
.getDiskHealthInfo();
assertThat(healthInfoCache.size(), equalTo(nodeIds.length));
for (String nodeId : nodeIds) {
assertThat(healthInfoCache.get(nodeId), equalTo(GREEN));
}
});
} catch (IOException e) {
throw new RuntimeException("Failed to close internal cluster: " + e.getMessage(), e);
}
}

@Nullable
private static DiscoveryNode waitAndGetHealthNode(Client client) throws InterruptedException {
DiscoveryNode[] healthNode = new DiscoveryNode[1];
waitUntil(() -> {
ClusterState state = client.admin().cluster().prepareState().clear().setMetadata(true).setNodes(true).get().getState();
healthNode[0] = HealthNode.findHealthNode(state);
return healthNode[0] != null;
}, 2, TimeUnit.SECONDS);
return healthNode[0];
}

private void decreasePollingInterval(Client client) {
client.admin()
.cluster()
.updateSettings(
new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(LocalHealthMonitor.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(10))
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.health.GetHealthAction;
import org.elasticsearch.health.RestGetHealthAction;
import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -700,6 +702,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(UpdateDesiredNodesAction.INSTANCE, TransportUpdateDesiredNodesAction.class);
actions.register(DeleteDesiredNodesAction.INSTANCE, TransportDeleteDesiredNodesAction.class);

if (HealthNode.isEnabled()) {
actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
}

return unmodifiableMap(actions.getRegistry());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,33 @@

package org.elasticsearch.health.node;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.HealthStatus;

import java.io.IOException;

/**
* The health status of the disk space of this node along with the cause.
*/
record DiskHealthInfo(HealthStatus healthStatus, Cause cause) {
public record DiskHealthInfo(HealthStatus healthStatus, @Nullable Cause cause) implements Writeable {
DiskHealthInfo(HealthStatus healthStatus) {
this(healthStatus, null);
}

enum Cause {
public DiskHealthInfo(StreamInput in) throws IOException {
this(in.readEnum(HealthStatus.class), in.readOptionalEnum(Cause.class));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
healthStatus.writeTo(out);
out.writeOptionalEnum(cause);
}

public enum Cause {
NODE_OVER_HIGH_THRESHOLD,
NODE_OVER_THE_FLOOD_STAGE_THRESHOLD,
FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.health.node.selection.HealthNode;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Keeps track of several health statuses per node that can be used in health.
*/
public class HealthInfoCache implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(HealthInfoCache.class);
private volatile ConcurrentHashMap<String, DiskHealthInfo> diskInfoByNode = new ConcurrentHashMap<>();

private HealthInfoCache() {}

public static HealthInfoCache create(ClusterService clusterService) {
HealthInfoCache healthInfoCache = new HealthInfoCache();
clusterService.addListener(healthInfoCache);
return healthInfoCache;
}

public void updateNodeHealth(String nodeId, DiskHealthInfo diskHealthInfo) {
diskInfoByNode.put(nodeId, diskHealthInfo);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
DiscoveryNode currentHealthNode = HealthNode.findHealthNode(event.state());
DiscoveryNode localNode = event.state().nodes().getLocalNode();
if (currentHealthNode != null && localNode.getId().equals(currentHealthNode.getId())) {
if (event.nodesRemoved()) {
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
diskInfoByNode.remove(removedNode.getId());
}
}
// Resetting the cache is not synchronized for efficiency and simplicity.
// Processing a delayed update after the cache has been emptied because
// the node is not the health node anymore has small impact since it will
// be reset in the next round again.
} else if (diskInfoByNode.isEmpty() == false) {
logger.debug("Node [{}][{}] is no longer the health node, emptying the cache.", localNode.getName(), localNode.getId());
diskInfoByNode = new ConcurrentHashMap<>();
}
}

// A shallow copy is enough because the inner data is immutable.
public Map<String, DiskHealthInfo> getDiskHealthInfo() {
return Map.copyOf(diskInfoByNode);
}
}

0 comments on commit e63d70b

Please sign in to comment.