forked from asynkron/protoactor-go
/
endpoint_watcher.go
143 lines (124 loc) · 3.94 KB
/
endpoint_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package remote
import (
reflect "reflect"
"github.com/aergoio/aergo-actor/actor"
)
// newEndpointWatcher returns an actor.Producer whose product is a fresh
// endpointWatcher carrying the given address. The watcher's bookkeeping map is
// not created here; it is set up by initialize when the actor receives
// *actor.Started.
func newEndpointWatcher(address string) actor.Producer {
	produce := func() actor.Actor {
		watcher := new(endpointWatcher)
		watcher.address = address
		return watcher
	}
	return produce
}
// endpointWatcher is the actor state that records, for one endpoint address,
// which watchers are watching which PIDs.
type endpointWatcher struct {
	// address is the endpoint address passed to newEndpointWatcher. In this
	// file it is only read to tag log entries.
	address string
	// watched maps a watcher's PID Id (string) to the set of PIDs that watcher
	// is currently watching. It is allocated in initialize and replaced with an
	// empty map when an *EndpointTerminatedEvent is handled.
	watched map[string]*actor.PIDSet
}
// initialize logs that the watcher has started and then installs an empty
// watcher-Id -> watched-PID-set map. Receive calls it on *actor.Started.
func (state *endpointWatcher) initialize() {
	startEvent := plog.Info().Str("address", state.address)
	startEvent.Msg("Started EndpointWatcher")

	state.watched = map[string]*actor.PIDSet{}
}
// Receive is the message handler used while the endpoint is considered
// connected. It maintains state.watched and relays watch-related messages:
//
//   - *actor.Started: set up the watched map.
//   - *remoteTerminate: drop the watcher/watchee pair and notify the watcher
//     (if it is found in the local process registry) with an actor.Terminated
//     whose AddressTerminated is false.
//   - *EndpointConnectedEvent: nothing to do.
//   - *EndpointTerminatedEvent: notify every locally registered watcher about
//     each PID it was watching (AddressTerminated true), reset the map, switch
//     to the Terminated behavior and stop this actor.
//   - *remoteWatch / *remoteUnwatch: update the map and pass an actor.Watch /
//     actor.Unwatch on to the watchee via SendMessage.
//   - system and auto-receive messages are ignored; anything else is logged as
//     an error.
func (state *endpointWatcher) Receive(ctx actor.Context) {
	switch m := ctx.Message().(type) {
	case *actor.Started:
		state.initialize()

	case *remoteTerminate:
		// Forget this watcher -> watchee pair, dropping the watcher's entry
		// once its set becomes empty.
		watcherID := m.Watcher.Id
		if set, tracked := state.watched[watcherID]; tracked {
			set.Remove(m.Watchee)
			if set.Len() == 0 {
				delete(state.watched, watcherID)
			}
		}

		notice := &actor.Terminated{
			Who:               m.Watchee,
			AddressTerminated: false,
		}
		// Only deliver the notice when the watcher resolves in the local registry.
		if proc, found := actor.ProcessRegistry.GetLocal(watcherID); found {
			proc.SendSystemMessage(m.Watcher, notice)
		}

	case *EndpointConnectedEvent:
		// Already in the connected behavior; nothing to do.

	case *EndpointTerminatedEvent:
		plog.Info().Str("address", state.address).Msg("EndpointWatcher handling terminated")

		for watcherID, watchees := range state.watched {
			// Watchers that do not resolve in the local registry are skipped.
			proc, found := actor.ProcessRegistry.GetLocal(watcherID)
			if !found {
				continue
			}
			watchees.ForEach(func(_ int, watchee actor.PID) {
				// One address-terminated notice per watched PID.
				notice := &actor.Terminated{
					Who:               &watchee,
					AddressTerminated: true,
				}
				target := actor.NewLocalPID(watcherID)
				proc.SendSystemMessage(target, notice)
			})
		}

		// Start over with an empty map, then hand further messages to the
		// Terminated behavior and stop this actor.
		state.watched = make(map[string]*actor.PIDSet)
		ctx.SetBehavior(state.Terminated)
		ctx.Self().Stop()

	case *remoteWatch:
		// Record the watchee under its watcher, creating the set on first use.
		watcherID := m.Watcher.Id
		set, tracked := state.watched[watcherID]
		if !tracked {
			state.watched[watcherID] = actor.NewPIDSet(m.Watchee)
		} else {
			set.Add(m.Watchee)
		}

		// Rebuild the Watch command and pass it to the watchee.
		SendMessage(m.Watchee, nil, &actor.Watch{Watcher: m.Watcher}, nil, -1)

	case *remoteUnwatch:
		// Forget this watcher -> watchee pair, dropping the watcher's entry
		// once its set becomes empty.
		watcherID := m.Watcher.Id
		if set, tracked := state.watched[watcherID]; tracked {
			set.Remove(m.Watchee)
			if set.Len() == 0 {
				delete(state.watched, watcherID)
			}
		}

		// Rebuild the Unwatch command and pass it to the watchee.
		SendMessage(m.Watchee, nil, &actor.Unwatch{Watcher: m.Watcher}, nil, -1)

	case actor.SystemMessage, actor.AutoReceiveMessage:
		// Deliberately ignored.

	default:
		plog.Error().Str("address", state.address).Interface("msg", m).Msg("EndpointWatcher received unknown message")
	}
}
// Terminated is the message handler installed by Receive after an
// *EndpointTerminatedEvent. While it is active:
//
//   - *remoteWatch: the watch is not recorded; instead the watcher (if found in
//     the local process registry) is immediately sent an actor.Terminated for
//     the watchee with AddressTerminated set to true.
//   - *EndpointConnectedEvent: log the restart and switch back to Receive.
//   - *remoteTerminate, *EndpointTerminatedEvent, *remoteUnwatch: logged as an
//     error, otherwise dropped.
//   - system and auto-receive messages are ignored; anything else is logged as
//     an error together with its dynamic type.
func (state *endpointWatcher) Terminated(ctx actor.Context) {
	switch msg := ctx.Message().(type) {
	case *remoteWatch:
		// Try to find the watcher ID in the local actor registry.
		ref, ok := actor.ProcessRegistry.GetLocal(msg.Watcher.Id)
		if ok {
			// Create a terminated event for the watched actor.
			terminated := &actor.Terminated{
				Who:               msg.Watchee,
				AddressTerminated: true,
			}
			// Send the address-terminated event to the watcher.
			ref.SendSystemMessage(msg.Watcher, terminated)
		}
	case *EndpointConnectedEvent:
		plog.Info().Str("address", state.address).Msg("EndpointWatcher handling restart")
		ctx.SetBehavior(state.Receive)
	case *remoteTerminate, *EndpointTerminatedEvent, *remoteUnwatch:
		// Nothing to update for a terminated endpoint; just report it.
		plog.Error().Str("address", state.address).Interface("msg", msg).Msg("EndpointWatcher receive message for already terminated endpoint")
	case actor.SystemMessage, actor.AutoReceiveMessage:
		// Deliberately ignored.
	default:
		// reflect.TypeOf returns a nil Type for a nil interface value, and
		// calling String on it would panic, so a nil message gets a
		// placeholder type name instead.
		typeName := "<nil>"
		if t := reflect.TypeOf(msg); t != nil {
			typeName = t.String()
		}
		plog.Error().Str("address", state.address).Str("type", typeName).
			Interface("msg", msg).Msg("EndpointWatcher received unknown message")
	}
}