Skip to content

Commit

Permalink
Enable the health node and the disk health indicator #84811 (#90085)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli committed Sep 20, 2022
1 parent 6d3d894 commit 4901cf8
Show file tree
Hide file tree
Showing 16 changed files with 89 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ public void execute(Task t) {
// TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
test.systemProperty("es.transport.cname_in_publish_address", "true");

// TODO: remove this once the disk usage indicator feature is finished #84811
test.systemProperty("es.health_node_feature_flag_enabled", true);

// Set netty system properties to the properties we configure in jvm.options
test.systemProperty("io.netty.noUnsafe", "true");
test.systemProperty("io.netty.noKeySetOptimization", "true");
Expand Down
2 changes: 0 additions & 2 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
setting 'xpack.license.self_generated.type', 'trial'
setting 'indices.lifecycle.history_index_enabled', 'false'
keystorePassword 'keystore-password'
// TODO: remove this once the disk usage indicator feature is finished #84811
requiresFeature 'es.health_node_feature_flag_enabled', Version.fromString("8.4.0")
}

// enable regexes in painless so our tests don't complain about example snippets that use them
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/90085.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 90085
summary: "Enable the health node and the disk health indicator #84811"
area: Health
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -81,18 +82,24 @@ public void testGeoIpSystemFeaturesMigration() throws Exception {
}
}

@SuppressWarnings("unchecked")
private void testDatabasesLoaded() throws IOException {
Request getTaskState = new Request("GET", "/_cluster/state");
ObjectPath state = ObjectPath.createFromResponse(client().performRequest(getTaskState));

Map<String, Object> databases = null;
try {
databases = state.evaluate("metadata.persistent_tasks.tasks.0.task.geoip-downloader.state.databases");
} catch (Exception e) {
// ObjectPath doesn't like the 0 above if the list of tasks is empty, and it throws rather than returning null,
// catch that and throw an AssertionError instead (which assertBusy will handle)
List<?> tasks = state.evaluate("metadata.persistent_tasks.tasks");
// Short-circuit to avoid using steams if the list is empty
if (tasks.isEmpty()) {
fail();
}
Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
try {
return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
} catch (IOException e) {
return null;
}
}).filter(Objects::nonNull).findFirst().orElse(null);

assertNotNull(databases);

for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@
import org.elasticsearch.health.RestGetHealthAction;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -703,10 +702,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(UpdateDesiredNodesAction.INSTANCE, TransportUpdateDesiredNodesAction.class);
actions.register(DeleteDesiredNodesAction.INSTANCE, TransportDeleteDesiredNodesAction.class);

if (HealthNode.isEnabled()) {
actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
actions.register(FetchHealthInfoCacheAction.INSTANCE, FetchHealthInfoCacheAction.TransportAction.class);
}
actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
actions.register(FetchHealthInfoCacheAction.INSTANCE, FetchHealthInfoCacheAction.TransportAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.health.metadata.HealthMetadataService;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestMetadata;
Expand Down Expand Up @@ -176,10 +175,9 @@ public static List<Entry> getNamedWriteables() {
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));

if (HealthNode.isEnabled()) {
entries.addAll(HealthNodeTaskExecutor.getNamedWriteables());
entries.addAll(HealthMetadataService.getNamedWriteables());
}
// Health API
entries.addAll(HealthNodeTaskExecutor.getNamedWriteables());
entries.addAll(HealthMetadataService.getNamedWriteables());
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.action.TransportHealthNodeAction;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
Expand Down Expand Up @@ -118,11 +117,8 @@

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Encapsulates all valid cluster level settings.
Expand Down Expand Up @@ -189,7 +185,7 @@ public void apply(Settings value, Settings current, Settings previous) {
}
}

public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Stream.of(
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Set.of(
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
Expand Down Expand Up @@ -529,10 +525,10 @@ public void apply(Settings value, Settings current, Settings previous) {
CoordinationDiagnosticsService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING,
MasterHistory.MAX_HISTORY_AGE_SETTING,
ReadinessService.PORT,
HealthNode.isEnabled() ? HealthNodeTaskExecutor.ENABLED_SETTING : null,
HealthNode.isEnabled() ? LocalHealthMonitor.POLL_INTERVAL_SETTING : null,
HealthNode.isEnabled() ? TransportHealthNodeAction.HEALTH_NODE_TRANSPORT_ACTION_TIMEOUT : null
).filter(Objects::nonNull).collect(Collectors.toSet());
HealthNodeTaskExecutor.ENABLED_SETTING,
LocalHealthMonitor.POLL_INTERVAL_SETTING,
TransportHealthNodeAction.HEALTH_NODE_TRANSPORT_ACTION_TIMEOUT
);

static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

Expand Down
64 changes: 26 additions & 38 deletions server/src/main/java/org/elasticsearch/health/HealthService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.health.node.selection.HealthNode;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -101,43 +100,32 @@ public void getHealth(
.filter(result -> indicatorName == null || result.name().equals(indicatorName));

if (clusterHealthIsObtainable) {
if (HealthNode.isEnabled()) {
client.execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request(), new ActionListener<>() {
@Override
public void onResponse(FetchHealthInfoCacheAction.Response response) {
HealthInfo healthInfo = response.getHealthInfo();
validateResultsAndNotifyListener(
indicatorName,
Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, healthInfo))
).toList(),
listener
);
}

@Override
public void onFailure(Exception e) {
validateResultsAndNotifyListener(
indicatorName,
Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
).toList(),
listener
);
}
});
} else {
validateResultsAndNotifyListener(
indicatorName,
Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
).toList(),
listener
);
}

