forked from asynkron/protoactor-go
/
endpoint_manager.go
100 lines (88 loc) · 2.65 KB
/
endpoint_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package remote
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/eventstream"
"github.com/AsynkronIT/protoactor-go/mailbox"
)
// endpointManagerPID holds the PID of the endpoint manager actor. It is
// assigned by spawnEndpointManager and read by subscribeEndpointManager, which
// uses its Tell method as the event-stream subscription callback.
var endpointManagerPID *actor.PID
// newEndpointManager returns an actor.Producer that, each time it is invoked,
// builds a new endpointManager carrying the given remote configuration. The
// connections map is left nil here; Receive creates it on *actor.Started.
func newEndpointManager(config *remoteConfig) actor.Producer {
	produce := func() actor.Actor {
		manager := new(endpointManager)
		manager.config = config
		return manager
	}
	return produce
}
// subscribeEndpointManager subscribes the endpoint manager's Tell method to
// the event stream and restricts the subscription, via a predicate, to
// messages whose dynamic type is *EndpointTerminatedEvent.
func subscribeEndpointManager() {
	subscription := eventstream.Subscribe(endpointManagerPID.Tell)

	// Only endpoint-termination events pass the predicate.
	onlyEndpointTerminated := func(m interface{}) bool {
		switch m.(type) {
		case *EndpointTerminatedEvent:
			return true
		default:
			return false
		}
	}
	subscription.WithPredicate(onlyEndpointTerminated)
}
// spawnEndpointManager builds the props for the endpoint manager actor (a
// bounded mailbox sized by config.endpointManagerQueueSize and a restarting
// supervisor strategy), spawns it, and stores the resulting PID in the
// package-level endpointManagerPID.
func spawnEndpointManager(config *remoteConfig) {
	base := actor.FromProducer(newEndpointManager(config))
	withMailbox := base.WithMailbox(mailbox.Bounded(config.endpointManagerQueueSize))
	props := withMailbox.WithSupervisor(actor.RestartingSupervisorStrategy())

	endpointManagerPID = actor.Spawn(props)
}
// endpoint groups the two actors the endpoint manager keeps per remote
// address: writer is the target for *remoteDeliver messages, and watcher is
// the target for watch, unwatch, terminate and endpoint-terminated messages.
type endpoint struct {
	writer, watcher *actor.PID
}
// endpointManager is the actor that routes remote messages to per-address
// endpoint actors, creating those actors on first use.
type endpointManager struct {
// connections maps a remote address to its endpoint. The map is created when
// the actor receives *actor.Started and is filled in by ensureConnected.
connections map[string]*endpoint
// config is the remote configuration supplied to newEndpointManager; it is
// read when spawning endpoint writers.
config *remoteConfig
}
// Receive handles the messages delivered to the endpoint manager.
//
// On *actor.Started it initialises the connections map. For every other
// handled message it resolves (creating if necessary) the endpoint for the
// message's address and forwards the message unchanged:
//   - *EndpointTerminatedEvent, *remoteTerminate, *remoteWatch and
//     *remoteUnwatch go to the endpoint's watcher;
//   - *remoteDeliver goes to the endpoint's writer.
//
// Messages of any other type are ignored.
func (state *endpointManager) Receive(ctx actor.Context) {
	switch msg := ctx.Message().(type) {
	case *actor.Started:
		state.connections = make(map[string]*endpoint)
		plog.Debug("Started EndpointManager")

	// Watcher-bound messages.
	case *EndpointTerminatedEvent:
		state.ensureConnected(msg.Address, ctx).watcher.Tell(msg)
	case *remoteTerminate:
		state.ensureConnected(msg.Watchee.Address, ctx).watcher.Tell(msg)
	case *remoteWatch:
		state.ensureConnected(msg.Watchee.Address, ctx).watcher.Tell(msg)
	case *remoteUnwatch:
		state.ensureConnected(msg.Watchee.Address, ctx).watcher.Tell(msg)

	// Writer-bound messages.
	case *remoteDeliver:
		state.ensureConnected(msg.target.Address, ctx).writer.Tell(msg)
	}
}
// ensureConnected returns the endpoint registered for address. If none exists
// yet, it spawns an endpoint writer and then an endpoint watcher for that
// address, records the pair in state.connections, and returns it.
func (state *endpointManager) ensureConnected(address string, ctx actor.Context) *endpoint {
	if existing, found := state.connections[address]; found {
		return existing
	}

	// The writer is spawned before the watcher, as in the original literal.
	writerPID := state.spawnEndpointWriter(address, ctx)
	watcherPID := state.spawnEndpointWatcher(address, ctx)

	created := &endpoint{writer: writerPID, watcher: watcherPID}
	state.connections[address] = created
	return created
}
// spawnEndpointWriter spawns, through ctx, an endpoint writer actor for
// address and returns its PID. The writer uses a mailbox built by
// newEndpointWriterMailbox from the configured batch size and queue size.
func (state *endpointManager) spawnEndpointWriter(address string, ctx actor.Context) *actor.PID {
	cfg := state.config

	base := actor.FromProducer(newEndpointWriter(address, cfg))
	props := base.WithMailbox(newEndpointWriterMailbox(cfg.endpointWriterBatchSize, cfg.endpointWriterQueueSize))

	return ctx.Spawn(props)
}
// spawnEndpointWatcher spawns, through ctx, an endpoint watcher actor for
// address using default props and returns its PID.
func (state *endpointManager) spawnEndpointWatcher(address string, ctx actor.Context) *actor.PID {
	watcherProducer := newEndpointWatcher(address)
	return ctx.Spawn(actor.FromProducer(watcherProducer))
}