/
eventnodelistener.go
91 lines (77 loc) · 2.22 KB
/
eventnodelistener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package service
import (
"context"
"log"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/docker-flow/docker-flow-swarm-listener/metrics"
)
// NodeListening is the interface for types that listen to node events.
type NodeListening interface {
// ListenForNodeEvents takes a send-only channel of Event values.
// NodeListener's implementation in this file sends one Event on it for
// every docker node event that passes its validity check.
ListenForNodeEvents(eventChan chan<- Event)
}
// NodeListener listens for docker node events.
type NodeListener struct {
// dockerClient is the docker API client whose Events stream is consumed.
dockerClient *client.Client
// log receives a message whenever the event stream reports an error.
log *log.Logger
}
// NewNodeListener creates a `NodeListener` that reads events through the
// docker client c and reports stream errors to logger.
func NewNodeListener(c *client.Client, logger *log.Logger) *NodeListener {
	listener := new(NodeListener)
	listener.dockerClient = c
	listener.log = logger
	return listener
}
// ListenForNodeEvents starts a background goroutine that subscribes to the
// docker event stream filtered to type "node" and forwards every event that
// passes validEventNode to eventChan as an Event with ConsultCache set. The
// method itself returns immediately.
//
// When the stream reports an error, the error is logged and recorded in
// metrics, and after a one second pause the stream is reopened. The goroutine
// has no exit path, and the send on eventChan is a plain blocking send.
func (s NodeListener) ListenForNodeEvents(eventChan chan<- Event) {
	go func() {
		nodeFilter := filters.NewArgs()
		nodeFilter.Add("type", "node")
		opts := types.EventsOptions{Filters: nodeFilter}

		// open (re)subscribes to the docker event stream with the node filter.
		open := func() (<-chan events.Message, <-chan error) {
			return s.dockerClient.Events(context.Background(), opts)
		}

		messages, streamErrs := open()
		for {
			select {
			case streamErr := <-streamErrs:
				s.log.Printf("%v, Restarting docker event stream", streamErr)
				metrics.RecordError("ListenForNodeEvents")
				// Back off briefly before reopening the event stream.
				time.Sleep(time.Second)
				messages, streamErrs = open()
			case m := <-messages:
				if s.validEventNode(m) {
					eventChan <- Event{
						Type:         s.getEventType(m),
						ID:           m.Actor.ID,
						TimeNano:     m.TimeNano,
						ConsultCache: true,
					}
				}
			}
		}
	}()
}
// validEventNode reports whether the event should be passed through.
// "remove" actions are always valid; any other event is valid only when its
// actor carries a non-empty "name" attribute. This still allows through the
// 4-5 events produced by changing a worker node to a manager node or vice
// versa.
func (s NodeListener) validEventNode(msg events.Message) bool {
	// A missing "name" key reads as "", so one comparison covers both the
	// absent and the empty case.
	return msg.Action == "remove" || msg.Actor.Attributes["name"] != ""
}
// getEventType maps a docker node event to an EventType. A "remove" action,
// or a "state.new" attribute equal to "down", yields EventTypeRemove; every
// other event yields EventTypeCreate.
func (s NodeListener) getEventType(msg events.Message) EventType {
	switch {
	case msg.Action == "remove", msg.Actor.Attributes["state.new"] == "down":
		return EventTypeRemove
	default:
		return EventTypeCreate
	}
}