-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathSnapshotStore.scala
180 lines (157 loc) · 6.01 KB
/
SnapshotStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.snapshot
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.CircuitBreaker
import akka.pattern.pipe
import akka.persistence._
/**
 * Abstract snapshot store.
 *
 * Handles the `SnapshotProtocol` load/save/delete commands by delegating to the
 * asynchronous plugin API methods declared at the bottom of this trait
 * (`loadAsync`, `saveAsync` and the two `deleteAsync` overloads). Every plugin
 * call made from here is wrapped in a circuit breaker.
 *
 * Save and delete results are first piped back to this actor, so that
 * `receivePluginInternal` gets a chance to observe them, and are then passed on
 * to the actor that sent the original command.
 */
trait SnapshotStore extends Actor with ActorLogging {
  import SnapshotProtocol._

  private val extension = Persistence(context.system)

  // When true, delete commands are published to the event stream once completed.
  private val publish = extension.settings.internal.publishPluginCommands

  // Circuit breaker guarding the plugin calls. Its settings are read from this
  // plugin's configuration section; both timeouts are read in milliseconds.
  private val breaker = {
    val cfg = extension.configFor(self)
    val maxFailures = cfg.getInt("circuit-breaker.max-failures")
    val callTimeout = cfg.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis
    val resetTimeout = cfg.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis
    CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)
  }

  /**
   * Actor behavior: the built-in snapshot protocol handling, falling back to
   * `receivePluginInternal` for any message the protocol handler does not match.
   */
  final def receive: Actor.Receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)

  /**
   * Handles the snapshot protocol commands and the result messages that this
   * actor pipes to itself.
   *
   * Note that `senderPersistentActor()` is always evaluated synchronously while
   * the message is being processed (as an argument to `pipeTo`/`to`/`!`), never
   * inside a `Future` callback.
   */
  final val receiveSnapshotStore: Actor.Receive = {
    val eventStream = context.system.eventStream // used from Future callbacks
    implicit val ec: ExecutionContext = context.dispatcher

    {
      case LoadSnapshot(persistenceId, criteria, toSequenceNr) =>
        if (criteria == SnapshotSelectionCriteria.None) {
          // Nothing can match: answer right away without calling the plugin.
          senderPersistentActor() ! LoadSnapshotResult(snapshot = None, toSequenceNr)
        } else {
          breaker
            .withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr)))
            .map { sso =>
              LoadSnapshotResult(sso, toSequenceNr)
            }
            .recover {
              case e => LoadSnapshotFailed(e)
            }
            .pipeTo(senderPersistentActor())
        }

      case SaveSnapshot(metadata, snapshot) =>
        // The snapshot is stored with the current time as its timestamp.
        val md = metadata.copy(timestamp = System.currentTimeMillis)
        breaker
          .withCircuitBreaker(saveAsync(md, snapshot))
          .map { _ =>
            SaveSnapshotSuccess(md)
          }
          .recover {
            // The failure carries the metadata as received, not the re-stamped copy.
            case e => SaveSnapshotFailure(metadata, e)
          }
          // The result goes to self first; the handlers below then reply to the
          // sender recorded here.
          .to(self, senderPersistentActor())

      case evt: SaveSnapshotSuccess =>
        // The reply is sent even if the plugin's internal handler throws.
        try tryReceivePluginInternal(evt)
        finally senderPersistentActor() ! evt // sender is persistentActor

      case evt @ SaveSnapshotFailure(metadata, _) =>
        try {
          tryReceivePluginInternal(evt)
          // Ask the plugin to delete the snapshot whose save failed. The resulting
          // Future is discarded, so the outcome of this delete is not reported.
          breaker.withCircuitBreaker(deleteAsync(metadata))
        } finally senderPersistentActor() ! evt // sender is persistentActor

      case d @ DeleteSnapshot(metadata) =>
        breaker
          .withCircuitBreaker(deleteAsync(metadata))
          .map(_ => DeleteSnapshotSuccess(metadata))
          .recover {
            case e => DeleteSnapshotFailure(metadata, e)
          }
          .pipeTo(self)(senderPersistentActor())
          .onComplete {
            // Published for both successful and failed deletes.
            case _ => if (publish) eventStream.publish(d)
          }

      case evt: DeleteSnapshotSuccess =>
        try tryReceivePluginInternal(evt)
        finally senderPersistentActor() ! evt

      case evt: DeleteSnapshotFailure =>
        try tryReceivePluginInternal(evt)
        finally senderPersistentActor() ! evt

      case d @ DeleteSnapshots(persistenceId, criteria) =>
        breaker
          .withCircuitBreaker(deleteAsync(persistenceId, criteria))
          .map(_ => DeleteSnapshotsSuccess(criteria))
          .recover {
            case e => DeleteSnapshotsFailure(criteria, e)
          }
          .pipeTo(self)(senderPersistentActor())
          .onComplete {
            // Published for both successful and failed deletes.
            case _ => if (publish) eventStream.publish(d)
          }

      case evt: DeleteSnapshotsFailure =>
        try tryReceivePluginInternal(evt)
        finally senderPersistentActor() ! evt // sender is persistentActor

      case evt: DeleteSnapshotsSuccess =>
        try tryReceivePluginInternal(evt)
        finally senderPersistentActor() ! evt
    }
  }

  /** Documents intent that the sender() is expected to be the PersistentActor */
  @inline private final def senderPersistentActor(): ActorRef = sender()

  /**
   * Offers `evt` to `receivePluginInternal`, doing nothing when it is not handled.
   *
   * `receivePluginInternal` is obtained once and the message is matched once via
   * `applyOrElse`, instead of a separate `isDefinedAt` check followed by `apply`.
   */
  private def tryReceivePluginInternal(evt: Any): Unit =
    receivePluginInternal.applyOrElse(evt, (_: Any) => ())

  //#snapshot-store-plugin-api

  /**
   * Plugin API: asynchronously loads a snapshot.
   *
   * If the future `Option` is `None` then all events will be replayed,
   * i.e. there was no snapshot. If snapshot could not be loaded the `Future`
   * should be completed with failure. That is important because events may
   * have been deleted and just replaying the events might not result in a valid
   * state.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria selection criteria for loading.
   */
  def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

  /**
   * Plugin API: asynchronously saves a snapshot.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param metadata snapshot metadata.
   * @param snapshot snapshot.
   */
  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

  /**
   * Plugin API: deletes the snapshot identified by `metadata`.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param metadata snapshot metadata.
   */
  def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

  /**
   * Plugin API: deletes all snapshots matching `criteria`.
   *
   * This call is protected with a circuit-breaker.
   *
   * @param persistenceId id of the persistent actor.
   * @param criteria selection criteria for deleting.
   */
  def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]

  /**
   * Plugin API
   * Allows plugin implementers to use `f pipeTo self` and
   * handle additional messages for implementing advanced features
   */
  def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
  //#snapshot-store-plugin-api
}