Skip to content

Commit

Permalink
fix tests, add test for shutdown command handling
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Apr 20, 2022
1 parent 527684d commit cd51816
Showing 1 changed file with 95 additions and 2 deletions.
Expand Up @@ -16,13 +16,18 @@

import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand Down Expand Up @@ -55,6 +60,7 @@
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.MergeHub;
Expand Down Expand Up @@ -126,7 +132,7 @@ public void recoverLastWriteModel() {
inletProbe.ensureSubscription();
inletProbe.request(16);
inletProbe.expectNoMessage();
underTest.tell(ReceiveTimeout.getInstance(), ActorRef.noSender());
underTest.tell(ThingUpdater.ShutdownTrigger.DELETE, ActorRef.noSender());
expectTerminated(underTest, TEN_SECOND);
inletProbe.expectNoMessage();
}};
Expand Down Expand Up @@ -155,6 +161,93 @@ public void updateFromEvent() {
}};
}

@Test
public void handleShutdownDueToNamespacePurging() {
new TestKit(system) {{
// GIVEN: ThingUpdater recovers with a write model of revision 1234
final Props props =
ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG, testActor());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

expectMsgClass(DistributedPubSubMediator.Subscribe.class);

// WHEN: A shutdown command arrives
final var shutdown =
Shutdown.getInstance(ShutdownReasonFactory.getPurgeNamespaceReason(THING_ID.getNamespace()),
DittoHeaders.empty());
underTest.tell(shutdown, ActorRef.noSender());

// THEN: expect actor is terminated
expectTerminated(underTest, scala.concurrent.duration.Duration.apply(3, TimeUnit.SECONDS));
}};
}

@Test
public void handleShutdownDueToEntitiesPurging() {
new TestKit(system) {{
// GIVEN: ThingUpdater recovers with a write model of revision 1234
final Props props =
ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG, testActor());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

expectMsgClass(DistributedPubSubMediator.Subscribe.class);

// WHEN: A shutdown command arrives
final List<EntityId> purgedEntities = List.of(ThingId.of("some:id"), THING_ID, ThingId.of("other:id"));
final var shutdown =
Shutdown.getInstance(ShutdownReasonFactory.getPurgeEntitiesReason(purgedEntities),
DittoHeaders.empty());
underTest.tell(shutdown, ActorRef.noSender());

// THEN: expect actor is terminated
expectTerminated(underTest, scala.concurrent.duration.Duration.apply(3, TimeUnit.SECONDS));
}};
}


@Test
public void ignoreIrrelevantShutdownDueToNamespacePurging() {
new TestKit(system) {{
// GIVEN: ThingUpdater recovers with a write model of revision 1234
final Props props =
ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG, testActor());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

expectMsgClass(DistributedPubSubMediator.Subscribe.class);

// WHEN: A shutdown command arrives
final var shutdown =
Shutdown.getInstance(ShutdownReasonFactory.getPurgeNamespaceReason("foo"),
DittoHeaders.empty());
underTest.tell(shutdown, ActorRef.noSender());

// THEN: expect actor is not terminated
expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS));
}};
}

@Test
public void ignoreIrrelevantShutdownDueToEntitiesPurging() {
new TestKit(system) {{
// GIVEN: ThingUpdater recovers with a write model of revision 1234
final Props props =
ThingUpdater.props(flow, id -> Source.single(getThingWriteModel()), SEARCH_CONFIG, testActor());
final ActorRef underTest = watch(childActorOf(props, ACTOR_NAME));

expectMsgClass(DistributedPubSubMediator.Subscribe.class);

// WHEN: A shutdown command arrives
final List<EntityId> purgedEntities = List.of(ThingId.of("some:id"), ThingId.of("other:id"));
final var shutdown =
Shutdown.getInstance(ShutdownReasonFactory.getPurgeEntitiesReason(purgedEntities),
DittoHeaders.empty());
underTest.tell(shutdown, ActorRef.noSender());

// THEN: expect actor is not terminated
expectNoMessage(FiniteDuration.apply(1, TimeUnit.SECONDS));
}};
}

@Test
public void combineUpdatesFrom2Events() {
new TestKit(system) {{
Expand Down Expand Up @@ -182,7 +275,7 @@ public void combineUpdatesFrom2Events() {
assertThat(data.lastWriteModel()).isEqualTo(getThingWriteModel());

// THEN: no other updates are sent
underTest.tell(ReceiveTimeout.getInstance(), ActorRef.noSender());
underTest.tell(ThingUpdater.ShutdownTrigger.DELETE, ActorRef.noSender());
outletProbe.ensureSubscription();
outletProbe.expectRequest();
outletProbe.sendError(new IllegalStateException("Expected exception"));
Expand Down

0 comments on commit cd51816

Please sign in to comment.