Skip to content

Commit

Permalink
Add PolicySupervisorActorTest; fix unit tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 23, 2022
1 parent 25bea13 commit 4fb6cdf
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static Props props(final ActorRef commandForwarder,

@Override
protected ConnectionId getEntityId() throws Exception {
return ConnectionId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8.name()));
return ConnectionId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ protected void becomeCreatedHandler() {
final CommandStrategy<C, S, K, E> commandStrategy = getCreatedStrategy();

final Receive receive = handleCleanups.orElse(ReceiveBuilder.create()
.match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy)
.match(PersistEmptyEvent.class, this::handlePersistEmptyEvent)
.match(CheckForActivity.class, this::checkForActivity)
.match(PingCommand.class, this::processPingCommand)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy)
.match(PersistEmptyEvent.class, this::handlePersistEmptyEvent)
.match(CheckForActivity.class, this::checkForActivity)
.match(PingCommand.class, this::processPingCommand)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.orElse(matchAnyAfterInitialization());

getContext().become(receive);
Expand Down Expand Up @@ -417,11 +417,11 @@ protected void passivate() {

private Receive createDeletedBehavior() {
return handleCleanups.orElse(handleByDeletedStrategyReceiveBuilder()
.match(CheckForActivity.class, this::checkForActivity)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.match(CheckForActivity.class, this::checkForActivity)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.orElse(matchAnyWhenDeleted());
}

Expand Down Expand Up @@ -618,6 +618,7 @@ private void notAccessible(final WithDittoHeaders withDittoHeaders) {
final DittoRuntimeExceptionBuilder<?> builder = newNotAccessibleExceptionBuilder()
.dittoHeaders(withDittoHeaders.getDittoHeaders());
notifySender(builder.build());
reportSudoCommandDone(withDittoHeaders);
}

private void shutdown(final String shutdownLogTemplate, final I entityId) {
Expand All @@ -629,7 +630,7 @@ private boolean isEntityActive() {
return entity != null && !entityExistsAsDeleted();
}

private void reportSudoCommandDone(final Command<?> command) {
private void reportSudoCommandDone(final WithDittoHeaders command) {
if (command instanceof SudoCommand || command.getDittoHeaders().isSudo()) {
getSudoCommandDoneRecipient().tell(AbstractPersistenceSupervisor.Control.SUDO_COMMAND_DONE, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public Receive createReceive() {
unstashAll();
becomeActive(getShutdownBehaviour(entityId));
})
.match(StopShardedActor.class, this::stopBeforeRecovery)
.matchAny(this::handleMessagesDuringStartup)
.build();
}
Expand Down Expand Up @@ -350,11 +349,6 @@ protected <T> CompletionStage<Object> askTargetActor(final T message, final bool
});
}

private void stopBeforeRecovery(final StopShardedActor trigger) {
log.debug("Stopping before recovery due to shard region shutdown");
getContext().stop(getSelf());
}

private void stopAfterOngoingOps(final StopShardedActor trigger) {
if (opCounter == 0 && sudoOpCounter == 0) {
log.debug("Stopping: no ongoing ops.");
Expand All @@ -369,7 +363,7 @@ private FI.UnitApply<Control> decrementOpCounter(final FI.UnitApply<Control> mat
return control -> {
--opCounter;
matchProcessNextTwinMessageBehavior.apply(control);
if (opCounter == 0 && sudoOpCounter == 0) {
if (inCoordinatedShutdown && opCounter == 0 && sudoOpCounter == 0) {
log.debug("Stopping after waiting for ongoing ops.");
getContext().stop(getSelf());
}
Expand All @@ -378,7 +372,7 @@ private FI.UnitApply<Control> decrementOpCounter(final FI.UnitApply<Control> mat

private void decrementSudoOpCounter(final Control sudoCommandDone) {
--sudoOpCounter;
if (opCounter == 0 && sudoOpCounter == 0) {
if (inCoordinatedShutdown && opCounter == 0 && sudoOpCounter == 0) {
log.debug("Stopping after waiting for ongoing sudo ops.");
getContext().stop(getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.policies.service.persistence.actors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

/**
* Tests {@link PolicySupervisorActor}.
*/
public final class PolicySupervisorActorTest extends PersistenceActorTestBase {

@SuppressWarnings("unchecked")
@Mock
public DistributedPub<PolicyAnnouncement<?>> pub = mock(DistributedPub.class);

@Mock public BlockedNamespaces blockedNamespaces = mock(BlockedNamespaces.class);

@Before
public void setup() {
setUpBase();
}


@Test
public void stopNonexistentPolicy() {
new TestKit(actorSystem) {{
final PolicyId policyId = PolicyId.of("test.ns", "stopNonexistentPolicy");
final var props = PolicySupervisorActor.props(pubSubMediator, pub, blockedNamespaces);
final var underTest = watch(childActorOf(props, policyId.toString()));
underTest.tell(new StopShardedActor(), getRef());
expectTerminated(underTest);
}};
}

@Test
public void stopAfterRetrievingNonexistentPolicy() {
new TestKit(actorSystem) {{
final PolicyId policyId = PolicyId.of("test.ns", "retrieveNonexistentPolicy");
final var props = PolicySupervisorActor.props(pubSubMediator, pub, blockedNamespaces);
final var underTest = watch(childActorOf(props, policyId.toString()));
final var probe = TestProbe.apply(actorSystem);
final var retrievePolicy = RetrievePolicy.of(policyId, DittoHeaders.empty());
underTest.tell(retrievePolicy, probe.ref());
underTest.tell(new StopShardedActor(), getRef());
expectTerminated(underTest);
probe.expectMsgClass(PolicyNotAccessibleException.class);
}};
}

@Test
public void stopAfterRetrievingExistingPolicy() {
new TestKit(actorSystem) {{
final var policy = createPolicyWithRandomId();
final var policyId = policy.getEntityId().orElseThrow();
final var props = PolicySupervisorActor.props(pubSubMediator, pub, blockedNamespaces);
final var underTest = watch(childActorOf(props, policyId.toString()));
final var probe = TestProbe.apply(actorSystem);

final var createPolicy = CreatePolicy.of(policy, dittoHeadersV2);
underTest.tell(createPolicy, probe.ref());
final var response = probe.expectMsgClass(CreatePolicyResponse.class);
assertThat(response.getPolicyCreated().orElseThrow().getEntriesSet()).isEqualTo(policy.getEntriesSet());

final var retrievePolicy = RetrievePolicy.of(policyId, DittoHeaders.empty());
underTest.tell(retrievePolicy, probe.ref());
underTest.tell(new StopShardedActor(), getRef());
expectTerminated(underTest);
probe.expectMsgClass(PolicyNotAccessibleException.class);
}};
}
}

0 comments on commit 4fb6cdf

Please sign in to comment.