Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Data API for Akka Typed #23647

Merged
merged 7 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.typed.cluster.ddata.javadsl;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Optional;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.GCounter;
import akka.cluster.ddata.GCounterKey;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ReplicatedData;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.javadsl.TestKit;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.cluster.ddata.javadsl.Replicator.Command;
import akka.typed.javadsl.Actor;
import akka.typed.javadsl.Adapter;
import akka.typed.javadsl.Actor.MutableBehavior;
import akka.typed.javadsl.ActorContext;

public class ReplicatorTest extends JUnitSuite {

static interface ClientCommand {
}

static final class Increment implements ClientCommand {
}

static final class GetValue implements ClientCommand {
final ActorRef<Integer> replyTo;

GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}

static final class GetCachedValue implements ClientCommand {
final ActorRef<Integer> replyTo;

GetCachedValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}

static interface InternalMsg extends ClientCommand {
}

static final class InternalUpdateResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.UpdateResponse<A> rsp;

public InternalUpdateResponse(Replicator.UpdateResponse<A> rsp) {
this.rsp = rsp;
}
}

static final class InternalGetResponse<A extends ReplicatedData> implements InternalMsg {
final Replicator.GetResponse<A> rsp;

public InternalGetResponse(Replicator.GetResponse<A> rsp) {
this.rsp = rsp;
}
}

static final class InternalChanged<A extends ReplicatedData> implements InternalMsg {
final Replicator.Changed<A> chg;

public InternalChanged(Replicator.Changed<A> chg) {
this.chg = chg;
}
}

static final Key<GCounter> Key = GCounterKey.create("counter");

static class Client extends MutableBehavior<ClientCommand> {
private final ActorRef<Replicator.Command> replicator;
private final Cluster node;
final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter;
final ActorRef<Replicator.Changed<GCounter>> changedAdapter;

private int cachedValue = 0;

public Client(ActorRef<Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
this.replicator = replicator;
this.node = node;

updateResponseAdapter = ctx.spawnAdapter(m -> new InternalUpdateResponse<>(m));

getResponseAdapter = ctx.spawnAdapter(m -> new InternalGetResponse<>(m));

changedAdapter = ctx.spawnAdapter(m -> new InternalChanged<>(m));

replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
}

public static Behavior<ClientCommand> create(ActorRef<Command> replicator, Cluster node) {
return Actor.mutable(ctx -> new Client(replicator, node, ctx));
}

@Override
public Actor.Receive<ClientCommand> createReceive() {
return receiveBuilder()
.onMessage(Increment.class, cmd -> {
replicator.tell(
new Replicator.Update<GCounter>(Key, GCounter.empty(), Replicator.writeLocal(), updateResponseAdapter,
curr -> curr.increment(node, 1)));
return this;
})
.onMessage(InternalUpdateResponse.class, msg -> {
return this;
})
.onMessage(GetValue.class, cmd -> {
replicator.tell(
new Replicator.Get<GCounter>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
return this;
})
.onMessage(GetCachedValue.class, cmd -> {
cmd.replyTo.tell(cachedValue);
return this;
})
.onMessage(InternalGetResponse.class, msg -> {
if (msg.rsp instanceof Replicator.GetSuccess) {
int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
replyTo.tell(value);
} else {
// not dealing with failures
}
return this;
})
.onMessage(InternalChanged.class, msg -> {
GCounter counter = (GCounter) msg.chg.get(Key);
cachedValue = counter.getValue().intValue();
return this;
})
.build();
}
}


static Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.netty.tcp.port = 0 \n" +
"akka.remote.artery.canonical.port = 0 \n");

@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("ReplicatorTest",
config);

private final ActorSystem system = actorSystemResource.getSystem();

akka.typed.ActorSystem<?> typedSystem() {
return Adapter.toTyped(system);
}



@Test
public void shouldHaveApiForUpdateAndGet() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem());
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));

client.tell(new Increment());
client.tell(new GetValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(1);
}

@Test
public void shouldHaveApiForSubscribe() {
TestKit probe = new TestKit(system);
akka.cluster.ddata.ReplicatorSettings settings = ReplicatorSettings.create(typedSystem());
ActorRef<Replicator.Command> replicator =
Adapter.spawnAnonymous(system, Replicator.behavior(settings));
ActorRef<ClientCommand> client =
Adapter.spawnAnonymous(system, Client.create(replicator, Cluster.get(system)));

client.tell(new Increment());
client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(2);
return null;
});

