Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
413 lines (342 sloc) 12.1 KB
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.ddata
import scala.concurrent.duration._
import akka.actor.Actor
import akka.cluster.ddata._
import akka.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.actor.ActorRef
import akka.serialization.SerializationExtension
import jdocs.ddata
object DistributedDataDocSpec {
val config =
"""
akka.actor.provider = "cluster"
akka.remote.classic.netty.tcp.port = 0
#//#serializer-config
akka.actor {
serializers {
two-phase-set = "docs.ddata.protobuf.TwoPhaseSetSerializer"
}
serialization-bindings {
"docs.ddata.TwoPhaseSet" = two-phase-set
}
}
#//#serializer-config
#//#japi-serializer-config
akka.actor {
serializers {
twophaseset = "jdocs.ddata.protobuf.TwoPhaseSetSerializer"
}
serialization-bindings {
"jdocs.ddata.TwoPhaseSet" = twophaseset
}
}
#//#japi-serializer-config
"""
//#data-bot
import java.util.concurrent.ThreadLocalRandom
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
import akka.cluster.ddata.Replicator
import akka.cluster.ddata.Replicator._
object DataBot {
private case object Tick
}
class DataBot extends Actor with ActorLogging {
import DataBot._
val replicator = DistributedData(context.system).replicator
implicit val node = DistributedData(context.system).selfUniqueAddress
import context.dispatcher
val tickTask = context.system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds, self, Tick)
val DataKey = ORSetKey[String]("key")
replicator ! Subscribe(DataKey, self)
def receive = {
case Tick =>
val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
if (ThreadLocalRandom.current().nextBoolean()) {
// add
log.info("Adding: {}", s)
replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ :+ s)
} else {
// remove
log.info("Removing: {}", s)
replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_.remove(s))
}
case _: UpdateResponse[_] => // ignore
case c @ Changed(DataKey) =>
val data = c.get(DataKey)
log.info("Current elements: {}", data.elements)
}
override def postStop(): Unit = tickTask.cancel()
}
//#data-bot
}
class DistributedDataDocSpec extends AkkaSpec(DistributedDataDocSpec.config) {
import Replicator._
"demonstrate update" in {
val probe = TestProbe()
implicit val self = probe.ref
//#update
implicit val node = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")
replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ :+ 1)
val writeTo3 = WriteTo(n = 3, timeout = 1.second)
replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")
val writeMajority = WriteMajority(timeout = 5.seconds)
replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ :+ "hello")
val writeAll = WriteAll(timeout = 5.seconds)
replicator ! Update(ActiveFlagKey, Flag.Disabled, writeAll)(_.switchOn)
//#update
probe.expectMsgType[UpdateResponse[_]] match {
//#update-response1
case UpdateSuccess(Counter1Key, req) => // ok
//#update-response1
case unexpected => fail("Unexpected response: " + unexpected)
}
probe.expectMsgType[UpdateResponse[_]] match {
//#update-response2
case UpdateSuccess(Set1Key, req) => // ok
case UpdateTimeout(Set1Key, req) =>
// write to 3 nodes failed within 1.second
//#update-response2
case UpdateSuccess(Set2Key, None) =>
case unexpected => fail("Unexpected response: " + unexpected)
}
}
"demonstrate update with request context" in {
import Actor.Receive
val probe = TestProbe()
implicit val self = probe.ref
def sender() = self
//#update-request-context
implicit val node = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val writeTwo = WriteTo(n = 2, timeout = 3.second)
val Counter1Key = PNCounterKey("counter1")
def receive: Receive = {
case "increment" =>
// incoming command to increase the counter
val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ :+ 1)
replicator ! upd
case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
replyTo ! "ack"
case UpdateTimeout(Counter1Key, Some(replyTo: ActorRef)) =>
replyTo ! "nack"
}
//#update-request-context
}
"demonstrate get" in {
val probe = TestProbe()
implicit val self = probe.ref
//#get
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set1Key = GSetKey[String]("set1")
val Set2Key = ORSetKey[String]("set2")
val ActiveFlagKey = FlagKey("active")
replicator ! Get(Counter1Key, ReadLocal)
val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
replicator ! Get(Set1Key, readFrom3)
val readMajority = ReadMajority(timeout = 5.seconds)
replicator ! Get(Set2Key, readMajority)
val readAll = ReadAll(timeout = 5.seconds)
replicator ! Get(ActiveFlagKey, readAll)
//#get
probe.expectMsgType[GetResponse[_]] match {
//#get-response1
case g @ GetSuccess(Counter1Key, req) =>
val value = g.get(Counter1Key).value
case NotFound(Counter1Key, req) => // key counter1 does not exist
//#get-response1
case unexpected => fail("Unexpected response: " + unexpected)
}
probe.expectMsgType[GetResponse[_]] match {
//#get-response2
case g @ GetSuccess(Set1Key, req) =>
val elements = g.get(Set1Key).elements
case GetFailure(Set1Key, req) =>
// read from 3 nodes failed within 1.second
case NotFound(Set1Key, req) => // key set1 does not exist
//#get-response2
case g @ GetSuccess(Set2Key, None) =>
val elements = g.get(Set2Key).elements
case unexpected => fail("Unexpected response: " + unexpected)
}
}
"demonstrate get with request context" in {
import Actor.Receive
val probe = TestProbe()
implicit val self = probe.ref
def sender() = self
//#get-request-context
implicit val node = DistributedData(system).selfUniqueAddress
val replicator = DistributedData(system).replicator
val readTwo = ReadFrom(n = 2, timeout = 3.second)
val Counter1Key = PNCounterKey("counter1")
def receive: Receive = {
case "get-count" =>
// incoming request to retrieve current value of the counter
replicator ! Get(Counter1Key, readTwo, request = Some(sender()))
case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
val value = g.get(Counter1Key).value.longValue
replyTo ! value
case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
replyTo ! -1L
case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
replyTo ! 0L
}
//#get-request-context
}
"demonstrate subscribe" in {
import Actor.Receive
val probe = TestProbe()
implicit val self = probe.ref
def sender() = self
//#subscribe
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
// subscribe to changes of the Counter1Key value
replicator ! Subscribe(Counter1Key, self)
var currentValue = BigInt(0)
def receive: Receive = {
case c @ Changed(Counter1Key) =>
currentValue = c.get(Counter1Key).value
case "get-count" =>
// incoming request to retrieve current value of the counter
sender() ! currentValue
}
//#subscribe
}
"demonstrate delete" in {
val probe = TestProbe()
implicit val self = probe.ref
//#delete
val replicator = DistributedData(system).replicator
val Counter1Key = PNCounterKey("counter1")
val Set2Key = ORSetKey[String]("set2")
replicator ! Delete(Counter1Key, WriteLocal)
val writeMajority = WriteMajority(timeout = 5.seconds)
replicator ! Delete(Set2Key, writeMajority)
//#delete
}
"demonstrate PNCounter" in {
def println(o: Any): Unit = ()
//#pncounter
implicit val node = DistributedData(system).selfUniqueAddress
val c0 = PNCounter.empty
val c1 = c0 :+ 1
val c2 = c1 :+ 7
val c3: PNCounter = c2.decrement(2)
println(c3.value) // 6
//#pncounter
}
"demonstrate PNCounterMap" in {
def println(o: Any): Unit = ()
//#pncountermap
implicit val node = DistributedData(system).selfUniqueAddress
val m0 = PNCounterMap.empty[String]
val m1 = m0.increment(node, "a", 7)
val m2 = m1.decrement(node, "a", 2)
val m3 = m2.increment(node, "b", 1)
println(m3.get("a")) // 5
m3.entries.foreach { case (key, value) => println(s"$key -> $value") }
//#pncountermap
}
"demonstrate GSet" in {
def println(o: Any): Unit = ()
//#gset
val s0 = GSet.empty[String]
val s1 = s0 + "a"
val s2 = s1 + "b" + "c"
if (s2.contains("a"))
println(s2.elements) // a, b, c
//#gset
}
"demonstrate ORSet" in {
def println(o: Any): Unit = ()
//#orset
implicit val node = DistributedData(system).selfUniqueAddress
val s0 = ORSet.empty[String]
val s1 = s0 :+ "a"
val s2 = s1 :+ "b"
val s3 = s2.remove("a")
println(s3.elements) // b
//#orset
}
"demonstrate ORMultiMap" in {
def println(o: Any): Unit = ()
//#ormultimap
implicit val node = DistributedData(system).selfUniqueAddress
val m0 = ORMultiMap.empty[String, Int]
val m1 = m0 :+ ("a" -> Set(1, 2, 3))
val m2 = m1.addBinding(node, "a", 4)
val m3 = m2.removeBinding(node, "a", 2)
val m4 = m3.addBinding(node, "b", 1)
println(m4.entries)
//#ormultimap
}
"demonstrate Flag" in {
def println(o: Any): Unit = ()
//#flag
val f0 = Flag.Disabled
val f1 = f0.switchOn
println(f1.enabled)
//#flag
}
"demonstrate LWWRegister" in {
def println(o: Any): Unit = ()
//#lwwregister
implicit val node = DistributedData(system).selfUniqueAddress
val r1 = LWWRegister.create("Hello")
val r2 = r1.withValueOf("Hi")
println(s"${r1.value} by ${r1.updatedBy} at ${r1.timestamp}")
//#lwwregister
r2.value should be("Hi")
}
"demonstrate LWWRegister with custom clock" in {
def println(o: Any): Unit = ()
//#lwwregister-custom-clock
case class Record(version: Int, name: String, address: String)
implicit val node = DistributedData(system).selfUniqueAddress
implicit val recordClock = new LWWRegister.Clock[Record] {
override def apply(currentTimestamp: Long, value: Record): Long =
value.version
}
val record1 = Record(version = 1, "Alice", "Union Square")
val r1 = LWWRegister(node, record1, recordClock)
val record2 = Record(version = 2, "Alice", "Madison Square")
val r2 = LWWRegister(node, record2, recordClock)
val r3 = r1.merge(r2)
println(r3.value)
//#lwwregister-custom-clock
r3.value.address should be("Madison Square")
}
"test TwoPhaseSetSerializer" in {
val s1 = TwoPhaseSet().add("a").add("b").add("c").remove("b")
s1.elements should be(Set("a", "c"))
val serializer = SerializationExtension(system).findSerializerFor(s1)
val blob = serializer.toBinary(s1)
val s2 = serializer.fromBinary(blob, None)
s1 should be(s1)
}
"test japi.TwoPhaseSetSerializer" in {
import akka.util.ccompat.JavaConverters._
val s1 = ddata.TwoPhaseSet.create().add("a").add("b").add("c").remove("b")
s1.getElements.asScala should be(Set("a", "c"))
val serializer = SerializationExtension(system).findSerializerFor(s1)
val blob = serializer.toBinary(s1)
val s2 = serializer.fromBinary(blob, None)
s1 should be(s1)
}
}
You can’t perform that action at this time.