Skip to content

Commit

Permalink
Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 19, 2017
1 parent c649bad commit 2fdaaae
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import akka.testkit.javadsl.TestKit;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.cluster.ddata.javadsl.Replicator.Command;
import akka.typed.javadsl.Actor;
import akka.typed.javadsl.Adapter;
import akka.typed.javadsl.Actor.MutableBehavior;
import akka.typed.javadsl.ActorContext;

public class ReplicatorTest extends JUnitSuite {

Expand All @@ -40,6 +43,14 @@ static final class GetValue implements ClientCommand {
}
}

static final class GetCachedValue implements ClientCommand {
final ActorRef<Integer> replyTo;

GetCachedValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}

static interface InternalMsg extends ClientCommand {
}

Expand All @@ -59,44 +70,80 @@ public InternalGetResponse(Replicator.GetResponse<A> rsp) {
}
}

static final class InternalChanged<A extends ReplicatedData> implements InternalMsg {
final Replicator.Changed<A> chg;

public InternalChanged(Replicator.Changed<A> chg) {
this.chg = chg;
}
}

static final Key<GCounter> Key = GCounterKey.create("counter");

static Behavior<ClientCommand> client(ActorRef<Replicator.Command<?>> replicator, Cluster node) {
return Actor.deferred(c -> {
static class Client extends MutableBehavior<ClientCommand> {
private final ActorRef<Replicator.Command<?>> replicator;
private final Cluster node;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter;
final ActorRef<Replicator.Changed<GCounter>> changedAdapter;

private int cachedValue = 0;

public Client(ActorRef<Command<?>> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
this.replicator = replicator;
this.node = node;

final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter =
c.spawnAdapter(m -> new InternalUpdateResponse<>(m));
updateResponseAdapter = ctx.spawnAdapter(m -> new InternalUpdateResponse<>(m));

final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter =
c.spawnAdapter(m -> new InternalGetResponse<>(m));
getResponseAdapter = ctx.spawnAdapter(m -> new InternalGetResponse<>(m));

return Actor.immutable(ClientCommand.class)
.onMessage(Increment.class, (ctx, cmd) -> {
changedAdapter = ctx.spawnAdapter(m -> new InternalChanged<>(m));

replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}

public static Behavior<ClientCommand> create(ActorRef<Command<?>> replicator, Cluster node) {
return Actor.mutable(ctx -> new Client(replicator, node, ctx));
}

@Override
public Actor.Receive<ClientCommand> createReceive() {
return receiveBuilder()
.onMessage(Increment.class, cmd -> {
replicator.tell(
new Replicator.Update<GCounter>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter,
curr -> curr.increment(node, 1)));
return Actor.same();
return this;
})
.onMessage(InternalUpdateResponse.class, (ctx, msg) -> {
return Actor.same();
.onMessage(InternalUpdateResponse.class, msg -> {
return this;
})
.onMessage(GetValue.class, (ctx, cmd) -> {
.onMessage(GetValue.class, cmd -> {
replicator.tell(
new Replicator.Get<GCounter>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return Actor.same();
return this;
})
.onMessage(GetCachedValue.class, cmd -> {
cmd.replyTo.tell(cachedValue);
return this;
})
.onMessage(InternalGetResponse.class, (ctx, msg) -> {
.onMessage(InternalGetResponse.class, msg -> {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
replyTo.tell(value);
} else {
// not dealing with failures
}
return Actor.same();
return this;
})
.onMessage(InternalChanged.class, msg -> {
GCounter counter = (GCounter) msg.chg.get(Key);
cachedValue = counter.getValue().intValue();
return this;
})
.build();
});
}
}


Expand All @@ -118,17 +165,42 @@ akka.typed.ActorSystem<?> typedSystem() {


@Test
public void apiPrototype() {
public void shouldHaveApiForUpdateAndGet() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
Adapter.spawn(system, Replicator.behavior(settings), "replicator");
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, client(replicator, Cluster.get(system)));
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));

client.tell(new Increment());
client.tell(new GetValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(1);
}

@Test
public void shouldHaveApiForSubscribe() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.apply(typedSystem());
ActorRef<Replicator.Command<?>> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));

client.tell(new Increment());
client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(2);
return null;
});

client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(3);
return null;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually

object ReplicatorSpec {

Expand All @@ -32,9 +33,11 @@ object ReplicatorSpec {
sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg
private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg
private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg

val Key = GCounterKey("counter")

Expand All @@ -46,33 +49,51 @@ object ReplicatorSpec {
val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
ctx.spawnAdapter(InternalGetResponse.apply)

Actor.immutable[ClientCommand] { (ctx, msg)
msg match {
case Increment
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter)
Actor.same
val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
ctx.spawnAdapter(InternalChanged.apply)

case GetValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same
replicator ! Replicator.Subscribe(Key, changedAdapter)

case internal: InternalMsg internal match {
case InternalUpdateResponse(_) Actor.same // ok
def behavior(cachedValue: Int): Behavior[ClientCommand] = {
Actor.immutable[ClientCommand] { (ctx, msg)
msg match {
case Increment
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)(updateResponseAdapter)
Actor.same

case GetValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same

case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked)))
val value = rsp.get(Key).value.toInt
replyTo ! value
case GetCachedValue(replyTo)
replicator ! Replicator.Get(Key, Replicator.ReadLocal, Some(replyTo))(getResponseAdapter)
Actor.same

case InternalGetResponse(rsp)
Actor.unhandled // not dealing with failures
case internal: InternalMsg internal match {
case InternalUpdateResponse(_) Actor.same // ok

case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked)))
val value = rsp.get(Key).value.toInt
replyTo ! value
Actor.same

case InternalGetResponse(rsp)
Actor.unhandled // not dealing with failures

case InternalChanged(chg @ Replicator.Changed(Key))
val value = chg.get(Key).value.intValue
behavior(value)
}
}
}
}

behavior(cachedValue = 0)
}

}

class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
import ReplicatorSpec._

trait RealTests extends StartSupport {
Expand All @@ -81,10 +102,8 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
val settings = ReplicatorSettings(system)
implicit val cluster = Cluster(system.toUntyped)

def `API prototype`(): Unit = {

def `have API for Update and Get`(): Unit = {
val replicator = start(Replicator.behavior(settings))

val c = start(client(replicator))

val probe = TestProbe[Int]
Expand All @@ -93,6 +112,24 @@ class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) {
probe.expectMsg(1)
}

def `have API for Subscribe`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))

val probe = TestProbe[Int]
c ! Increment
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(2)
}
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(3)
}
}

}

object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem
Expand Down

0 comments on commit 2fdaaae

Please sign in to comment.