-
Notifications
You must be signed in to change notification settings - Fork 3
/
EventJournal.scala
115 lines (90 loc) · 3.29 KB
/
EventJournal.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package zio.es
import java.util.concurrent.ConcurrentHashMap
import zio._
import zio.stream._
/**
 * Handle over the current state of a single event-sourced aggregate.
 *
 * The state lives in a `Ref` and is advanced by feeding events through the
 * `aggregations` function. Instances are created by the journal (the
 * constructor is `private[es]`).
 *
 * NOTE(review): `appendNoPersist` reads the state and writes it back in two
 * separate steps, so concurrent appends to the same aggregate are not
 * serialised here — confirm whether callers guarantee single-writer access.
 *
 * @param key          identifier passed to `persist` for every appended event
 * @param aggState     reference holding the current state
 * @param aggregations effectful function producing the next state from the current state and an event
 * @param persist      effect that stores an event under a key
 * @tparam E event type accepted by this aggregate
 * @tparam S state type
 */
class Aggregate[-E, +S] private[es] (
  key: String,
  aggState: Ref[S],
  aggregations: (S, E) => Task[S],
  persist: (String, E) => Task[Unit]
) {

  /** Effect that reads the current state of the aggregate. */
  val state: UIO[S] = aggState.get

  /**
   * Persist and apply every event in `evt`, in iteration order.
   *
   * An empty collection is a no-op that yields this aggregate unchanged
   * (previously this failed with `NoSuchElementException` from `.head`).
   *
   * @param evt events to append
   * @return this aggregate, after all events were appended
   */
  def appendAll(evt: Iterable[E]): Task[Aggregate[E, S]] =
    // `append` always yields `this`, so there is no need to inspect the results.
    ZIO.foreach(evt)(append).map(_ => this)

  /**
   * Persist a single event and then apply it to the in-memory state.
   *
   * The event is persisted first; the state is only updated if persisting succeeded.
   *
   * @param evt event to append
   * @return this aggregate
   */
  def append(evt: E): Task[Aggregate[E, S]] =
    for {
      _ <- persist(key, evt)
      _ <- appendNoPersist(evt)
    } yield this

  /**
   * Apply an event to the in-memory state without persisting it.
   *
   * @param evt event to apply
   * @return this aggregate
   */
  private[es] def appendNoPersist(evt: E): Task[Aggregate[E, S]] =
    for {
      curState <- state
      modified <- aggregations(curState, evt)
      _        <- aggState.set(modified)
    } yield this
}
/**
 * Type class describing how to convert events of type `E` to and from raw bytes.
 *
 * Both methods are plain (non-effectful) functions; the `SerializableEventOps`
 * syntax in the companion object offers `Task`-wrapped variants.
 *
 * @tparam E event type being (de)serialized
 */
trait SerializableEvent[E] extends Serializable {
/** Encode `evt` into a byte array. */
def toBytes(evt: E): Array[Byte]
/** Decode an event from `bytes` (presumably the inverse of `toBytes` — confirm with implementations). */
def fromBytes(bytes: Array[Byte]): E
}
object SerializableEvent {

  /**
   * Effectful syntax for a [[SerializableEvent]] instance: each call to the
   * underlying codec is suspended in a `Task`, so an exception thrown while
   * encoding or decoding surfaces as a failed effect.
   */
  implicit class SerializableEventOps[E](se: SerializableEvent[E]) {

    /** Encode `evt` inside a `Task`. */
    def toBytesZ(evt: E): Task[Array[Byte]] =
      Task.effect(se.toBytes(evt))

    /** Decode `bytes` inside a `Task`. */
    def fromBytesZ(bytes: Array[Byte]): Task[E] =
      Task.effect(se.fromBytes(bytes))
  }
}
/**
 * Storage abstraction for event-sourced aggregates whose events have type `E`.
 *
 * Implementations supply the three primitives (`persistEvent`, `loadEvents`,
 * `allIds`); aggregate creation and replay are derived from them.
 */
trait EventJournal[E] { self =>

