Skip to content

Commit

Permalink
Add subsystem health check to liveness checks
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Jan 18, 2024
1 parent 0ad1528 commit 2690786
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 0 deletions.
3 changes: 3 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,9 @@ pekko {
"pekko-contrib-mongodb-persistence-connection-snapshots"
]
}
management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

include "ditto-protocol-subscriber.conf"
Expand Down
4 changes: 4 additions & 0 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ pekko {
management.health-checks.readiness-checks {
gateway-http-readiness = "org.eclipse.ditto.gateway.service.health.GatewayHttpReadinessCheck"
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

include "ditto-protocol-subscriber.conf"
Expand Down
5 changes: 5 additions & 0 deletions internal/utils/health/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.health;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.pattern.Patterns;

/**
 * Pekko Management health check which asks the health checking actor located below the service's root actor for
 * its current status.
 * The check succeeds only if a {@link StatusInfo} is received whose status is not {@code DOWN}. Any other outcome,
 * e.g. a timed out ask, a failed ask or a reply of an unexpected type, is reported as failure.
 */
public final class SubsystemHealthCheck implements Supplier<CompletionStage<Boolean>> {

    /**
     * Selection path of the health checking actor. The wildcard matches every top-level user actor whose name
     * ends with "Root".
     */
    private static final String HEALTH_CHECKING_ACTOR_PATH =
            "/user/*Root/" + DefaultHealthCheckingActorFactory.ACTOR_NAME;

    /**
     * Timeout used for the ask if none is given explicitly.
     */
    private static final Duration DEFAULT_ASK_TIMEOUT = Duration.ofSeconds(30);

    /**
     * The query sent to the health checking actor on each check.
     */
    private static final RetrieveHealth HEALTH_QUERY = RetrieveHealth.newInstance();

    private final ActorSelection healthCheckingActor;
    private final Duration timeout;

    /**
     * Creates a health check which waits at most the default timeout for the health checking actor's reply.
     *
     * @param system the actor system whose health checking actor is asked
     */
    public SubsystemHealthCheck(final ActorSystem system) {
        this(system, DEFAULT_ASK_TIMEOUT);
    }

    /**
     * Creates a health check which waits at most the given timeout for the health checking actor's reply.
     *
     * @param system the actor system whose health checking actor is asked
     * @param timeout maximum time to wait for the reply before the check is reported as failed
     */
    public SubsystemHealthCheck(final ActorSystem system, final Duration timeout) {
        this.timeout = timeout;
        this.healthCheckingActor = system.actorSelection(HEALTH_CHECKING_ACTOR_PATH);
    }

    /**
     * Asks the health checking actor for its status.
     *
     * @return a stage completing with {@code true} if the reported status is not {@code DOWN},
     * {@code false} in all other cases; the stage never completes exceptionally because of a failed ask.
     */
    @Override
    public CompletionStage<Boolean> get() {
        final CompletionStage<Object> reply = Patterns.ask(healthCheckingActor, HEALTH_QUERY, timeout);

        return reply.handle(SubsystemHealthCheck::isAlive);
    }

    /**
     * Maps the outcome of the ask to the health check result.
     * The failure is intentionally not inspected: without a {@link StatusInfo} reply the check fails anyway.
     */
    private static boolean isAlive(final Object reply, final Throwable failure) {
        if (reply instanceof StatusInfo statusInfo) {
            return StatusInfo.Status.DOWN != statusInfo.getStatus();
        }
        return false;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.health;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;

import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link SubsystemHealthCheck}.
 * Each test starts its own actor system and, where needed, a root actor named {@value #ROOT_ACTOR_NAME} with a
 * child that plays the role of the health checking actor and replies with a fixed {@link StatusInfo}.
 */
public class SubsystemHealthCheckTest {

    private static final String TEST_ACTOR_READY = "ready";
    private static final Duration HEALTH_CHECK_TIMEOUT = Duration.ofMillis(500);

    /**
     * Name of root actor in tests. Must end with "Root" to be recognized by {@link SubsystemHealthCheck}.
     */
    private static final String ROOT_ACTOR_NAME = "testRoot";

    private ActorSystem actorSystem;
    private SubsystemHealthCheck subsystemHealthCheck;

    @Before
    public void setUp() {
        actorSystem = ActorSystem.create();
        subsystemHealthCheck = new SubsystemHealthCheck(actorSystem, HEALTH_CHECK_TIMEOUT);
    }

    @After
    public void tearDown() {
        if (actorSystem != null) {
            // ActorSystem#terminate() only triggers an asynchronous shutdown. Wait for the termination so that
            // the actor system of one test is really gone before the next test creates its own.
            TestKit.shutdownActorSystem(actorSystem);
            actorSystem = null;
        }
    }

    @Test
    public void testCheckReturnsTrueWhenHealthCheckingActorReturnsUp() {
        new TestKit(actorSystem) {{
            final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.UP);
            actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

            // wait until the health checking actor is started, otherwise the ask could hit a not yet existing actor
            expectMsg(TEST_ACTOR_READY);

            assertHealthCheckResult(subsystemHealthCheck, true);
        }};
    }

    @Test
    public void testCheckReturnsTrueWhenHealthCheckingActorReturnsUnknown() {
        new TestKit(actorSystem) {{
            final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.UNKNOWN);
            actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

            expectMsg(TEST_ACTOR_READY);

            assertHealthCheckResult(subsystemHealthCheck, true);
        }};
    }

    @Test
    public void testCheckReturnsFalseWhenHealthCheckingActorReturnsDown() {
        new TestKit(actorSystem) {{
            final var statusInfo = StatusInfo.fromStatus(StatusInfo.Status.DOWN);
            actorSystem.actorOf(TestRootActor.props(statusInfo, getRef()), ROOT_ACTOR_NAME);

            expectMsg(TEST_ACTOR_READY);

            assertHealthCheckResult(subsystemHealthCheck, false);
        }};
    }

    @Test
    public void testCheckReturnsFalseWhenHealthCheckingActorDoesNotExist() {
        new TestKit(actorSystem) {{
            // no root actor is started here, so the ask is expected to run into HEALTH_CHECK_TIMEOUT
            assertHealthCheckResult(subsystemHealthCheck, false);
        }};
    }

    /**
     * Triggers the health check, waits at most twice the health check timeout for its completion and asserts
     * the result.
     *
     * @param subsystemHealthCheck the health check under test
     * @param expected the expected result of the health check
     */
    private static void assertHealthCheckResult(final SubsystemHealthCheck subsystemHealthCheck,
            final boolean expected) {

        final var healthCheckResultFuture = subsystemHealthCheck.get();
        Awaitility.await()
                .atMost(Duration.ofNanos(2 * HEALTH_CHECK_TIMEOUT.toNanos()))
                .until(() -> healthCheckResultFuture.toCompletableFuture().isDone());
        assertThat(healthCheckResultFuture.toCompletableFuture())
                .isCompletedWithValue(expected);
    }

    /**
     * Stand-in for a service's root actor: its only purpose is to be the parent of the health checking actor.
     */
    private static class TestRootActor extends AbstractActor {

        private TestRootActor(final StatusInfo statusInfo, final ActorRef actorToNotify) {
            getContext().actorOf(TestHealthCheckingActor.props(statusInfo, actorToNotify),
                    DefaultHealthCheckingActorFactory.ACTOR_NAME);
        }

        public static Props props(final StatusInfo statusInfo, final ActorRef actorToNotify) {
            return Props.create(TestRootActor.class, statusInfo, actorToNotify);
        }

        @Override
        public Receive createReceive() {
            return ReceiveBuilder.create()
                    .matchAny(this::unhandled)
                    .build();
        }

    }

    /**
     * Stand-in for the health checking actor: notifies the given actor once it is started and answers each
     * {@link RetrieveHealth} with the configured {@link StatusInfo}.
     */
    private static class TestHealthCheckingActor extends AbstractActor {

        private final StatusInfo statusInfo;
        private final ActorRef actorToNotify;

        private TestHealthCheckingActor(final StatusInfo statusInfo, final ActorRef actorToNotify) {
            this.statusInfo = statusInfo;
            this.actorToNotify = actorToNotify;
        }

        public static Props props(final StatusInfo statusInfo, final ActorRef actorToNotify) {
            return Props.create(TestHealthCheckingActor.class, statusInfo, actorToNotify);
        }

        @Override
        public Receive createReceive() {
            return ReceiveBuilder.create()
                    .match(RetrieveHealth.class, this::returnStatusInfo)
                    .build();
        }

        @Override
        public void preStart() {
            actorToNotify.tell(TEST_ACTOR_READY, getSelf());
        }

        private void returnStatusInfo(final RetrieveHealth command) {
            getSender().tell(this.statusInfo, getSelf());
        }
    }

}
4 changes: 4 additions & 0 deletions policies/service/src/main/resources/policies.conf
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ pekko {
"pekko-contrib-mongodb-persistence-policies-snapshots"
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

pekko-contrib-mongodb-persistence-policies-journal {
Expand Down
4 changes: 4 additions & 0 deletions things/service/src/main/resources/things.conf
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ pekko {
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}

}

pekko-contrib-mongodb-persistence-things-journal {
Expand Down
4 changes: 4 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ pekko {
"blocked-namespaces-aware",
]
}

management.health-checks.liveness-checks {
subsystem-health = "org.eclipse.ditto.internal.utils.health.SubsystemHealthCheck"
}
}

search-dispatcher {
Expand Down

0 comments on commit 2690786

Please sign in to comment.