-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
listener1_2.go
79 lines (66 loc) · 1.68 KB
/
listener1_2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package agent
import (
"encoding/gob"
"net"
"sync"
"github.com/cilium/cilium/pkg/monitor/agent/listener"
"github.com/cilium/cilium/pkg/monitor/payload"
)
// listenerv1_2 implements the cilium-node-monitor API protocol compatible with
// cilium 1.2
// cleanupFn is called on exit
type listenerv1_2 struct {
conn net.Conn
queue chan *payload.Payload
cleanupFn func(listener.MonitorListener)
// Used to prevent queue from getting closed multiple times.
once sync.Once
}
func newListenerv1_2(c net.Conn, queueSize int, cleanupFn func(listener.MonitorListener)) *listenerv1_2 {
ml := &listenerv1_2{
conn: c,
queue: make(chan *payload.Payload, queueSize),
cleanupFn: cleanupFn,
}
go ml.drainQueue()
return ml
}
func (ml *listenerv1_2) Enqueue(pl *payload.Payload) {
select {
case ml.queue <- pl:
default:
log.Debug("Per listener queue is full, dropping message")
}
}
// drainQueue encodes and sends monitor payloads to the listener. It is
// intended to be a goroutine.
func (ml *listenerv1_2) drainQueue() {
defer func() {
ml.cleanupFn(ml)
}()
enc := gob.NewEncoder(ml.conn)
for pl := range ml.queue {
if err := pl.EncodeBinary(enc); err != nil {
switch {
case listener.IsDisconnected(err):
log.Debug("Listener disconnected")
return
default:
log.WithError(err).Warn("Removing listener due to write failure")
return
}
}
}
}
func (ml *listenerv1_2) Version() listener.Version {
return listener.Version1_2
}
// Close closes the underlying socket and payload queue.
func (ml *listenerv1_2) Close() {
ml.once.Do(func() {
ml.conn.Close()
close(ml.queue)
})
}