This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
/
ShoppingCart.scala
111 lines (88 loc) · 3.42 KB
/
ShoppingCart.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package sample.distributeddata
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.LWWMapKey
object ShoppingCart {
import akka.cluster.ddata.Replicator._
def props(userId: String): Props = Props(new ShoppingCart(userId))
case object GetCart
final case class AddItem(item: LineItem)
final case class RemoveItem(productId: String)
final case class Cart(items: Set[LineItem])
final case class LineItem(productId: String, title: String, quantity: Int)
//#read-write-majority
private val timeout = 3.seconds
private val readMajority = ReadMajority(timeout)
private val writeMajority = WriteMajority(timeout)
//#read-write-majority
}
class ShoppingCart(userId: String) extends Actor {
import ShoppingCart._
import akka.cluster.ddata.Replicator._
val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)
val DataKey = LWWMapKey[String, LineItem]("cart-" + userId)
def receive = receiveGetCart
.orElse[Any, Unit](receiveAddItem)
.orElse[Any, Unit](receiveRemoveItem)
.orElse[Any, Unit](receiveOther)
//#get-cart
def receiveGetCart: Receive = {
case GetCart =>
replicator ! Get(DataKey, readMajority, Some(sender()))
case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
val data = g.get(DataKey)
val cart = Cart(data.entries.values.toSet)
replyTo ! cart
case NotFound(DataKey, Some(replyTo: ActorRef)) =>
replyTo ! Cart(Set.empty)
case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
// ReadMajority failure, try again with local read
replicator ! Get(DataKey, ReadLocal, Some(replyTo))
}
//#get-cart
//#add-item
def receiveAddItem: Receive = {
case cmd @ AddItem(item) =>
val update = Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, Some(cmd)) {
cart => updateCart(cart, item)
}
replicator ! update
}
//#add-item
def updateCart(data: LWWMap[String, LineItem], item: LineItem): LWWMap[String, LineItem] =
data.get(item.productId) match {
case Some(LineItem(_, _, existingQuantity)) =>
data + (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
case None => data + (item.productId -> item)
}
//#remove-item
def receiveRemoveItem: Receive = {
case cmd @ RemoveItem(productId) =>
// Try to fetch latest from a majority of nodes first, since ORMap
// remove must have seen the item to be able to remove it.
replicator ! Get(DataKey, readMajority, Some(cmd))
case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
_ - productId
}
case GetFailure(DataKey, Some(RemoveItem(productId))) =>
// ReadMajority failed, fall back to best effort local value
replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
_ - productId
}
case NotFound(DataKey, Some(RemoveItem(productId))) =>
// nothing to remove
}
//#remove-item
def receiveOther: Receive = {
case _: UpdateSuccess[_] | _: UpdateTimeout[_] =>
// UpdateTimeout, will eventually be replicated
case e: UpdateFailure[_] => throw new IllegalStateException("Unexpected failure: " + e)
}
}