Skip to content

Commit

Permalink
Health API - Monitoring local disk health (#88390)
Browse files Browse the repository at this point in the history
This PR introduces the local health monitoring functionality needed for
#84811 . The monitor uses the `NodeService` to get the disk usage stats
and determines the node's disk health.

When a change in the disk's is detected or when the health node changes,
this class would be responsible to send the node's health to the health
node. Currently this is simulated with a method that just logs the
current health.

The monitor keeps the last reported health, this way, if something fails
on the next check it will try to resend the new health state.
  • Loading branch information
gmarouli committed Aug 3, 2022
1 parent c7e10e7 commit d828c2a
Show file tree
Hide file tree
Showing 8 changed files with 633 additions and 11 deletions.
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
exports org.elasticsearch.env;
exports org.elasticsearch.gateway;
exports org.elasticsearch.health;
exports org.elasticsearch.health.node;
exports org.elasticsearch.health.node.selection;
exports org.elasticsearch.http;
exports org.elasticsearch.index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.http.HttpTransportSettings;
Expand Down Expand Up @@ -524,7 +525,8 @@ public void apply(Settings value, Settings current, Settings previous) {
CoordinationDiagnosticsService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING,
MasterHistory.MAX_HISTORY_AGE_SETTING,
ReadinessService.PORT,
HealthNode.isEnabled() ? HealthNodeTaskExecutor.ENABLED_SETTING : null
HealthNode.isEnabled() ? HealthNodeTaskExecutor.ENABLED_SETTING : null,
HealthNode.isEnabled() ? LocalHealthMonitor.POLL_INTERVAL_SETTING : null
).filter(Objects::nonNull).collect(Collectors.toSet());

static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private ByteSizeValue getFreeBytes(ByteSizeValue total, RelativeByteSizeValue watermark, ByteSizeValue maxHeadroom) {
if (watermark.isAbsolute()) {
return watermark.getAbsolute();
}
return ByteSizeValue.ofBytes(total.getBytes() - watermark.calculateValue(total, maxHeadroom).getBytes());
}

public ByteSizeValue getFreeBytesHighWatermark(ByteSizeValue total) {
return getFreeBytes(total, highWatermark, ByteSizeValue.MINUS_ONE);
}

public ByteSizeValue getFreeBytesFloodStageWatermark(ByteSizeValue total) {
return getFreeBytes(total, floodStageWatermark, ByteSizeValue.MINUS_ONE);
}

public ByteSizeValue getFreeBytesFrozenFloodStageWatermark(ByteSizeValue total) {
return getFreeBytes(total, frozenFloodStageWatermark, frozenFloodStageMaxHeadroom);
}

private String getThresholdStringRep(RelativeByteSizeValue relativeByteSizeValue) {
if (relativeByteSizeValue.isAbsolute()) {
return relativeByteSizeValue.getAbsolute().getStringRep();
Expand Down Expand Up @@ -186,11 +205,11 @@ public int hashCode() {
);
}

static Builder newBuilder() {
public static Builder newBuilder() {
return new Builder();
}

static Builder newBuilder(Disk disk) {
public static Builder newBuilder(Disk disk) {
return new Builder(disk);
}

Expand All @@ -210,16 +229,16 @@ private Builder(Disk disk) {

private Builder() {}

Builder highWatermark(RelativeByteSizeValue highWatermark) {
public Disk.Builder highWatermark(RelativeByteSizeValue highWatermark) {
this.highWatermark = highWatermark;
return this;
}

Builder highWatermark(String highWatermark, String setting) {
public Disk.Builder highWatermark(String highWatermark, String setting) {
return highWatermark(RelativeByteSizeValue.parseRelativeByteSizeValue(highWatermark, setting));
}

Builder floodStageWatermark(RelativeByteSizeValue floodStageWatermark) {
public Disk.Builder floodStageWatermark(RelativeByteSizeValue floodStageWatermark) {
this.floodStageWatermark = floodStageWatermark;
return this;
}
Expand All @@ -228,25 +247,25 @@ public Builder floodStageWatermark(String floodStageWatermark, String setting) {
return floodStageWatermark(RelativeByteSizeValue.parseRelativeByteSizeValue(floodStageWatermark, setting));
}

Builder frozenFloodStageWatermark(RelativeByteSizeValue frozenFloodStageWatermark) {
public Disk.Builder frozenFloodStageWatermark(RelativeByteSizeValue frozenFloodStageWatermark) {
this.frozenFloodStageWatermark = frozenFloodStageWatermark;
return this;
}

Builder frozenFloodStageWatermark(String frozenFloodStageWatermark, String setting) {
public Disk.Builder frozenFloodStageWatermark(String frozenFloodStageWatermark, String setting) {
return frozenFloodStageWatermark(RelativeByteSizeValue.parseRelativeByteSizeValue(frozenFloodStageWatermark, setting));
}

Builder frozenFloodStageMaxHeadroom(ByteSizeValue frozenFloodStageMaxHeadroom) {
public Disk.Builder frozenFloodStageMaxHeadroom(ByteSizeValue frozenFloodStageMaxHeadroom) {
this.frozenFloodStageMaxHeadroom = frozenFloodStageMaxHeadroom;
return this;
}

Builder frozenFloodStageMaxHeadroom(String frozenFloodStageMaxHeadroom, String setting) {
public Disk.Builder frozenFloodStageMaxHeadroom(String frozenFloodStageMaxHeadroom, String setting) {
return frozenFloodStageMaxHeadroom(ByteSizeValue.parseBytesSizeValue(frozenFloodStageMaxHeadroom, setting));
}

Disk build() {
public Disk build() {
return new Disk(highWatermark, floodStageWatermark, frozenFloodStageWatermark, frozenFloodStageMaxHeadroom);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node;

import org.elasticsearch.health.HealthStatus;

/**
* The health status of the disk space of this node along with the cause.
*/
record DiskHealthInfo(HealthStatus healthStatus, Cause cause) {
DiskHealthInfo(HealthStatus healthStatus) {
this(healthStatus, null);
}

enum Cause {
NODE_OVER_HIGH_THRESHOLD,
NODE_OVER_THE_FLOOD_STAGE_THRESHOLD,
FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD,
NODE_HAS_NO_DISK_STATS
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.health.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.metadata.HealthMetadata;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class monitors the health of the node regarding the load on several resources.
* Currently, it only checks for available disk space. Furthermore, it informs the health
* node about the local health upon change or when a new node is detected.
*/
public class LocalHealthMonitor implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(LocalHealthMonitor.class);

public static final Setting<TimeValue> POLL_INTERVAL_SETTING = Setting.timeSetting(
"health.reporting.local.monitor.interval",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DiskCheck diskCheck;

private volatile TimeValue monitorInterval;
private volatile boolean enabled;
// Signals that all the prerequisites have been fulfilled and the monitoring task can be scheduled.
private volatile boolean prerequisitesFulfilled;
// Ensures that only one monitoring task will be in progress at any moment in time.
// It removes the need to synchronize scheduling since at the event that there are two
// monitoring tasks scheduled, one of them will be no-op.
private final AtomicBoolean inProgress = new AtomicBoolean();
// Keeps the latest health state that was successfully reported.
private volatile DiskHealthInfo lastReportedDiskHealthInfo = null;

public LocalHealthMonitor(Settings settings, ClusterService clusterService, NodeService nodeService, ThreadPool threadPool) {
this.threadPool = threadPool;
this.monitorInterval = POLL_INTERVAL_SETTING.get(settings);
this.enabled = HealthNodeTaskExecutor.ENABLED_SETTING.get(settings);
this.clusterService = clusterService;
this.diskCheck = new DiskCheck(nodeService);
clusterService.addListener(this);
ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(POLL_INTERVAL_SETTING, this::setMonitorInterval);
clusterSettings.addSettingsUpdateConsumer(HealthNodeTaskExecutor.ENABLED_SETTING, this::setEnabled);
}

void setMonitorInterval(TimeValue monitorInterval) {
this.monitorInterval = monitorInterval;
maybeScheduleNow();
}

void setEnabled(boolean enabled) {
this.enabled = enabled;
maybeScheduleNow();
}

/**
* We always check if the prerequisites are fulfilled and if the health node
* is enabled before we schedule a monitoring task.
*/
private void maybeScheduleNextRun(TimeValue time) {
if (prerequisitesFulfilled && enabled) {
threadPool.scheduleUnlessShuttingDown(time, ThreadPool.Names.MANAGEMENT, this::monitorHealth);
}
}

// Helper method that starts the monitoring without a delay.
private void maybeScheduleNow() {
maybeScheduleNextRun(TimeValue.timeValueMillis(1));
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (prerequisitesFulfilled == false) {
prerequisitesFulfilled = event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_5_0)
&& HealthMetadata.getFromClusterState(event.state()) != null;
maybeScheduleNow();
}
}

// Visible for testing
void monitorHealth() {
if (inProgress.compareAndSet(false, true)) {
ClusterState clusterState = clusterService.state();
HealthMetadata healthMetadata = HealthMetadata.getFromClusterState(clusterState);
assert healthMetadata != null : "health metadata should have been initialized.";
DiskHealthInfo previousHealth = this.lastReportedDiskHealthInfo;
DiskHealthInfo currentHealth = diskCheck.getHealth(healthMetadata, clusterState);
if (currentHealth.equals(previousHealth) == false) {
logger.debug("Health status changed from {} to {}", previousHealth, currentHealth);
this.lastReportedDiskHealthInfo = currentHealth;
}
inProgress.set(false);
// Scheduling happens after the flag inProgress is false, this ensures that
// if the feature is enabled after the following schedule statement, the setEnabled
// method will be able to schedule the next run, and it will not be a no-op.
// We prefer to err towards an extra scheduling than miss the enabling of this feature alltogether.
maybeScheduleNextRun(monitorInterval);
}
}

DiskHealthInfo getLastReportedDiskHealthInfo() {
return lastReportedDiskHealthInfo;
}

/**
* Determines the disk health of this node by checking if it exceeds the thresholds defined in the health metadata.
*/
static class DiskCheck {
private final NodeService nodeService;

DiskCheck(NodeService nodeService) {
this.nodeService = nodeService;
}

DiskHealthInfo getHealth(HealthMetadata healthMetadata, ClusterState clusterState) {
DiscoveryNode node = clusterState.getNodes().getLocalNode();
HealthMetadata.Disk diskMetadata = healthMetadata.getDiskMetadata();
DiskUsage usage = getDiskUsage();
if (usage == null) {
return new DiskHealthInfo(HealthStatus.UNKNOWN, DiskHealthInfo.Cause.NODE_HAS_NO_DISK_STATS);
}

ByteSizeValue totalBytes = ByteSizeValue.ofBytes(usage.getTotalBytes());

if (node.isDedicatedFrozenNode()) {
long frozenFloodStageThreshold = diskMetadata.getFreeBytesFrozenFloodStageWatermark(totalBytes).getBytes();
if (usage.getFreeBytes() < frozenFloodStageThreshold) {
logger.debug("flood stage disk watermark [{}] exceeded on {}", frozenFloodStageThreshold, usage);
return new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.FROZEN_NODE_OVER_FLOOD_STAGE_THRESHOLD);
}
return new DiskHealthInfo(HealthStatus.GREEN);
}

long floodStageThreshold = diskMetadata.getFreeBytesFloodStageWatermark(totalBytes).getBytes();
if (usage.getFreeBytes() < floodStageThreshold) {
return new DiskHealthInfo(HealthStatus.RED, DiskHealthInfo.Cause.NODE_OVER_THE_FLOOD_STAGE_THRESHOLD);
}

long highThreshold = diskMetadata.getFreeBytesHighWatermark(totalBytes).getBytes();
if (usage.getFreeBytes() < highThreshold && hasRelocatingShards(clusterState, node.getId()) == false) {
return new DiskHealthInfo(HealthStatus.YELLOW, DiskHealthInfo.Cause.NODE_OVER_HIGH_THRESHOLD);
}
return new DiskHealthInfo(HealthStatus.GREEN);
}

private DiskUsage getDiskUsage() {
NodeStats nodeStats = nodeService.stats(
CommonStatsFlags.NONE,
false,
false,
false,
false,
true,
false,
false,
false,
false,
false,
false,
false,
false,
false
);
return DiskUsage.findLeastAvailablePath(nodeStats);
}

private boolean hasRelocatingShards(ClusterState clusterState, String nodeId) {
return clusterState.getRoutingNodes().node(nodeId).shardsWithState(ShardRoutingState.RELOCATING).isEmpty() == false;
}
}
}
5 changes: 5 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthService;
import org.elasticsearch.health.metadata.HealthMetadataService;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.http.HttpServerTransport;
Expand Down Expand Up @@ -945,6 +946,9 @@ protected Node(
HealthMetadataService healthMetadataService = HealthNode.isEnabled()
? new HealthMetadataService(clusterService, settings)
: null;
LocalHealthMonitor localHealthMonitor = HealthNode.isEnabled()
? new LocalHealthMonitor(settings, clusterService, nodeService, threadPool)
: null;

FileSettingsService fileSettingsService = new FileSettingsService(
clusterService,
Expand Down Expand Up @@ -1038,6 +1042,7 @@ protected Node(
if (HealthNode.isEnabled()) {
b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
}
b.bind(Tracer.class).toInstance(tracer);
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
Expand Down

0 comments on commit d828c2a

Please sign in to comment.