Skip to content

Commit

Permalink
File Settings Service (#88329)
Browse files Browse the repository at this point in the history
Adds a file watcher service for applying Elasticsearch
settings through a file. Cluster state saved through this
method cannot be modified through the REST API.
  • Loading branch information
grcevski committed Jul 26, 2022
1 parent 0986d8b commit f8a8df4
Show file tree
Hide file tree
Showing 13 changed files with 949 additions and 64 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/88329.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88329
summary: File Settings Service
area: Infra/Core
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

/**
 * Integration tests for {@link FileSettingsService}.
 *
 * <p>Each test starts a data-only node and a master-only node, writes an operator settings
 * file, and then checks the reserved state metadata published in the cluster state:
 * <ul>
 *   <li>a valid file must result in the cluster setting being applied and rejected for
 *       modification through the update-settings API;</li>
 *   <li>an invalid file must result in error metadata being recorded while leaving the
 *       settings untouched and still modifiable.</li>
 * </ul>
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class FileSettingsServiceIT extends ESIntegTestCase {

    /** How long a test waits for the expected reserved state to show up in the cluster state. */
    private static final long CLUSTER_STATE_WAIT_SECONDS = 20;

    /** Source of the monotonically increasing "version" value substituted into the JSON templates. */
    private final AtomicLong versionCounter = new AtomicLong(1);

    /** Valid settings file template; {@code %s} is replaced with the file version. */
    private static final String TEST_JSON = """
        {
        "metadata": {
        "version": "%s",
        "compatibility": "8.4.0"
        },
        "state": {
        "cluster_settings": {
        "indices.recovery.max_bytes_per_sec": "50mb"
        }
        }
        }""";

    /** Settings file template with an unknown state section; {@code %s} is replaced with the file version. */
    private static final String TEST_ERROR_JSON = """
        {
        "metadata": {
        "version": "%s",
        "compatibility": "8.4.0"
        },
        "state": {
        "not_cluster_settings": {
        "search.allow_expensive_queries": "false"
        }
        }
        }""";

    /**
     * Asserts that the cluster state seen through {@code client} reports {@code node} as the elected master.
     *
     * @param client client used to fetch the cluster state
     * @param node   expected name of the master node
     */
    private void assertMasterNode(Client client, String node) {
        assertThat(
            client.admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(),
            equalTo(node)
        );
    }

    /** Returns the {@link FileSettingsService} instance bound on the given node. */
    private FileSettingsService fileSettingsService(String node) {
        return internalCluster().getInstance(FileSettingsService.class, node);
    }

    /**
     * Formats {@code json} with the next file version and writes it to the operator settings
     * file of {@code node}, creating the operator settings directory if needed.
     *
     * @param node name of the node whose settings file location is used
     * @param json JSON template containing a single {@code %s} placeholder for the version
     */
    private void writeJSONFile(String node, String json) throws Exception {
        long version = versionCounter.incrementAndGet();

        FileSettingsService service = fileSettingsService(node);

        Files.createDirectories(service.operatorSettingsDir());
        Files.write(service.operatorSettingsFile(), Strings.format(json, version).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Fails the test with a descriptive message unless {@code latch} reaches zero within
     * {@link #CLUSTER_STATE_WAIT_SECONDS} seconds.
     */
    private static void awaitClusterState(CountDownLatch latch, String expectation) throws InterruptedException {
        assertTrue(
            "Timed out after " + CLUSTER_STATE_WAIT_SECONDS + "s waiting for " + expectation,
            latch.await(CLUSTER_STATE_WAIT_SECONDS, TimeUnit.SECONDS)
        );
    }

    /**
     * Sets the bootstrap master index, starts a data-only node and verifies that its
     * {@link FileSettingsService} is not watching the settings file.
     *
     * @return the name of the started data-only node
     */
    private String startDataOnlyNode() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        logger.info("--> start data node / non master node");
        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
        assertFalse(fileSettingsService(dataNode).watching());
        return dataNode;
    }

    /**
     * Starts a master-only node and verifies, through a non-master client, that it was elected master.
     *
     * @return the name of the started master node
     */
    private String startMasterNode() throws Exception {
        logger.info("--> start master node");
        final String masterNode = internalCluster().startMasterOnlyNode();
        assertMasterNode(internalCluster().nonMasterClient(), masterNode);
        return masterNode;
    }

    /** Asserts that the master node is watching the settings file and the data node is not. */
    private void assertOnlyMasterIsWatching(String masterNode, String dataNode) {
        assertTrue(fileSettingsService(masterNode).watching());
        assertFalse(fileSettingsService(dataNode).watching());
    }

    /**
     * Registers a one-shot cluster state listener on {@code node} that waits for the file settings
     * namespace to contain reserved cluster-settings handler metadata.
     *
     * @param node name of the node whose {@link ClusterService} receives the listener
     * @return a latch that is counted down once the expected reserved state has been observed
     */
    private CountDownLatch setupClusterStateListener(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState == null) {
                    return;
                }
                ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
                if (handlerMetadata == null) {
                    // The namespace can exist before the cluster settings handler has saved anything.
                    // Keep listening instead of failing from inside the listener callback; if the
                    // handler never shows up, the waiting test fails on the latch timeout.
                    // NOTE(review): presumably this callback does not run on the test thread - confirm.
                    return;
                }
                assertThat(handlerMetadata.keys(), contains("indices.recovery.max_bytes_per_sec"));
                clusterService.removeListener(this);
                savedClusterState.countDown();
            }
        });

        return savedClusterState;
    }

    /**
     * Waits for the reserved state to be saved, then verifies that the setting from the file is
     * present in the persistent settings and that changing it through the update-settings API is
     * rejected.
     *
     * @param savedClusterState latch returned by {@link #setupClusterStateListener(String)}
     */
    private void assertClusterStateSaveOK(CountDownLatch savedClusterState) throws Exception {
        awaitClusterState(savedClusterState, "reserved cluster settings to be saved in the cluster state");

        final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

        assertThat(
            clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()),
            equalTo("50mb")
        );

        // A REST-style update of a setting reserved by the file must be refused.
        ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "1234kb")
        );
        assertEquals(
            "java.lang.IllegalArgumentException: Failed to process request "
                + "[org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest/unset] "
                + "with errors: [[indices.recovery.max_bytes_per_sec] set as read-only by [file_settings]]",
            expectThrows(ExecutionException.class, () -> client().admin().cluster().updateSettings(req).get()).getMessage()
        );
    }

    /** A settings file written while the master is running is picked up and applied. */
    public void testSettingsApplied() throws Exception {
        String dataNode = startDataOnlyNode();
        String masterNode = startMasterNode();
        var savedClusterState = setupClusterStateListener(masterNode);

        assertOnlyMasterIsWatching(masterNode, dataNode);

        writeJSONFile(masterNode, TEST_JSON);
        assertClusterStateSaveOK(savedClusterState);
    }

    /** A settings file that already exists when the master starts is picked up and applied. */
    public void testSettingsAppliedOnStart() throws Exception {
        String dataNode = startDataOnlyNode();
        var savedClusterState = setupClusterStateListener(dataNode);

        // In internal cluster tests, the nodes share the config directory, so when we write with the data node path
        // the master will pick it up on start
        writeJSONFile(dataNode, TEST_JSON);

        String masterNode = startMasterNode();

        assertOnlyMasterIsWatching(masterNode, dataNode);

        assertClusterStateSaveOK(savedClusterState);
    }

    /**
     * Registers a one-shot cluster state listener on {@code node} that waits for the file settings
     * namespace to carry error metadata, and checks that it describes the unknown content key.
     *
     * @param node name of the node whose {@link ClusterService} receives the listener
     * @return a latch that is counted down once the expected error metadata has been observed
     */
    private CountDownLatch setupClusterStateListenerForError(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState == null) {
                    return;
                }
                ReservedStateErrorMetadata errorMetadata = reservedState.errorMetadata();
                if (errorMetadata == null) {
                    // Guard against dereferencing absent error metadata; keep waiting for a state that has it.
                    // NOTE(review): presumably errorMetadata() is null until an error is recorded - confirm.
                    return;
                }
                assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, errorMetadata.errorKind());
                assertThat(errorMetadata.errors(), allOf(notNullValue(), hasSize(1)));
                assertThat(
                    errorMetadata.errors().get(0),
                    containsString("Missing handler definition for content key [not_cluster_settings]")
                );
                clusterService.removeListener(this);
                savedClusterState.countDown();
            }
        });

        return savedClusterState;
    }

    /**
     * Waits for the error metadata to be saved, then verifies that the setting named in the
     * invalid file was not applied and can still be changed through the update-settings API.
     *
     * @param savedClusterState latch returned by {@link #setupClusterStateListenerForError(String)}
     */
    private void assertClusterStateNotSaved(CountDownLatch savedClusterState) throws Exception {
        awaitClusterState(savedClusterState, "file settings error metadata to be saved in the cluster state");

        final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

        assertThat(clusterStateResponse.getState().metadata().persistentSettings().get("search.allow_expensive_queries"), nullValue());

        ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder().put("search.allow_expensive_queries", "false")
        );
        // This should succeed, nothing was reserved
        client().admin().cluster().updateSettings(req).get();
    }

    /** An invalid settings file records error metadata and reserves nothing. */
    public void testErrorSaved() throws Exception {
        String dataNode = startDataOnlyNode();
        String masterNode = startMasterNode();
        var savedClusterState = setupClusterStateListenerForError(masterNode);

        assertOnlyMasterIsWatching(masterNode, dataNode);

        writeJSONFile(masterNode, TEST_ERROR_JSON);
        assertClusterStateNotSaved(savedClusterState);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public void testThreadNames() throws Exception {
// or the ones that occasionally come up from ESSingleNodeTestCase
if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated!
|| threadName.contains("Keep-Alive-Timer")
|| threadName.contains("readiness-service")
|| threadName.contains("JVMCI-native") // GraalVM Compiler Thread
|| threadName.contains("readiness-service")) {
|| threadName.contains("file-settings-watcher")
|| threadName.contains("FileSystemWatchService")) {
continue;
}
String nodePrefix = "("
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
Expand All @@ -274,6 +275,8 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.service.ReservedClusterStateService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -449,6 +452,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ReservedClusterStateService reservedClusterStateService;

public ActionModule(
Settings settings,
Expand All @@ -462,7 +466,9 @@ public ActionModule(
CircuitBreakerService circuitBreakerService,
UsageService usageService,
SystemIndices systemIndices,
Tracer tracer
Tracer tracer,
ClusterService clusterService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand Down Expand Up @@ -513,6 +519,7 @@ public ActionModule(
);

restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService, tracer);
reservedClusterStateService = new ReservedClusterStateService(clusterService, reservedStateHandlers);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -922,4 +929,8 @@ public ActionFilters getActionFilters() {
/**
 * Returns the {@link RestController} held by this module.
 *
 * @return the value of the {@code restController} field
 */
public RestController getRestController() {
return restController;
}

/**
 * Returns the {@link ReservedClusterStateService} created in this module's constructor from the
 * cluster service and the list of reserved state handlers passed to it.
 *
 * @return the value of the {@code reservedClusterStateService} field
 */
public ReservedClusterStateService getReservedClusterStateService() {
return reservedClusterStateService;
}
}
29 changes: 28 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.reservedstate.service.FileSettingsService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
Expand Down Expand Up @@ -705,6 +709,17 @@ protected Node(
)
).toList();

List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();

// add all reserved state handlers from server
reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings()));