  /**
   * Store `event` under `key`. Aggregates that are already loaded are not
   * touched by this call.
   */
  def persistEvent(key: String, event: E): Task[Unit]

  /**
   * Stream of the events stored under `key`.
   */
  def loadEvents(key: String): Stream[Throwable, E]

  /**
   * Build a fresh aggregate for `key`, starting from the behaviour's initial
   * state. Nothing is read from the journal.
   */
  def create[S](key: String, behaviour: AggregateBehaviour[E, S]): Task[Aggregate[E, S]] =
    Ref
      .make(behaviour.initialState)
      .map(stateRef => new Aggregate[E, S](key, stateRef, behaviour.aggregations, persistEvent))

  /**
   * Build an aggregate for `key` and replay the journal's events for that key
   * into it (without persisting them again).
   */
  def load[S](key: String, behaviour: AggregateBehaviour[E, S]): Task[Aggregate[E, S]] =
    create[S](key, behaviour).flatMap { fresh =>
      loadEvents(key).foldM(fresh)((agg, evt) => agg.appendNoPersist(evt))
    }

  /**
   * Stream of the ids of all stored entities.
   */
  def allIds: Stream[Throwable, String]

  /**
   * Stream of all stored entities, each one loaded with `behaviour`.
   */
  def allEntries[S](behaviour: AggregateBehaviour[E, S]): Stream[Throwable, Aggregate[E, S]] =
    allIds.mapM(id => load(id, behaviour))
}
object EventJournal {

  /**
   * Journal that keeps all events in process memory.
   *
   * Events are held per key in a `ConcurrentHashMap`; the set of keys that
   * have had at least one event persisted is tracked in `knownIds`.
   *
   * @param knownIds reference holding the ids reported by `allIds`
   */
  private class InMemory[E](knownIds: Ref[Set[String]]) extends EventJournal[E] {
    private[this] val store: ConcurrentHashMap[String, Vector[E]] = new ConcurrentHashMap()

    /** Events currently stored for `key` (an empty vector is registered if the key is new). */
    private def getEventsFor(key: String): Task[Vector[E]] =
      Task(store.computeIfAbsent(key, _ => Vector.empty[E]))

    /**
     * Append `event` to the vector stored under `key` in one atomic map
     * operation, so concurrent appends to the same key cannot overwrite
     * each other.
     */
    private def appendEventFor(key: String, event: E): Task[Unit] =
      Task(
        store.compute(
          key,
          // `existing` is null when the key has no mapping yet.
          (_: String, existing: Vector[E]) => Option(existing).fold(Vector(event))(_ :+ event)
        )
      ).unit

    /**
     * Store `event` under `key` and record `key` as a known id, so that it
     * shows up in `allIds` / `allEntries`.
     */
    def persistEvent(key: String, event: E): Task[Unit] =
      for {
        _ <- appendEventFor(key, event)
        _ <- knownIds.update(_ + key)
      } yield ()

    /** Stream over a snapshot of the events stored under `key`. */
    def loadEvents(key: String): Stream[Throwable, E] =
      Stream.fromIteratorEffect(getEventsFor(key).map(_.iterator))

    /** Stream over a snapshot of the ids that have had events persisted. */
    def allIds: Stream[Throwable, String] =
      Stream.unwrap(knownIds.get.map(res => Stream.fromIterable(res)))
  }

  /**
   * Create an empty in-memory journal.
   */
  def inMemory[E]: Task[EventJournal[E]] =
    Ref
      .make(Set.empty[String])
      .flatMap(stateRef => Task(new InMemory[E](stateRef)))

  /**
   * Build an aggregate behaviour from an initial state and an effectful
   * state-transition function.
   *
   * @param initial      state of a freshly created aggregate
   * @param aggregations function producing the next state from the current state and an event
   */
  def aggregate[E, S](initial: S)(aggregations: (S, E) => Task[S]): Task[AggregateBehaviour[E, S]] =
    ZIO.succeed(new AggregateBehaviour(initial, aggregations))
}