client.execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request(), new ActionListener<>() {
@Override
public void onResponse(FetchHealthInfoCacheAction.Response response) {
HealthInfo healthInfo = response.getHealthInfo();
validateResultsAndNotifyListener(
indicatorName,
Stream.concat(filteredPreflightResults, filteredIndicators.map(service -> service.calculate(explain, healthInfo)))
.toList(),
listener
);
}

@Override
public void onFailure(Exception e) {
validateResultsAndNotifyListener(
indicatorName,
Stream.concat(
filteredPreflightResults,
filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
).toList(),
listener
);
}
});

} else {
// Mark remaining indicators as UNKNOWN
HealthIndicatorDetails unknownDetails = healthUnknownReason(preflightResults, explain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public String getWriteableName() {

@Override
public Version getMinimalSupportedVersion() {
return Version.V_8_4_0;
return Version.V_8_5_0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ private void updateOnClusterStateChange(ClusterChangedEvent event) {
} else if (isMaster == false) {
readyToPublish = false;
}
// Wait until every node in the cluster is upgraded to 8.4.0 or later
if (event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_4_0)) {
// Wait until every node in the cluster is upgraded to 8.5.0 or later
if (event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_5_0)) {
if (readyToPublish) {
resetHealthMetadata("health-metadata-update-master-election");
readyToPublish = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
*/
public class HealthNode extends AllocatedPersistentTask {

public static final boolean FEATURE_FLAG_ENABLED = "true".equals(System.getProperty("es.health_node_feature_flag_enabled"));

public static boolean isEnabled() {
return FEATURE_FLAG_ENABLED;
}

public static final String TASK_NAME = "health-node";

HealthNode(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public String getWriteableName() {

@Override
public Version getMinimalSupportedVersion() {
return Version.V_8_4_0;
return Version.V_8_5_0;
}

@Override
Expand Down
43 changes: 16 additions & 27 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import org.elasticsearch.health.node.DiskHealthIndicatorService;
import org.elasticsearch.health.node.HealthInfoCache;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.IndexSettingProvider;
Expand Down Expand Up @@ -522,9 +521,6 @@ protected Node(
SystemIndexMigrationExecutor.getNamedWriteables().stream()
).flatMap(Function.identity()).toList();
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
Stream<NamedXContentRegistry.Entry> healthNodeTaskNamedXContentParsers = HealthNode.isEnabled()
? HealthNodeTaskExecutor.getNamedXContentParsers().stream()
: Stream.empty();
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
Stream.of(
NetworkModule.getNamedXContents().stream(),
Expand All @@ -533,7 +529,7 @@ protected Node(
pluginsService.flatMap(Plugin::getNamedXContent),
ClusterModule.getNamedXWriteables().stream(),
SystemIndexMigrationExecutor.getNamedXContentParsers().stream(),
healthNodeTaskNamedXContentParsers
HealthNodeTaskExecutor.getNamedXContentParsers().stream()
).flatMap(Function.identity()).collect(toList())
);
final List<SystemIndices.Feature> features = pluginsService.filterPlugins(SystemIndexPlugin.class).stream().map(plugin -> {
Expand Down Expand Up @@ -915,12 +911,13 @@ protected Node(
metadataCreateIndexService,
settingsModule.getIndexScopedSettings()
);
final HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNode.isEnabled()
? HealthNodeTaskExecutor.create(clusterService, persistentTasksService, settings, clusterService.getClusterSettings())
: null;
final List<PersistentTasksExecutor<?>> builtinTaskExecutors = HealthNode.isEnabled()
? List.of(systemIndexMigrationExecutor, healthNodeTaskExecutor)
: List.of(systemIndexMigrationExecutor);
final HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNodeTaskExecutor.create(
clusterService,
persistentTasksService,
settings,
clusterService.getClusterSettings()
);
final List<PersistentTasksExecutor<?>> builtinTaskExecutors = List.of(systemIndexMigrationExecutor, healthNodeTaskExecutor);
final List<PersistentTasksExecutor<?>> pluginTaskExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
.stream()
.map(
Expand Down Expand Up @@ -962,13 +959,9 @@ protected Node(
masterHistoryService
);
HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService);
HealthMetadataService healthMetadataService = HealthNode.isEnabled()
? HealthMetadataService.create(clusterService, settings)
: null;
LocalHealthMonitor localHealthMonitor = HealthNode.isEnabled()
? LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client)
: null;
HealthInfoCache nodeHealthOverview = HealthNode.isEnabled() ? HealthInfoCache.create(clusterService) : null;
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
Expand Down Expand Up @@ -1053,12 +1046,10 @@ protected Node(
b.bind(HealthService.class).toInstance(healthService);
b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
if (HealthNode.isEnabled()) {
b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
}
b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
b.bind(Tracer.class).toInstance(tracer);
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
});
Expand Down Expand Up @@ -1171,9 +1162,7 @@ private HealthService createHealthService(
new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
)
);
if (HealthNode.isEnabled()) {
serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
}
serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
.stream()
.flatMap(plugin -> plugin.getHealthIndicatorServices().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.health.node.DiskHealthInfo;
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
Expand Down Expand Up @@ -370,7 +369,7 @@ public String name() {

@Override
public HealthIndicatorResult calculate(boolean explain, HealthInfo healthInfo) {
if (expectedHealthInfo != null && HealthNode.isEnabled()) {
if (expectedHealthInfo != null) {
assertThat(healthInfo, equalTo(expectedHealthInfo));
}
return result;
Expand Down

0 comments on commit 4901cf8

Please sign in to comment.