/
discoverd_event_handler.go
105 lines (94 loc) · 2.63 KB
/
discoverd_event_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package agent
import (
"bytes"
"github.com/bluefw/blued/discoverd"
"github.com/bluefw/blued/discoverd/api"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/serf/serf"
"log"
)
const (
RSCommand = "rs"
URSCommand = "us"
QRPCAddrCommand = "qr"
)
type DiscoverdEventHandler struct {
discoverd *discoverd.Discoverd
config *Config
logger *log.Logger
}
func NewDiscoverdEventHandler(ds *discoverd.Discoverd, config *Config, logger *log.Logger) EventHandler {
return &DiscoverdEventHandler{
discoverd: ds,
config: config,
logger: logger,
}
}
func (h *DiscoverdEventHandler) HandleEvent(e serf.Event) {
var err error
switch event := e.(type) {
case serf.MemberEvent:
if event.EventType() == serf.EventMemberFailed {
err = h.onMemberFaild(event)
} else if event.EventType() == serf.EventMemberLeave {
err = h.onMemberLeave(event)
}
case serf.UserEvent:
err = h.onUserEvent(event)
case *serf.Query:
err = h.onQuery(event)
default:
h.logger.Printf("[INFO] ds.event: Unknown event type: %s", e.EventType().String())
}
if err != nil {
h.logger.Printf("[ERR] ds.event: Failed to handle event %v:%v", e, err)
}
}
func (h *DiscoverdEventHandler) onMemberFaild(e serf.MemberEvent) error {
h.logger.Printf("[INFO] ds.event: Handle member faild event.")
for _, m := range e.Members {
h.discoverd.RemoveRouterByHost(m.Name)
}
return nil
}
func (h *DiscoverdEventHandler) onMemberLeave(e serf.MemberEvent) error {
h.logger.Printf("[INFO] ds.event: Handle member leave event.")
for _, m := range e.Members {
h.discoverd.RemoveRouterByHost(m.Name)
}
return nil
}
func (h *DiscoverdEventHandler) onUserEvent(e serf.UserEvent) error {
switch e.Name {
case RSCommand:
var ias api.InnerAppService
dec := codec.NewDecoder(bytes.NewReader(e.Payload), &codec.MsgpackHandle{})
if err := dec.Decode(&ias); err != nil {
return err
}
h.registerService(&ias)
case URSCommand:
var addr string
dec := codec.NewDecoder(bytes.NewReader(e.Payload), &codec.MsgpackHandle{})
if err := dec.Decode(&addr); err != nil {
return err
}
h.unregisterService(addr)
}
return nil
}
func (h *DiscoverdEventHandler) onQuery(e *serf.Query) error {
h.logger.Printf("[INFO] rpc:%s,e.Name:%s ", h.config.RPCAddr, e.Name)
switch e.Name {
case QRPCAddrCommand:
h.logger.Printf("[INFO] rpc:%s ", h.config.RPCAddr)
e.Respond([]byte(h.config.RPCAddr))
}
return nil
}
func (h *DiscoverdEventHandler) registerService(ias *api.InnerAppService) {
h.discoverd.AddRouter(ias.NodeAddr.Node, ias.NodeAddr.Addr, ias.Services)
}
func (h *DiscoverdEventHandler) unregisterService(addr string) {
h.discoverd.RemoveRouter(addr)
}