From fa4e016f93c64863838b954e3f17f9ef3770f245 Mon Sep 17 00:00:00 2001 From: Yufei Cai Date: Mon, 15 Feb 2021 18:19:39 +0100 Subject: [PATCH] [#964] fix PoliciesServiceGlobalCommandRegistryTest and PoliciesRootActorTest. Signed-off-by: Yufei Cai --- .../services/policies/starter/PoliciesRootActor.java | 2 +- .../PoliciesServiceGlobalCommandRegistryTest.java | 2 ++ .../services/utils/pubsub/DittoProtocolSubImpl.java | 3 ++- .../utils/pubsub/PolicyNotificationPubSubFactory.java | 10 ++++++---- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java index 122312209d..7939216a35 100755 --- a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java +++ b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java @@ -71,7 +71,7 @@ private PoliciesRootActor(final PoliciesConfig policiesConfig, // Start distributed data replicator even though it is not used for now. // TODO: use the DistributedSub for notification publication - PolicyNotificationPubSubFactory.of(actorSystem).startDistributedPub(); + PolicyNotificationPubSubFactory.of(getContext(), actorSystem).startDistributedPub(); final Props policySupervisorProps = PolicySupervisorActor.props(pubSubMediator, snapshotAdapter); diff --git a/services/policies/starter/src/test/java/org/eclipse/ditto/services/policies/starter/PoliciesServiceGlobalCommandRegistryTest.java b/services/policies/starter/src/test/java/org/eclipse/ditto/services/policies/starter/PoliciesServiceGlobalCommandRegistryTest.java index d9e2bd9312..5b27c57774 100644 --- a/services/policies/starter/src/test/java/org/eclipse/ditto/services/policies/starter/PoliciesServiceGlobalCommandRegistryTest.java +++ b/services/policies/starter/src/test/java/org/eclipse/ditto/services/policies/starter/PoliciesServiceGlobalCommandRegistryTest.java @@ -15,6 +15,7 @@ import org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicy; import org.eclipse.ditto.services.models.streaming.SudoStreamPids; import org.eclipse.ditto.services.utils.health.RetrieveHealth; +import org.eclipse.ditto.services.utils.pubsub.api.PublishSignal; import org.eclipse.ditto.services.utils.test.GlobalCommandRegistryTestCases; import org.eclipse.ditto.signals.commands.cleanup.CleanupPersistence; import org.eclipse.ditto.signals.commands.common.Shutdown; @@ -48,6 +49,7 @@ public PoliciesServiceGlobalCommandRegistryTest() { CleanupPersistence.class, RetrieveHealth.class, PurgeEntities.class, + PublishSignal.class, // added due to ditto-model-placeholders CreateSubscription.class, diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DittoProtocolSubImpl.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DittoProtocolSubImpl.java index 3bff0e9d6d..5450ea2c09 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DittoProtocolSubImpl.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/DittoProtocolSubImpl.java @@ -56,7 +56,8 @@ static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks d LiveSignalPubSubFactory.of(system, distributedAcks).startDistributedSub(); final DistributedSub twinEventSub = ThingEventPubSubFactory.readSubjectsOnly(system, distributedAcks).startDistributedSub(); - final DistributedSub policyNotificationSub = PolicyNotificationPubSubFactory.of(system).startDistributedSub(); + final DistributedSub policyNotificationSub = + PolicyNotificationPubSubFactory.of(system, system).startDistributedSub(); return new DittoProtocolSubImpl(liveSignalSub, twinEventSub, policyNotificationSub, distributedAcks); } diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/PolicyNotificationPubSubFactory.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/PolicyNotificationPubSubFactory.java index ecd1b7959e..c73d9be66b 100644 --- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/PolicyNotificationPubSubFactory.java +++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/PolicyNotificationPubSubFactory.java @@ -15,6 +15,7 @@ import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor; import org.eclipse.ditto.signals.notifications.policies.PolicyNotification; +import akka.actor.ActorRefFactory; import akka.actor.ActorSystem; /** @@ -28,18 +29,19 @@ public final class PolicyNotificationPubSubFactory extends AbstractPubSubFactory AckExtractor.of(PolicyNotification::getEntityId, PolicyNotification::getDittoHeaders); @SuppressWarnings({"unchecked"}) - private PolicyNotificationPubSubFactory(final ActorSystem actorSystem) { - super(actorSystem, actorSystem, (Class>) (Object) PolicyNotification.class, + private PolicyNotificationPubSubFactory(final ActorRefFactory actorRefFactory, final ActorSystem actorSystem) { + super(actorRefFactory, actorSystem, (Class>) (Object) PolicyNotification.class, new PolicyNotificationTopicExtractor(), PROVIDER, ACK_EXTRACTOR, DistributedAcks.empty()); } /** * Create a pubsub factory for thing events ignoring shard ID topics. * + * @param actorRefFactory the factory with which to create the sub-supervisor actor.l * @param system the actor system. * @return the thing event pub-sub factory. */ - public static PolicyNotificationPubSubFactory of(final ActorSystem system) { - return new PolicyNotificationPubSubFactory(system); + public static PolicyNotificationPubSubFactory of(final ActorRefFactory actorRefFactory, final ActorSystem system) { + return new PolicyNotificationPubSubFactory(actorRefFactory, system); } }