/
DurableStateStoreBySliceQuery.scala
73 lines (64 loc) · 2.78 KB
/
DurableStateStoreBySliceQuery.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*
* Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import scala.collection.immutable
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.state.scaladsl.DurableStateStore
import akka.stream.scaladsl.Source
/**
* Query API for reading durable state objects.
*
* For Java API see [[DurableStateStoreBySliceQuery]].
*
* API May Change
*/
@ApiMayChange
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* Note that this only returns the most recent change to each object, if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* The returned source will never terminate, it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted, this call only guarantees that eventually, the most recent
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}