// add all reserved state handlers from plugins
List<? extends ReservedClusterStateHandlerProvider> pluginHandlers = pluginsService.loadServiceProviders(
ReservedClusterStateHandlerProvider.class
);
pluginHandlers.forEach(h -> reservedStateHandlers.addAll(h.handlers()));

ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
Expand All @@ -717,7 +732,9 @@ protected Node(
circuitBreakerService,
usageService,
systemIndices,
tracer
tracer,
clusterService,
reservedStateHandlers
);
modules.add(actionModule);

Expand Down Expand Up @@ -929,6 +946,12 @@ protected Node(
? new HealthMetadataService(clusterService, settings)
: null;

FileSettingsService fileSettingsService = new FileSettingsService(
clusterService,
actionModule.getReservedClusterStateService(),
environment
);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -1017,6 +1040,7 @@ protected Node(
b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
}
b.bind(Tracer.class).toInstance(tracer);
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
});

if (ReadinessService.enabled(environment)) {
Expand Down Expand Up @@ -1297,6 +1321,7 @@ public void onTimeout(TimeValue timeout) {
}
}

injector.getInstance(FileSettingsService.class).start();
injector.getInstance(HttpServerTransport.class).start();

if (WRITE_PORTS_FILE_SETTING.get(settings())) {
Expand Down Expand Up @@ -1334,6 +1359,7 @@ private Node stop() {
if (ReadinessService.enabled(environment)) {
injector.getInstance(ReadinessService.class).stop();
}
injector.getInstance(FileSettingsService.class).stop();
injector.getInstance(ResourceWatcherService.class).close();
injector.getInstance(HttpServerTransport.class).stop();

Expand Down Expand Up @@ -1417,6 +1443,7 @@ public synchronized void close() throws IOException {
if (ReadinessService.enabled(environment)) {
toClose.add(injector.getInstance(ReadinessService.class));
}
toClose.add(injector.getInstance(FileSettingsService.class));

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down

0 comments on commit f8a8df4

Please sign in to comment.