/
routeractor_pool.go
73 lines (63 loc) · 1.77 KB
/
routeractor_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package router
import (
"sync"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
)
// poolRouterActor is the actor that backs a pool router. Its Receive method
// handles router management messages (adding, removing, listing and
// broadcasting to routees) and keeps the routee set stored in state current.
type poolRouterActor struct {
// props is handed to config.OnStarted when the actor receives Started.
props *actor.Props
// config supplies the OnStarted hook invoked when the actor starts.
config RouterConfig
// state holds the routee set, read and written through GetRoutees/SetRoutees.
state State
// wg is marked Done once OnStarted has returned.
// NOTE(review): presumably the spawner waits on it until the router is
// initialised; confirm against the code that creates this actor.
wg *sync.WaitGroup
}
// Receive processes the messages addressed to the pool router itself:
// start-up, routee membership changes, broadcasts, routee queries and
// termination notices. Message types not listed in the switch are ignored.
func (a *poolRouterActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		// Run the configuration's start hook, then mark the wait group done.
		a.config.OnStarted(context, a.props, a.state)
		a.wg.Done()

	case *AddRoutee:
		// Adding a PID that is already a routee is a no-op.
		routees := a.state.GetRoutees()
		if !routees.Contains(msg.PID) {
			context.Watch(msg.PID)
			routees.Add(msg.PID)
			a.state.SetRoutees(routees)
		}

	case *RemoveRoutee:
		// Removing a PID that is not a routee is a no-op.
		routees := a.state.GetRoutees()
		if routees.Contains(msg.PID) {
			context.Unwatch(msg.PID)
			routees.Remove(msg.PID)
			a.state.SetRoutees(routees)

			// Pause for 1ms before poisoning the routee so that it has a
			// chance to receive messages that are still on their way to it.
			// With the consistent hash router in particular, a copy of hmc
			// may have been taken before the update above, in which case
			// messages would otherwise be routed to a dead routee. This is
			// best effort only: 1ms is considered an acceptable trade-off
			// between stalling the router actor and giving the routee time
			// to drain before it dies.
			time.Sleep(time.Millisecond)
			context.Send(msg.PID, &actor.PoisonPill{})
		}

	case *BroadcastMessage:
		// Forward the wrapped message to every routee, keeping the original
		// sender so that replies go back to it.
		payload, origin := msg.Message, context.Sender()
		a.state.GetRoutees().ForEach(func(_ int, target *actor.PID) {
			context.RequestWithCustomSender(target, payload, origin)
		})

	case *GetRoutees:
		// Copy the current routee PIDs into a slice and reply with it.
		set := a.state.GetRoutees()
		pids := make([]*actor.PID, set.Len())
		set.ForEach(func(idx int, pid *actor.PID) {
			pids[idx] = pid
		})
		context.Respond(&Routees{pids})

	case *actor.Terminated:
		// Store the routee set back only when Remove returns true.
		routees := a.state.GetRoutees()
		if removed := routees.Remove(msg.Who); removed {
			a.state.SetRoutees(routees)
		}
	}
}