Skip to content

Commit

Permalink
add validations for data node lifecycle functions (#17402)
Browse files Browse the repository at this point in the history
* publish data node status

* add paginated datanode service

* changelog

* adjust changelog

* adjust NodeImplTest

* add start/stop functionality

* changelog

* re-add secured annotation

* refactor initial separation of server node and data node

* fix backward compatibility

* refactor separation of server node and data node

* add timezone

* add new service tests

* add entity tests

* inject DataNodeDto NodeService

* remove clustereventsubscriber registration

* add validations

* add verification tests

---------

Co-authored-by: Ousmane SAMBA <ousmane@graylog.com>
  • Loading branch information
moesterheld and ousmaneo committed Dec 5, 2023
1 parent 48480f8 commit c8e34b4
Show file tree
Hide file tree
Showing 18 changed files with 498 additions and 48 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-17402.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Add start/stop lifecycle functions to data node. Add validations for triggering data node lifecycle functions."

issues = ["17383"]
pulls = ["17402"]
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.cluster.preflight.DataNodeProvisioningStateChangeEvent;
import org.graylog2.datanode.DataNodeLifecycleEvent;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.CustomCAX509TrustManager;
import org.slf4j.Logger;
Expand All @@ -36,8 +35,6 @@
import javax.inject.Provider;
import javax.inject.Singleton;

import static com.google.common.base.Preconditions.checkNotNull;

@Singleton
public class OpensearchProcessService extends AbstractIdleService implements Provider<OpensearchProcess> {

Expand All @@ -53,7 +50,6 @@ public class OpensearchProcessService extends AbstractIdleService implements Pro
public OpensearchProcessService(final DatanodeConfiguration datanodeConfiguration,
final Provider<OpensearchConfiguration> configurationProvider,
final EventBus eventBus,
final ClusterEventBus clusterEventBus,
final CustomCAX509TrustManager trustManager,
final NodeService<DataNodeDto> nodeService,
final Configuration configuration,
Expand All @@ -63,10 +59,9 @@ public OpensearchProcessService(final DatanodeConfiguration datanodeConfiguratio
this.nodeId = nodeId;
this.process = createOpensearchProcess(datanodeConfiguration, trustManager, configuration, nodeService);
eventBus.register(this);
checkNotNull(clusterEventBus).registerClusterEventSubscriber(this);
}

private OpensearchProcess createOpensearchProcess(final DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager, final Configuration configuration, final NodeService nodeService) {
private OpensearchProcess createOpensearchProcess(final DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager, final Configuration configuration, final NodeService<DataNodeDto> nodeService) {
final OpensearchProcessImpl process = new OpensearchProcessImpl(datanodeConfiguration, datanodeConfiguration.processLogsBufferSize(), trustManager, configuration, nodeService);
final ProcessWatchdog watchdog = new ProcessWatchdog(process, WATCHDOG_RESTART_ATTEMPTS);
process.addStateMachineTracer(watchdog);
Expand All @@ -90,6 +85,8 @@ public void handleNodeLifecycleEvent(DataNodeLifecycleEvent event) {
switch (event.trigger()) {
case REMOVE -> process.onRemove();
case RESET -> process.onReset();
case STOP -> this.shutDown();
case START -> this.startUp();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.graylog.datanode.rest;

import org.graylog.datanode.management.OpensearchProcess;
import org.graylog.datanode.rest.config.OnlyInSecuredNode;
import org.graylog2.datanode.DataNodeLifecycleEvent;
import org.graylog2.datanode.DataNodeLifecycleTrigger;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.system.NodeId;

import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
Expand All @@ -29,17 +33,39 @@
@Produces(MediaType.APPLICATION_JSON)
public class ManagementController {

private final OpensearchProcess openSearch;
private final ClusterEventBus clusterEventBus;
private final NodeId nodeId;


@Inject
public ManagementController(OpensearchProcess openSearch) {
this.openSearch = openSearch;
public ManagementController(ClusterEventBus clusterEventBus, NodeId nodeId) {
this.clusterEventBus = clusterEventBus;
this.nodeId = nodeId;
}

@DELETE
@OnlyInSecuredNode
public void remove() {
openSearch.onRemove();
postEvent(DataNodeLifecycleTrigger.REMOVE);
}

/**
 * REST endpoint ({@code POST .../start}) that requests a start of this data node.
 * It does not start anything itself: it posts a lifecycle event carrying
 * {@link DataNodeLifecycleTrigger#START} and this node's id via {@code postEvent}.
 * NOTE(review): {@code @OnlyInSecuredNode} presumably restricts the call to secured
 * nodes - confirm against the annotation's implementation.
 */
@POST
@Path("/start")
@OnlyInSecuredNode
public void start() {
postEvent(DataNodeLifecycleTrigger.START);
}

/**
 * REST endpoint ({@code POST .../stop}) that requests a stop of this data node.
 * It does not stop anything itself: it posts a lifecycle event carrying
 * {@link DataNodeLifecycleTrigger#STOP} and this node's id via {@code postEvent}.
 * NOTE(review): {@code @OnlyInSecuredNode} presumably restricts the call to secured
 * nodes - confirm against the annotation's implementation.
 */
@POST
@Path("/stop")
@OnlyInSecuredNode
public void stop() {
postEvent(DataNodeLifecycleTrigger.STOP);
}

/**
 * Builds a {@link DataNodeLifecycleEvent} for this node's id with the given trigger
 * and posts it on the cluster event bus.
 *
 * @param trigger the lifecycle action to announce for this node
 */
private void postEvent(DataNodeLifecycleTrigger trigger) {
    clusterEventBus.post(DataNodeLifecycleEvent.create(nodeId.getNodeId(), trigger));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.integration;

import com.github.rholder.retry.RetryException;
import org.apache.commons.lang3.RandomStringUtils;
import org.graylog.datanode.configuration.variants.KeystoreInformation;
import org.graylog.datanode.restoperations.DatanodeRestApiWait;
import org.graylog.datanode.restoperations.DatanodeStatusChangeOperation;
import org.graylog.datanode.restoperations.RestOperationParameters;
import org.graylog.datanode.testinfra.DatanodeContainerizedBackend;
import org.graylog2.plugin.Tools;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.concurrent.ExecutionException;

import static org.graylog.datanode.testinfra.DatanodeContainerizedBackend.IMAGE_WORKING_DIR;
import static org.graylog.testing.completebackend.ContainerizedGraylogBackend.ROOT_PASSWORD_PLAINTEXT;

/**
 * Integration test that boots a secured, single-node data node container and drives
 * its stop/start lifecycle through the management REST endpoints.
 */
public class DatanodeLifecycleIT {
    private static final Logger LOG = LoggerFactory.getLogger(DatanodeLifecycleIT.class);

    @TempDir
    static Path tempDir;
    private DatanodeContainerizedBackend backend;
    private KeyStore trustStore;
    private String containerHostname;
    private String restAdminUsername;

    /**
     * Generates a self-signed CA plus transport and HTTP certificates for a random
     * container hostname and starts a data node container configured to use them.
     *
     * @throws IOException              if certificate or keystore files cannot be written
     * @throws GeneralSecurityException if certificate or truststore generation fails
     */
    @BeforeEach
    void setUp() throws IOException, GeneralSecurityException {

        restAdminUsername = RandomStringUtils.randomAlphanumeric(10);

        containerHostname = "graylog-datanode-host-" + RandomStringUtils.random(8, "0123456789abcdef");
        // first generate a self-signed CA
        KeystoreInformation ca = DatanodeSecurityTestUtils.generateCa(tempDir);
        trustStore = DatanodeSecurityTestUtils.buildTruststore(ca);

        // use the CA to generate transport certificate keystore
        final KeystoreInformation transportCert = DatanodeSecurityTestUtils.generateTransportCert(tempDir, ca, containerHostname);
        // use the CA to generate HTTP certificate keystore
        final KeystoreInformation httpCert = DatanodeSecurityTestUtils.generateHttpCert(tempDir, ca, containerHostname, Tools.getLocalCanonicalHostname());

        backend = new DatanodeContainerizedBackend(containerHostname, datanodeContainer -> {
            // provide the keystore files to the docker container
            datanodeContainer.withFileSystemBind(transportCert.location().toAbsolutePath().toString(), IMAGE_WORKING_DIR + "/config/datanode-transport-certificates.p12");
            datanodeContainer.withFileSystemBind(httpCert.location().toAbsolutePath().toString(), IMAGE_WORKING_DIR + "/config/datanode-https-certificates.p12");

            // configure transport security
            datanodeContainer.withEnv("GRAYLOG_DATANODE_TRANSPORT_CERTIFICATE", "datanode-transport-certificates.p12");
            datanodeContainer.withEnv("GRAYLOG_DATANODE_TRANSPORT_CERTIFICATE_PASSWORD", transportCert.passwordAsString());
            datanodeContainer.withEnv("GRAYLOG_DATANODE_INSECURE_STARTUP", "false");

            // configure http security
            datanodeContainer.withEnv("GRAYLOG_DATANODE_HTTP_CERTIFICATE", "datanode-https-certificates.p12");
            datanodeContainer.withEnv("GRAYLOG_DATANODE_HTTP_CERTIFICATE_PASSWORD", httpCert.passwordAsString());

            // this is the interface that we bind opensearch to. It must be 0.0.0.0 if we want
            // to be able to reach opensearch from outside the container and docker network (true?)
            datanodeContainer.withEnv("GRAYLOG_DATANODE_HTTP_BIND_ADDRESS", "0.0.0.0");

            // HOSTNAME is used to generate the SSL certificates and to communicate inside the
            // container and docker network, where we do the hostname validation.
            datanodeContainer.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName(containerHostname));
            datanodeContainer.withEnv("GRAYLOG_DATANODE_HOSTNAME", containerHostname);

            datanodeContainer.withEnv("GRAYLOG_DATANODE_SINGLE_NODE_ONLY", "true");

            datanodeContainer.withEnv("GRAYLOG_DATANODE_ROOT_USERNAME", restAdminUsername);
        }).start();
    }

    /**
     * Stops the container started in {@link #setUp()}.
     */
    @AfterEach
    void tearDown() {
        // setUp may have failed before the backend was assigned; avoid masking that
        // failure with a NullPointerException here.
        if (backend != null) {
            backend.stop();
        }
    }

    /**
     * Waits for the node to report the available status, triggers a stop and waits for
     * the stopped status, then triggers a start and waits for the available status again.
     * On any failure the container logs are written to the test log and the failure is
     * rethrown so the test is reported as failed.
     */
    @Test
    void testRestartByEventBus() throws ExecutionException, RetryException, InterruptedException {
        final RestOperationParameters restParameters = RestOperationParameters.builder()
                .port(backend.getDatanodeRestPort())
                .truststore(trustStore)
                .username(restAdminUsername)
                .password(ROOT_PASSWORD_PLAINTEXT)
                .build();
        final DatanodeRestApiWait waitApi = new DatanodeRestApiWait(restParameters);
        final DatanodeStatusChangeOperation statusApi = new DatanodeStatusChangeOperation(restParameters);

        try {
            waitApi.waitForAvailableStatus();
            statusApi.triggerNodeStop();
            waitApi.waitForStoppedStatus();
            statusApi.triggerNodeStart();
            waitApi.waitForAvailableStatus();
        } catch (Exception e) {
            // Dump the container output for diagnosis, then propagate: swallowing the
            // exception here would make this test pass unconditionally.
            LOG.warn(backend.getLogs());
            throw e;
        }

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public ValidatableResponse waitForAvailableStatus() throws ExecutionException, R
);
}

/**
 * Queries {@code /} through {@code waitForResponse} with a predicate that is
 * {@code true} while the response's {@code opensearch.node.state} is anything other
 * than {@code "TERMINATED"}.
 * NOTE(review): the predicate is presumably a "keep retrying" condition - confirm
 * against {@code waitForResponse}.
 *
 * @return the response returned by {@code waitForResponse}
 * @throws ExecutionException propagated from {@code waitForResponse}
 * @throws RetryException     propagated from {@code waitForResponse}
 */
public ValidatableResponse waitForStoppedStatus() throws ExecutionException, RetryException {
    return waitForResponse("/",
            // Constant-first comparison: a response without the state path counts as
            // "not terminated" instead of throwing a NullPointerException.
            input -> !"TERMINATED".equals(input.extract().body().path("opensearch.node.state"))
    );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ public void triggerNodeRemoval() {
);
}

/**
 * Issues a {@code POST} with an empty body to {@code /management/stop} through
 * {@code validatedResponse}, supplying a check that the status code is below 300.
 * NOTE(review): the message argument is presumably reported when that check fails -
 * confirm against {@code validatedResponse}.
 */
public void triggerNodeStop() {
    validatedResponse(
            "/management/stop",
            HttpMethod.POST,
            "",
            "Could not trigger node stop",
            response -> response.extract().statusCode() < 300
    );
}

/**
 * Issues a {@code POST} with an empty body to {@code /management/start} through
 * {@code validatedResponse}, supplying a check that the status code is below 300.
 * NOTE(review): the message argument is presumably reported when that check fails -
 * confirm against {@code validatedResponse}.
 */
public void triggerNodeStart() {
    validatedResponse(
            "/management/start",
            HttpMethod.POST,
            "",
            "Could not trigger node start",
            response -> response.extract().statusCode() < 300
    );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ public String getId() {
void initiateRenewalForNode(String nodeId);
List<DataNode> findNodes();

DataNodeDto addProvisioningInformation(DataNodeDto node);

List<DataNodeDto> addProvisioningInformation(Collection<DataNodeDto> nodes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,20 @@ public List<DataNode> findNodes() {
}).toList();
}

/**
 * Returns a copy of the given node enriched with its provisioning state, error
 * message and the expiry of its certificate.
 * The expiry is {@code null} when no keystore or certificate is found for the node;
 * otherwise it is the certificate's not-after instant converted to a local date-time
 * in the system default time zone.
 *
 * @param node the data node to enrich
 * @return a rebuilt node carrying the provisioning information
 * @throws IllegalStateException if no provisioning config exists for the node
 */
@Override
public DataNodeDto addProvisioningInformation(DataNodeDto node) {
    // Resolve the certificate expiry first, then the config, matching the original evaluation order.
    final var validUntil = loadKeyStoreForNode(node)
            .flatMap(this::getCertificateForNode)
            .map(cert -> cert.getNotAfter().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime())
            .orElse(null);
    final var provisioningConfig = getDataNodeProvisioningConfig(node)
            .orElseThrow(() -> new IllegalStateException("No config found for data node " + node.getNodeId()));
    final var provisioningInformation = new CertRenewalService.ProvisioningInformation(
            provisioningConfig.state(), provisioningConfig.errorMsg(), validUntil);
    return node.toBuilder()
            .setProvisioningInformation(provisioningInformation)
            .build();
}

@Override
public List<DataNodeDto> addProvisioningInformation(Collection<DataNodeDto> nodes) {
return nodes.stream().map(node -> {
final var keystore = loadKeyStoreForNode(node);
final var certificate = keystore.flatMap(this::getCertificateForNode);
final var certValidUntil = certificate.map(cert -> cert.getNotAfter().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
final var config = getDataNodeProvisioningConfig(node).orElseThrow(() -> new IllegalStateException("No config found for data node " + node.getNodeId()));
return node.toBuilder().setProvisioningInformation(new CertRenewalService.ProvisioningInformation(
config.state(), config.errorMsg(), certValidUntil.orElse(null)
)).build();
}).toList();
return nodes.stream().map(this::addProvisioningInformation).toList();
}

private Optional<DataNodeProvisioningConfig> getDataNodeProvisioningConfig(final Node node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class AuditEventTypes implements PluginAuditEventTypes {
public static final String DATANODE_API_REQUEST = PREFIX + "data_node:api_request";
public static final String DATANODE_REMOVE = PREFIX + "data_node:remove";
public static final String DATANODE_RESET = PREFIX + "data_node:reset";
public static final String DATANODE_STOP = PREFIX + "data_node:stop";
public static final String DATANODE_START = PREFIX + "data_node:start";
public static final String ES_INDEX_CLOSE = PREFIX + "es_index:close";
public static final String ES_INDEX_CREATE = PREFIX + "es_index:create";
public static final String ES_INDEX_DELETE = PREFIX + "es_index:delete";
Expand Down Expand Up @@ -191,6 +193,8 @@ public class AuditEventTypes implements PluginAuditEventTypes {
.add(DATANODE_API_REQUEST)
.add(DATANODE_REMOVE)
.add(DATANODE_RESET)
.add(DATANODE_STOP)
.add(DATANODE_START)
.add(DATANODE_API_REQUEST)
.add(ES_INDEX_CLOSE)
.add(ES_INDEX_CREATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package org.graylog2.datanode;

public enum DataNodeLifecycleTrigger {
REMOVE, RESET
REMOVE, RESET, STOP, START
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

import com.google.inject.ImplementedBy;
import org.graylog2.cluster.NodeNotFoundException;
import org.graylog2.cluster.nodes.DataNodeDto;

@ImplementedBy(DataNodeServiceImpl.class)
public interface DataNodeService {
void removeNode(String nodeId) throws NodeNotFoundException;
DataNodeDto removeNode(String nodeId) throws NodeNotFoundException;

void resetNode(String nodeId) throws NodeNotFoundException;
DataNodeDto resetNode(String nodeId) throws NodeNotFoundException;

DataNodeDto stopNode(String nodeId) throws NodeNotFoundException;

DataNodeDto startNode(String nodeId) throws NodeNotFoundException;
}
Loading

0 comments on commit c8e34b4

Please sign in to comment.