client.tell(new Increment());
probe.awaitAssert(() -> {
client.tell(new GetCachedValue(Adapter.toTyped(probe.getRef())));
probe.expectMsg(3);
return null;
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.ddata.scaladsl

import scala.concurrent.duration._

import akka.cluster.Cluster

import akka.cluster.ddata.GCounter
import akka.cluster.ddata.GCounterKey
import akka.cluster.ddata.ReplicatedData
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.Behavior
import akka.typed.TypedSpec
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import akka.util.Timeout
import akka.typed.cluster.ddata.scaladsl.Replicator._
import akka.actor.Scheduler
import scala.concurrent.Future

object ReplicatorSpec {

val config = ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""")

sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse[A <: ReplicatedData](rsp: Replicator.UpdateResponse[A]) extends InternalMsg
private case class InternalGetResponse[A <: ReplicatedData](rsp: Replicator.GetResponse[A]) extends InternalMsg
private case class InternalChanged[A <: ReplicatedData](chg: Replicator.Changed[A]) extends InternalMsg

val Key = GCounterKey("counter")

def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
Actor.deferred[ClientCommand] { ctx ⇒
val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
ctx.spawnAdapter(InternalUpdateResponse.apply)

val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
ctx.spawnAdapter(InternalGetResponse.apply)

val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
ctx.spawnAdapter(InternalChanged.apply)

replicator ! Replicator.Subscribe(Key, changedAdapter)

def behavior(cachedValue: Int): Behavior[ClientCommand] = {
Actor.immutable[ClientCommand] { (ctx, msg) ⇒
msg match {
case Increment ⇒
replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ + 1)
Actor.same

case GetValue(replyTo) ⇒
replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
Actor.same

case GetCachedValue(replyTo) ⇒
replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
Actor.same

case internal: InternalMsg ⇒ internal match {
case InternalUpdateResponse(_) ⇒ Actor.same // ok

case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) ⇒
val value = rsp.get(Key).value.toInt
replyTo ! value
Actor.same

case InternalGetResponse(rsp) ⇒
Actor.unhandled // not dealing with failures

case InternalChanged(chg @ Replicator.Changed(Key)) ⇒
val value = chg.get(Key).value.intValue
behavior(value)
}
}
}
}

behavior(cachedValue = 0)
}

object CompileOnlyTest {
def shouldHaveConvenienceForAsk(): Unit = {
val replicator: ActorRef[Replicator.Command] = ???
implicit val timeout = Timeout(3.seconds)
implicit val scheduler: Scheduler = ???
implicit val cluster: Cluster = ???

val reply1: Future[GetResponse[GCounter]] = replicator ? Replicator.Get(Key, Replicator.ReadLocal)

val reply2: Future[UpdateResponse[GCounter]] =
replicator ? Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal)(_ + 1)

val reply3: Future[DeleteResponse[GCounter]] = replicator ? Replicator.Delete(Key, Replicator.WriteLocal)

val reply4: Future[ReplicaCount] = replicator ? Replicator.GetReplicaCount()

// supress unused compiler warnings
println("" + reply1 + reply2 + reply3 + reply4)
}
}

}

class ReplicatorSpec extends TypedSpec(ReplicatorSpec.config) with Eventually {
import ReplicatorSpec._

trait RealTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
val settings = ReplicatorSettings(system)
implicit val cluster = Cluster(system.toUntyped)

def `have API for Update and Get`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))

val probe = TestProbe[Int]
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(1)
}

def `have API for Subscribe`(): Unit = {
val replicator = start(Replicator.behavior(settings))
val c = start(client(replicator))

val probe = TestProbe[Int]
c ! Increment
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(2)
}
c ! Increment
eventually {
c ! GetCachedValue(probe.ref)
probe.expectMsg(3)
}
}

def `have an extension`(): Unit = {
val replicator = DistributedData(system).replicator
val c = start(client(replicator))

val probe = TestProbe[Int]
c ! Increment
c ! GetValue(probe.ref)
probe.expectMsg(1)
}

}

object `A ReplicatorBehavior (real, adapted)` extends RealTests with AdaptedSystem
}