-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
137 lines (114 loc) · 3.49 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package cluster
import (
"context"
"sync"
"time"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/endpoints"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/logger"
)
var (
	// listeners holds the event listener currently associated with each
	// remote cluster node, keyed by the node's address.
	listeners = make(map[string]*lxd.EventListener)

	// listenersLock guards every access to listeners.
	listenersLock sync.Mutex
)
// Events starts a task that continuously monitors the list of cluster nodes and
// maintains a pool of websocket connections against all of them, in order to
// get notified about events.
//
// Whenever an event is received the given callback is invoked with the ID of
// the node the listener was set up for and the event itself.
//
// It returns the task function together with its schedule, which fires once
// per second.
func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) (task.Func, task.Schedule) {
	// Update our pool of event listeners. Since database queries are
	// blocking, we spawn the actual logic in a goroutine, to abort
	// immediately when we receive the stop signal.
	update := func(ctx context.Context) {
		done := make(chan struct{})
		go func() {
			// Signal completion by closing the channel rather than
			// sending on it: a send on an unbuffered channel would
			// block forever (leaking this goroutine) whenever the
			// select below has already returned because the context
			// was cancelled and nobody is receiving anymore.
			defer close(done)
			eventsUpdateListeners(endpoints, cluster, f)
		}()

		select {
		case <-done:
		case <-ctx.Done():
		}
	}

	schedule := task.Every(time.Second)

	return update, schedule
}
// eventsUpdateListeners brings the package-level pool of event listeners in
// line with the current list of cluster nodes.
//
// For every node that is neither offline nor ourselves and that has no active
// listener, a new connection is established and a handler is registered that
// forwards each received event to f together with that node's ID. Listeners
// whose address no longer appears in the node list are disconnected and
// dropped from the pool.
//
// Failures are logged as warnings and never returned: a failure to load the
// node list aborts this run, a failure to connect to a node skips that node.
func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) {
	// Get the current cluster nodes.
	var nodes []db.NodeInfo
	var offlineThreshold time.Duration

	err := cluster.Transaction(func(tx *db.ClusterTx) error {
		var err error

		nodes, err = tx.Nodes()
		if err != nil {
			return err
		}

		offlineThreshold, err = tx.NodeOfflineThreshold()
		if err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		logger.Warnf("Failed to get current cluster nodes: %v", err)
		return
	}

	if len(nodes) == 1 {
		return // Either we're not clustered or this is a single-node cluster
	}

	address := endpoints.NetworkAddress()

	// Collect every known address (including offline nodes and ourselves)
	// so that the pruning pass below only drops listeners of nodes that
	// are gone from the node list.
	addresses := make([]string, len(nodes))
	for i, node := range nodes {
		addresses[i] = node.Address

		// Don't bother trying to connect to offline nodes, or to ourselves.
		if node.IsOffline(offlineThreshold) || node.Address == address {
			continue
		}

		listenersLock.Lock()
		existing, ok := listeners[node.Address]

		// The node has already a listener associated to it.
		if ok {
			// Double check that the listener is still connected. If it
			// is, just move on, otherwise we'll try to connect again.
			if existing.IsActive() {
				listenersLock.Unlock()
				continue
			}

			delete(listeners, node.Address)
		}
		listenersLock.Unlock()

		// Connect without holding the lock, since this performs network I/O.
		listener, err := eventsConnect(node.Address, endpoints.NetworkCert())
		if err != nil {
			logger.Warnf("Failed to get events from node %s: %v", node.Address, err)
			continue
		}

		logger.Debugf("Listening for events on node %s", node.Address)

		// Copy the ID into a per-iteration variable: the handler outlives
		// this iteration, and with Go versions before 1.22 the range
		// variable is shared by all iterations, so capturing node directly
		// would make every handler report the ID of whichever node the
		// loop visited last.
		nodeID := node.ID
		listener.AddHandler(nil, func(event api.Event) { f(nodeID, event) })

		listenersLock.Lock()
		listeners[node.Address] = listener
		listenersLock.Unlock()
	}

	// Drop listeners for addresses that are no longer part of the cluster.
	listenersLock.Lock()
	for address, listener := range listeners {
		if !shared.StringInSlice(address, addresses) {
			listener.Disconnect()
			delete(listeners, address)
		}
	}
	listenersLock.Unlock()
}
// eventsConnect opens a client connection to the node at the given address,
// using the given certificate, and returns the event listener obtained from
// that connection.
func eventsConnect(address string, cert *shared.CertInfo) (*lxd.EventListener, error) {
	conn, err := Connect(address, cert, true)
	if err != nil {
		return nil, err
	}

	// Select the special wildcard project so that the listener gets
	// notified about all events across all projects.
	return conn.UseProject("*").GetEvents()
}