Skip to content

Commit

Permalink
Delete SearchActorTest#terminateStreams; log blocked namespaces repli…
Browse files Browse the repository at this point in the history
…cator.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 16, 2022
1 parent c39d275 commit 15dbf62
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.PersistencePingActor;
import org.eclipse.ditto.internal.utils.persistentactors.cleanup.PersistenceCleanupActor;
import org.eclipse.ditto.internal.utils.pubsubpolicies.PolicyAnnouncementPubSubFactory;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -94,7 +93,7 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
// because of serialization issues the single BaseClientActors "get" the extension themselves
// it must however be started here in order to already participate in Ditto pub/sub, even if no connection is
// available!
BlockedNamespaces.of(actorSystem);
log.info("Started blocked namespaces replicator <{}>", BlockedNamespaces.of(actorSystem).getReplicator());
DittoProtocolSub.get(actorSystem);

final MongoReadJournal mongoReadJournal = MongoReadJournal.newInstance(actorSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.thingsearch.service.starter.actors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;

import java.util.List;
Expand All @@ -26,10 +25,8 @@
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.SubscriptionAbortedException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.CountThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.CountThingsResponse;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
Expand All @@ -50,11 +47,8 @@
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.PFBuilder;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.RemoteStreamRefActorTerminatedException;
import akka.stream.SourceRef;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.Source;
import akka.testkit.TestProbe;
Expand Down Expand Up @@ -86,7 +80,8 @@ public void unbindAndStopWithoutQuery() {
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME,
underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

Expand All @@ -112,7 +107,8 @@ public void waitForQueries() {
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME,
underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

Expand Down Expand Up @@ -159,42 +155,6 @@ public void waitForQueries() {
}};
}

@Test
public void terminateStreams() {
final var system = actorSystemResource.getActorSystem();
new TestKit(system) {{
final var props = SearchActor.props(queryParser, persistence, getRef());
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

final var serviceRequestsDone = SearchActor.Control.SERVICE_REQUESTS_DONE;
use(p -> p.findAllUnlimited(any(), any(), any()));

final var stream =
StreamThings.of(null, null, null, null, DittoHeaders.empty());

underTest.tell(stream, getRef());
final SourceRef<?> sourceRef = expectMsgClass(SourceRef.class);
final var elementProbe = TestProbe.apply(system);
sourceRef.getSource()
.<Object>map(x -> x)
.recover(new PFBuilder<Throwable, Object>().matchAny(x -> x).build())
.runForeach(e -> elementProbe.ref().tell(e, ActorRef.noSender()), system);

underTest.tell(serviceRequestsDone, getRef());
final Throwable e = elementProbe.expectMsgClass(RemoteStreamRefActorTerminatedException.class);
assertThat(e.getMessage()).contains(SubscriptionAbortedException.MESSAGE);
expectMsg(Done.getInstance());

// terminate actor in order not to prevent actor system shutdown
underTest.tell(PoisonPill.getInstance(), getRef());
}};
}

private ActorRef use(final Consumer<ThingsSearchPersistence> stubberConsumer) {
final akka.japi.function.Function<Object, Optional<CompletionStrategy>> completionStrategy =
msg -> Optional.of(msg).filter("complete"::equals).map(m -> CompletionStrategy.draining());
Expand Down

0 comments on commit 15dbf62

Please sign in to comment.