-
Notifications
You must be signed in to change notification settings - Fork 29
/
monitor.go
219 lines (201 loc) · 5.99 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Package monitor handles the logging, collection and computation of
// statistical data. Every application can send some Measure (for the moment,
// we mostly measure the CPU time but it can be applied later for any kind of
// measures). The Monitor receives them and updates a Stats struct. This Stats
// struct can hold many different kinds of Measurements (the measure of a
// specific action such as "round time" or "verify time" etc). These
// measurements contain Values which compute the actual min/max/dev/avg values.
//
// The Proxy relays Measures from clients to the listening Monitor. A starter
// feature is also the DataFilter which can apply some filtering rules to the
// data before making any statistics about them.
package monitor
import (
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"go.dedis.ch/onet/v3/log"
"golang.org/x/xerrors"
)
// This file handles the collection of measurements, aggregates them and
// writes CSV file reports.
const (
	// Sink is the address the monitor listens on. The peer on the other end
	// can be a monitor.Proxy or a direct connection from measure.go.
	Sink = "0.0.0.0"

	// DefaultSinkPort is the port a monitor listens on by default, and the
	// port a proxy uses to contact the monitor.
	DefaultSinkPort = 10000
)
// Monitor collects measures from its connections and folds them into its
// statistics. Connection handlers hand the measures over through a channel,
// and Listen is the one applying them to the stats and to the buckets.
type Monitor struct {
// listener is set by Listen once the address is bound, and reset to nil
// when Listen finishes
listener net.Listener
// listenerLock guards listener
listenerLock *sync.Mutex
// Current connections, keyed by the remote address of the peer
conns map[string]net.Conn
// and the mutex protecting them
mutexConn sync.Mutex
// Current stats
stats *Stats
// Per-bucket stats, configured through InsertBucket
buckets *BucketStats
// channel to give new measures
measures chan *singleMeasure
// channel to notify the end of a connection
// sends the remote address of the connection when finished
done chan string
// SinkPort is the TCP port Listen binds to. When it is 0, Listen sends the
// port that was actually bound on sinkPortChan.
SinkPort uint16
sinkPortChan chan uint16
}
// NewMonitor builds a Monitor that aggregates into the given stats. The
// monitor starts with no connection, an empty set of buckets and
// DefaultSinkPort as its listening port. The measures and done channels are
// unbuffered; sinkPortChan can hold a single value.
func NewMonitor(stats *Stats) *Monitor {
	mon := new(Monitor)
	mon.stats = stats
	mon.buckets = newBucketStats()
	mon.conns = map[string]net.Conn{}
	mon.measures = make(chan *singleMeasure)
	mon.done = make(chan string)
	mon.SinkPort = DefaultSinkPort
	mon.sinkPortChan = make(chan uint16, 1)
	mon.listenerLock = &sync.Mutex{}
	return mon
}
// InsertBucket registers a bucket at the given index. The rules and the
// stats are handed over unchanged to the monitor's BucketStats, which uses
// the rules to filter the incoming measures.
func (m *Monitor) InsertBucket(index int, rules []string, stats *Stats) {
	buckets := m.buckets
	buckets.Set(index, rules, stats)
}
// Listen binds a TCP listener on Sink:SinkPort and then blocks, applying
// every received measure to the stats, until the last open connection has
// ended. At that point the listener is closed and nil is returned.
//
// If SinkPort is 0, the port that was actually bound is sent on
// sinkPortChan before the first connection is accepted.
//
// It returns an error only if the address could not be bound.
func (m *Monitor) Listen() error {
	ln, err := net.Listen("tcp", Sink+":"+strconv.Itoa(int(m.SinkPort)))
	if err != nil {
		return xerrors.Errorf("Error while monitor is binding address: %v", err)
	}
	if m.SinkPort == 0 {
		// Port 0 asks for any free port: recover the one that was picked
		// from the listener address and publish it.
		var p2 uint16
		_, p, perr := net.SplitHostPort(ln.Addr().String())
		if perr == nil {
			_, perr = fmt.Sscanf(p, "%d", &p2)
		}
		if perr != nil {
			// The value is still sent so that a receiver waiting on the
			// channel does not block forever.
			log.Error("Couldn't determine the listening port:", perr)
		}
		m.sinkPortChan <- p2
	}
	m.listenerLock.Lock()
	m.listener = ln
	m.listenerLock.Unlock()
	log.Lvl2("Monitor listening for stats on", Sink, ":", m.SinkPort)

	// Accept loop. It needs no "finished" flag shared with the loop below:
	// closing the listener (done below, or in Stop) makes Accept fail with
	// an "accept" OpError, which ends the goroutine.
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				// We can't accept anymore: the listener has been closed
				if operr, ok := err.(*net.OpError); ok && operr.Op == "accept" {
					return
				}
				log.Lvl2("Error while monitor accept connection:", err)
				continue
			}
			addr := conn.RemoteAddr().String()
			log.Lvl3("Monitor: new connection from", addr)
			m.mutexConn.Lock()
			m.conns[addr] = conn
			go m.handleConnection(conn)
			m.mutexConn.Unlock()
		}
	}()

	// finished is only ever touched by this goroutine
	finished := false
	for !finished {
		select {
		// new stats
		case measure := <-m.measures:
			m.update(measure)
		// end of a peer conn
		case peer := <-m.done:
			m.mutexConn.Lock()
			delete(m.conns, peer)
			log.Lvl3("Connections left:", len(m.conns))
			// end of monitoring: the last connection is gone
			if len(m.conns) == 0 {
				m.listenerLock.Lock()
				if err := m.listener.Close(); err != nil {
					log.Lvl2("Couldn't close listener:",
						err)
				}
				m.listener = nil
				finished = true
				m.listenerLock.Unlock()
			}
			m.mutexConn.Unlock()
		}
	}
	log.Lvl2("Monitor finished waiting")
	m.mutexConn.Lock()
	m.conns = make(map[string]net.Conn)
	m.mutexConn.Unlock()
	return nil
}
// Stop closes the listener, if there is one, and then every connection the
// monitor currently tracks. Close errors are logged, not returned.
func (m *Monitor) Stop() {
	log.Lvl2("Monitor Stop")

	// The listener lock is released before the connection lock is taken, so
	// the two are never held together here.
	func() {
		m.listenerLock.Lock()
		defer m.listenerLock.Unlock()
		if m.listener == nil {
			return
		}
		if err := m.listener.Close(); err != nil {
			log.Error("Couldn't close listener:", err)
		}
	}()

	m.mutexConn.Lock()
	defer m.mutexConn.Unlock()
	for _, conn := range m.conns {
		err := conn.Close()
		if err != nil {
			log.Error("Couldn't close connection:", err)
		}
	}
}
// handleConnection decodes the stream of JSON measures sent over conn and
// forwards each of them on the measures channel. It runs until the
// connection ends (io.EOF or a "closed" error) or until a second decoding
// error occurs, and then reports the remote address on the done channel.
//
// A measure that failed to decode is dropped, never forwarded. A measure
// named "end" (in any letter case) is not forwarded either; it does not stop
// the handler, which keeps reading until the peer closes the connection.
func (m *Monitor) handleConnection(conn net.Conn) {
	remote := conn.RemoteAddr().String()
	dec := json.NewDecoder(conn)
	nerr := 0
	for {
		measure := &singleMeasure{}
		if err := dec.Decode(measure); err != nil {
			// if end of connection
			if err == io.EOF || strings.Contains(err.Error(), "closed") {
				break
			}
			// otherwise log it
			log.Lvl2("Error: monitor decoding from", remote, ":", err)
			nerr++
			if nerr > 1 {
				log.Lvl2("Monitor: too many errors from", remote, ": Abort.")
				break
			}
			// The measure is empty or only partially filled: do not let it
			// reach the stats.
			continue
		}

		log.Lvlf3("Monitor: received a Measure from %s: %+v", remote, measure)
		// Special case where the measurement is indicating a FINISHED step:
		// it carries no data, so it is only logged.
		if strings.ToLower(measure.Name) == "end" {
			log.Lvl3("Finishing monitor")
			continue
		}
		m.measures <- measure
	}
	m.done <- remote
}
// update feeds the measure to the global stats first, and then to the
// bucket stats, which apply their own filtering.
func (m *Monitor) update(meas *singleMeasure) {
	global, perBucket := m.stats, m.buckets
	global.Update(meas)
	perBucket.Update(meas)
}