/
PersistentActor.scala
413 lines (367 loc) · 17.2 KB
/
PersistentActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.persistence
import java.lang.{ Iterable ⇒ JIterable }
import akka.actor._
import akka.japi.Procedure
import akka.japi.Util
import com.typesafe.config.Config
import scala.util.control.NoStackTrace
abstract class RecoveryCompleted
/**
* Sent to a [[PersistentActor]] when the journal replay has been finished.
*/
@SerialVersionUID(1L)
case object RecoveryCompleted extends RecoveryCompleted {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Reply message to a successful [[Eventsourced#deleteMessages]] request.
*/
final case class DeleteMessagesSuccess(toSequenceNr: Long)
/**
* Reply message to a failed [[Eventsourced#deleteMessages]] request.
*/
final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long)
/**
* Recovery mode configuration object to be returned in [[PersistentActor#recovery]].
*
* By default recovers from latest snapshot replays through to the last available event (last sequenceId).
*
* Recovery will start from a snapshot if the persistent actor has previously saved one or more snapshots
* and at least one of these snapshots matches the specified `fromSnapshot` criteria.
* Otherwise, recovery will start from scratch by replaying all stored events.
*
* If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
* message, followed by replayed messages, if any, that are younger than the snapshot, up to the
* specified upper sequence number bound (`toSequenceNr`).
*
* @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
* is latest (= youngest) snapshot.
* @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
* @param replayMax maximum number of messages to replay. Default is no limit.
*/
@SerialVersionUID(1L)
final case class Recovery(
fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest,
toSequenceNr: Long = Long.MaxValue,
replayMax: Long = Long.MaxValue)
object Recovery {
/**
* Java API
* @see [[Recovery]]
*/
def create() = Recovery()
/**
* Java API
* @see [[Recovery]]
*/
def create(toSequenceNr: Long) =
Recovery(toSequenceNr = toSequenceNr)
/**
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria) =
Recovery(fromSnapshot = fromSnapshot)
/**
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) =
Recovery(fromSnapshot, toSequenceNr)
/**
* Java API
* @see [[Recovery]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long, replayMax: Long) =
Recovery(fromSnapshot, toSequenceNr, replayMax)
/**
* Convenience method for skipping recovery in [[PersistentActor]].
* @see [[Recovery]]
*/
val none: Recovery = Recovery(toSequenceNr = 0L)
}
final class RecoveryTimedOut(message: String) extends RuntimeException(message) with NoStackTrace
/**
* This defines how to handle the current received message which failed to stash, when the size of
* Stash exceeding the capacity of Stash.
*/
sealed trait StashOverflowStrategy
/**
* Discard the message to [[akka.actor.DeadLetter]].
*/
case object DiscardToDeadLetterStrategy extends StashOverflowStrategy {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Throw [[akka.actor.StashOverflowException]], hence the persistent actor will starting recovery
* if guarded by default supervisor strategy.
* Be carefully if used together with persist/persistAll or has many messages needed
* to replay.
*/
case object ThrowOverflowExceptionStrategy extends StashOverflowStrategy {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Reply to sender with predefined response, and discard the received message silently.
* @param response the message replying to sender with
*/
final case class ReplyToStrategy(response: Any) extends StashOverflowStrategy
/**
* Implement this interface in order to configure the stashOverflowStrategy for
* the internal stash of persistent actor.
* An instance of this class must be instantiable using a no-arg constructor.
*/
trait StashOverflowStrategyConfigurator {
def create(config: Config): StashOverflowStrategy
}
final class ThrowExceptionConfigurator extends StashOverflowStrategyConfigurator {
override def create(config: Config) = ThrowOverflowExceptionStrategy
}
final class DiscardConfigurator extends StashOverflowStrategyConfigurator {
override def create(config: Config) = DiscardToDeadLetterStrategy
}
/**
* An persistent Actor - can be used to implement command or event sourcing.
*/
trait PersistentActor extends Eventsourced with PersistenceIdentity {
def receive = receiveCommand
}
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class UntypedPersistentActor extends UntypedActor with Eventsourced with PersistenceIdentity {
final def onReceive(message: Any) = onReceiveCommand(message)
final def receiveRecover: Receive = {
case msg ⇒ onReceiveRecover(msg)
}
final def receiveCommand: Receive = {
case msg ⇒ onReceiveCommand(msg)
}
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAll(Util.immutableSeq(events))(event ⇒ handler(event))
@deprecated("use persistAll instead", "2.4")
def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAll(events, handler)
/**
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[#persist]] method.
*
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
def persistAsync[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].persistAsync(event)(event ⇒ handler(event))
/**
* JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
super[Eventsourced].persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
* will not be run.
*
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].deferAsync(event)(event ⇒ handler(event))
/**
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recovery]]
*/
@throws(classOf[Throwable])
def onReceiveRecover(msg: Any): Unit
/**
* Java API: command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
*/
@throws(classOf[Throwable])
def onReceiveCommand(msg: Any): Unit
}
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class AbstractPersistentActor extends AbstractActor with PersistentActor with Eventsourced {
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
def persistAll[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAll(Util.immutableSeq(events))(event ⇒ handler(event))
@deprecated("use persistAll instead", "2.4")
def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAll(events, handler)
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
* call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the even was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistent failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
def persistAsync[A](event: A, handler: Procedure[A]): Unit =
persistAsync(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
def persistAllAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAllAsync(Util.immutableSeq(events))(event ⇒ handler(event))
@deprecated("use persistAllAsync instead", "2.4")
def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAllAsync(events, handler)
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
* will not be run.
*
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
super.deferAsync(event)(event ⇒ handler(event))
override def receive = super[PersistentActor].receive
}