Skip to content

Commit

Permalink
Make ThingUpdater participate in shard hand-off; test self restart of…
Browse files Browse the repository at this point in the history
… sharded actors.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 11, 2022
1 parent daa0001 commit bb3549f
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,59 @@ public void testHandOffMessage() throws Exception {
}};
}

@Test
public void testSelfRestart() throws Exception {
new TestKit(system2) {{
// GIVEN: 2 actor systems form a cluster with shard regions started on both
final var latch = new CountDownLatch(2);
final var cluster1 = Cluster.get(system1);
final var cluster2 = Cluster.get(system2);
cluster1.registerOnMemberUp(latch::countDown);
cluster2.registerOnMemberUp(latch::countDown);
cluster1.join(cluster1.selfAddress());
cluster2.join(cluster1.selfAddress());
latch.await();

final var props = Props.create(MessageForwarder.class, testActor());
final var shardName = "shard";
final var role = "dc-default";
final var extractor = new DummyExtractor();
final var shard1 = ShardRegionCreator.start(system1, shardName, props, extractor, role);
final var shard2 = ShardRegionCreator.start(system2, shardName, props, extractor, role);
final var proxy1 = ClusterSharding.get(system1).startProxy(shardName, Optional.of(role), extractor);

// GIVEN: a sharded actor is started
final var signal = DeleteThing.of(ThingId.of("thing:id"), DittoHeaders.empty());
proxy1.tell(signal, testActor());
final var firstShardedActor = expectMsgClass(ActorRef.class);
expectMsgClass(DeleteThing.class);

final var startedInSystem1 = isShardedActorIn(firstShardedActor, system1);
final var startedInSystem2 = isShardedActorIn(firstShardedActor, system2);
assertThat(startedInSystem1)
.describedAs("Sharded actor should start in exactly 1 actor system")
.isNotEqualTo(startedInSystem2);

// WHEN: the shard region containing the started actor is shut down
final var shardOfFirstActor = startedInSystem1 ? shard1 : shard2;
shardOfFirstActor.tell(ShardRegion.GracefulShutdown$.MODULE$, ActorRef.noSender());

// THEN: the sharded actor receives the hand-off message
expectMsgClass(StopShardedActor.class);

// WHEN: the sharded actor queues another message to self before stoppin
firstShardedActor.tell(MessageForwarder.MESSAGE_SHARD, testActor());
expectMsg(MessageForwarder.MESSAGE_SHARD);
firstShardedActor.tell(PoisonPill.getInstance(), testActor());

// THEN: a new sharded actor for the same entity starts in the remaining shard region
final var activeSystem = startedInSystem1 ? system2 : system1;
final var secondShardedActor = expectMsgClass(Duration.apply(10, "s"), ActorRef.class);
assertThat(isShardedActorIn(secondShardedActor, activeSystem)).isTrue();
expectMsg(MessageForwarder.RESTART_TRIGGER);
}};
}

private static boolean isShardedActorIn(final ActorRef shardedActor, final ActorSystem system) {
final var relativePath =
shardedActor.path().elements().drop(1).reduce((x, y) -> x + "/" + y).toString();
Expand Down Expand Up @@ -142,6 +195,9 @@ public String shardId(final Object message) {

private static final class MessageForwarder extends AbstractActor {

private static final Object MESSAGE_SHARD = "message shard";
private static final Object RESTART_TRIGGER = "restart trigger";

private final ActorRef receiver;

private MessageForwarder(final ActorRef receiver) {
Expand All @@ -156,6 +212,12 @@ public void preStart() {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchEquals(MESSAGE_SHARD, m -> {
ClusterSharding.get(getContext().getSystem())
.shardRegion("shard")
.tell(RESTART_TRIGGER, getSelf());
getSender().tell(MESSAGE_SHARD, getSelf());
})
.matchAny(message -> receiver.tell(message, getSelf()))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.internal.utils.cluster.ShardRegionCreator;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionExtractor;
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
Expand Down Expand Up @@ -115,11 +116,7 @@ public ActorRef getSearchUpdaterShardRegion(final int numberOfShards,
* @return the shard region.
*/
public ActorRef createShardRegion(final int shards, final Props props, final String name, final String role) {
final ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
final ClusterShardingSettings shardingSettings =
ClusterShardingSettings.create(actorSystem).withRole(role);
final ShardRegionExtractor shardRegionExtractor = ShardRegionExtractor.of(shards, actorSystem);
return clusterSharding.start(name, props, shardingSettings, shardRegionExtractor);
return ShardRegionCreator.start(actorSystem, name, props, shards, role);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
Expand Down Expand Up @@ -226,6 +227,7 @@ private FSMStateFunctionBuilder<State, Data> unhandled() {
private FSMStateFunctionBuilder<State, Data> recovering() {
return matchEvent(AbstractWriteModel.class, this::recoveryComplete)
.event(Throwable.class, this::recoveryFailed)
.event(StopShardedActor.class, this::shutdown)
.event(ShutdownTrigger.class, this::shutdown)
.event(SHUTDOWN_CLASS, this::shutdownNow)
.event(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck)
Expand All @@ -242,6 +244,7 @@ private FSMStateFunctionBuilder<State, Data> ready() {
.event(PolicyReferenceTag.class, this::onPolicyReferenceTag)
.event(SudoUpdateThing.class, this::updateThing)
.eventEquals(Control.TICK, this::tick)
.event(StopShardedActor.class, this::shutdown)
.event(ShutdownTrigger.class, this::shutdown)
.event(SHUTDOWN_CLASS, this::shutdownNow)
.event(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck);
Expand All @@ -250,6 +253,7 @@ private FSMStateFunctionBuilder<State, Data> ready() {
private FSMStateFunctionBuilder<State, Data> persisting() {
return matchEvent(Result.class, this::onResult)
.event(Done.class, this::onDone)
.event(StopShardedActor.class, this::shutdown)
.event(ShutdownTrigger.class, this::shutdown)
.event(SHUTDOWN_CLASS, this::shutdownNow)
.event(DistributedPubSubMediator.SubscribeAck.class, this::subscribeAck)
Expand Down

0 comments on commit bb3549f

Please sign in to comment.