Skip to content

Commit

Permalink
Make StopShardedActor java-serializable for tests; restructure ShardR…
Browse files Browse the repository at this point in the history
…egionCreatorTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 15, 2022
1 parent bb3549f commit 5d86bb2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.cluster;

import java.io.Serializable;

import javax.annotation.concurrent.Immutable;

/**
Expand All @@ -20,4 +22,4 @@
* @since 3.0.0
*/
@Immutable
public record StopShardedActor() {}
public record StopShardedActor() implements Serializable {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.modify.DeleteThing;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.typesafe.config.Config;
Expand All @@ -36,8 +38,7 @@
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.TestKit;
import scala.concurrent.duration.Duration;
import akka.testkit.javadsl.TestKit;

/**
* Tests {@link ShardRegionCreator}.
Expand All @@ -49,26 +50,29 @@ public final class ShardRegionCreatorTest {
private final ActorSystem system1 = ActorSystem.create("system", CONFIG);
private final ActorSystem system2 = ActorSystem.create("system", CONFIG);

@Before
public void setUpCluster() throws Exception {
final var latch = new CountDownLatch(2);
final var cluster1 = Cluster.get(system1);
final var cluster2 = Cluster.get(system2);
cluster1.registerOnMemberUp(latch::countDown);
cluster2.registerOnMemberUp(latch::countDown);
cluster1.join(cluster1.selfAddress());
cluster2.join(cluster1.selfAddress());
latch.await();
}

@After
public void terminateActorSystems() {
system1.terminate();
system2.terminate();
TestKit.shutdownActorSystem(system1);
TestKit.shutdownActorSystem(system2);
}

@Test
public void testHandOffMessage() throws Exception {
new TestKit(system2) {{
// GIVEN: 2 actor systems form a cluster with shard regions started on both
final var latch = new CountDownLatch(2);
final var cluster1 = Cluster.get(system1);
final var cluster2 = Cluster.get(system2);
cluster1.registerOnMemberUp(latch::countDown);
cluster2.registerOnMemberUp(latch::countDown);
cluster1.join(cluster1.selfAddress());
cluster2.join(cluster1.selfAddress());
latch.await();

final var props = Props.create(MessageForwarder.class, testActor());
final var props = Props.create(MessageForwarder.class, getRef());
final var shardName = "shard";
final var role = "dc-default";
final var extractor = new DummyExtractor();
Expand All @@ -78,7 +82,7 @@ public void testHandOffMessage() throws Exception {

// GIVEN: a sharded actor is started
final var signal = DeleteThing.of(ThingId.of("thing:id"), DittoHeaders.empty());
proxy1.tell(signal, testActor());
proxy1.tell(signal, getRef());
final var firstShardedActor = expectMsgClass(ActorRef.class);
expectMsgClass(DeleteThing.class);

Expand All @@ -96,14 +100,14 @@ public void testHandOffMessage() throws Exception {
expectMsgClass(StopShardedActor.class);

// THEN: the next message to the sharded actor is buffered
proxy1.tell(signal, testActor());
proxy1.tell(signal, getRef());

// WHEN: the sharded actor stops
firstShardedActor.tell(PoisonPill.getInstance(), testActor());
firstShardedActor.tell(PoisonPill.getInstance(), getRef());

// THEN: a new sharded actor for the same entity starts in the remaining shard region
final var activeSystem = startedInSystem1 ? system2 : system1;
final var secondShardedActor = expectMsgClass(Duration.apply(10, "s"), ActorRef.class);
final var secondShardedActor = expectMsgClass(Duration.ofSeconds(10), ActorRef.class);
assertThat(isShardedActorIn(secondShardedActor, activeSystem)).isTrue();

// THEN: the buffered message is processed by the new sharded actor
Expand All @@ -112,19 +116,10 @@ public void testHandOffMessage() throws Exception {
}

@Test
public void testSelfRestart() throws Exception {
public void testSelfRestart() {
new TestKit(system2) {{
// GIVEN: 2 actor systems form a cluster with shard regions started on both
final var latch = new CountDownLatch(2);
final var cluster1 = Cluster.get(system1);
final var cluster2 = Cluster.get(system2);
cluster1.registerOnMemberUp(latch::countDown);
cluster2.registerOnMemberUp(latch::countDown);
cluster1.join(cluster1.selfAddress());
cluster2.join(cluster1.selfAddress());
latch.await();

final var props = Props.create(MessageForwarder.class, testActor());
final var props = Props.create(MessageForwarder.class, getRef());
final var shardName = "shard";
final var role = "dc-default";
final var extractor = new DummyExtractor();
Expand All @@ -134,7 +129,7 @@ public void testSelfRestart() throws Exception {

// GIVEN: a sharded actor is started
final var signal = DeleteThing.of(ThingId.of("thing:id"), DittoHeaders.empty());
proxy1.tell(signal, testActor());
proxy1.tell(signal, getRef());
final var firstShardedActor = expectMsgClass(ActorRef.class);
expectMsgClass(DeleteThing.class);

Expand All @@ -152,13 +147,13 @@ public void testSelfRestart() throws Exception {
expectMsgClass(StopShardedActor.class);

// WHEN: the sharded actor queues another message to self before stoppin
firstShardedActor.tell(MessageForwarder.MESSAGE_SHARD, testActor());
firstShardedActor.tell(MessageForwarder.MESSAGE_SHARD, getRef());
expectMsg(MessageForwarder.MESSAGE_SHARD);
firstShardedActor.tell(PoisonPill.getInstance(), testActor());
firstShardedActor.tell(PoisonPill.getInstance(), getRef());

// THEN: a new sharded actor for the same entity starts in the remaining shard region
final var activeSystem = startedInSystem1 ? system2 : system1;
final var secondShardedActor = expectMsgClass(Duration.apply(10, "s"), ActorRef.class);
final var secondShardedActor = expectMsgClass(Duration.ofSeconds(10), ActorRef.class);
assertThat(isShardedActorIn(secondShardedActor, activeSystem)).isTrue();
expectMsg(MessageForwarder.RESTART_TRIGGER);
}};
Expand Down

0 comments on commit 5d86bb2

Please sign in to comment.