This repository has been archived by the owner on May 18, 2021. It is now read-only.
/
plugin.go
98 lines (83 loc) · 2.73 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package persistence
/**
This file is derived from vendor/github.com/AsynkronIT/protoactor-go/persistence/plugin.go.
It has been modified to support propagating event indices to plugins.
*/
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/persistence"
"github.com/golang/protobuf/proto"
)
type persistent interface {
init(provider Provider, context actor.Context)
isSnapshotRequested() bool
sendSnapshotRequest()
PersistReceive(message proto.Message)
PersistSnapshot(snapshot proto.Message)
Recovering() bool
Name() string
}
// Mixin is the persistence mixin for actors. It tracks the bookkeeping
// needed to journal events, take snapshots and replay both on start-up.
type Mixin struct {
	// eventIndex is the index handed to the provider for the next persisted
	// event; it is also the index recorded alongside a snapshot.
	eventIndex int

	// providerState is the storage backend, fetched from the Provider the
	// first time init runs and reused afterwards.
	providerState ProviderState

	// name is the persistence name, set in init from context.Self().Id.
	name string

	// receiver is the actor context (asserted to the receiver interface in
	// init) that replayed and control messages are delivered to.
	receiver receiver

	// recovering is true while init is replaying the snapshot and events.
	recovering bool

	// snapshotRequested is raised by PersistReceive when a snapshot is due
	// and lowered again by sendSnapshotRequest.
	snapshotRequested bool
}
// Recovering reports whether the mixin is still replaying persisted state.
// It is true from the start of init until the stored snapshot and events
// have been re-delivered, so any message seen while it is true is a replay.
func (mixin *Mixin) Recovering() bool {
	return mixin.recovering
}
// Name returns the actor's persistence name, i.e. the key under which its
// events and snapshots are stored. It is assigned during init.
func (mixin *Mixin) Name() string {
	return mixin.name
}
// PersistReceive saves an event to the actor's journal under the current
// event index and then advances that index.
//
// When the new index is a multiple of the provider's snapshot interval, a
// snapshot request is flagged rather than sent: calling Receive from here
// would overwrite the actor's context if PersistReceive is itself invoked
// from inside the actor's Receive method. The flag is consumed later through
// isSnapshotRequested / sendSnapshotRequest.
//
// A snapshot interval of zero never requests a snapshot.
func (mixin *Mixin) PersistReceive(message proto.Message) {
	mixin.providerState.PersistEvent(mixin.Name(), mixin.eventIndex, message)
	mixin.eventIndex++

	interval := mixin.providerState.GetSnapshotInterval()
	if interval == 0 {
		// Using zero as a modulus would panic with an integer divide by
		// zero, so a zero interval simply means no automatic snapshots.
		return
	}
	if mixin.eventIndex%interval == 0 {
		mixin.snapshotRequested = true
	}
}
// PersistSnapshot hands snapshot to the provider state as the actor's
// snapshot, stored under the actor's persistence name and tagged with the
// current event index.
func (mixin *Mixin) PersistSnapshot(snapshot proto.Message) {
	name, index := mixin.Name(), mixin.eventIndex
	mixin.providerState.PersistSnapshot(name, index, snapshot)
}
// init binds the mixin to the actor behind context and rebuilds the actor's
// state from storage. It resets the bookkeeping fields, restarts the provider
// state, delivers the stored snapshot (if any) and then every event from the
// snapshot's index onwards, and finally delivers persistence.ReplayComplete.
// Recovering reports true for the whole replay and false afterwards.
//
// context must implement the receiver interface; the type assertion panics
// otherwise.
func (mixin *Mixin) init(provider Provider, context actor.Context) {
	// The provider state is fetched once and kept across later init calls.
	if mixin.providerState == nil {
		mixin.providerState = provider.GetState()
	}

	target := context.(receiver)

	// Reset per-start bookkeeping before anything is replayed.
	mixin.name = context.Self().Id
	mixin.eventIndex = 0
	mixin.receiver = target
	mixin.recovering = true
	mixin.snapshotRequested = false

	state := mixin.providerState
	state.Restart()

	// A snapshot, when present, moves the replay starting point forward.
	snapshot, snapshotIndex, found := state.GetSnapshot(mixin.Name())
	if found {
		mixin.eventIndex = snapshotIndex
		target.Receive(snapshot)
	}

	// Re-deliver each stored event and track the index that follows it.
	replay := func(index int, event interface{}) {
		target.Receive(event)
		mixin.eventIndex = index + 1
	}
	state.GetEvents(mixin.Name(), mixin.eventIndex, replay)

	mixin.recovering = false
	target.Receive(&persistence.ReplayComplete{})
}
// isSnapshotRequested reports whether PersistReceive has flagged a snapshot
// request that has not yet been sent by sendSnapshotRequest.
func (mixin *Mixin) isSnapshotRequested() bool {
	return mixin.snapshotRequested
}
// sendSnapshotRequest clears the pending snapshot flag and then delivers a
// persistence.RequestSnapshot message to the actor.
func (mixin *Mixin) sendSnapshotRequest() {
	// The flag is lowered before delivery, matching the original ordering.
	mixin.snapshotRequested = false

	request := &persistence.RequestSnapshot{}
	mixin.receiver.Receive(request)
}
// receiver is anything a message can be delivered to. In this file it is the
// actor context passed to init, which receives replayed snapshots and events
// as well as the ReplayComplete and RequestSnapshot control messages.
type receiver interface {
	Receive(msg interface{})
}