-
Notifications
You must be signed in to change notification settings - Fork 30
/
Collector.go
261 lines (241 loc) · 8.09 KB
/
Collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package livestatus
import (
"fmt"
"github.com/griesbacher/nagflux/collector"
"github.com/griesbacher/nagflux/config"
"github.com/griesbacher/nagflux/data"
"github.com/griesbacher/nagflux/logging"
"github.com/kdar/factorlog"
"regexp"
"time"
)
//Collector fetches data from livestatus.
//NewLivestatusCollector fills the fields positionally, so their order must not change.
type Collector struct {
//quit carries the stop request from Stop to run and the acknowledgement back to Stop.
quit chan bool
//jobs are the result queues every fetched printable is written to.
jobs collector.ResultQueues
//livestatusConnector is used to send the queries to livestatus.
livestatusConnector *Connector
//log is the logger used for all messages of the collector.
log *factorlog.FactorLog
//logQuery is the notification query template used by queryData.
logQuery string
}
const (
//Updateinterval on livestatus data for Icinga2.
//The query timeouts and the lookback of the time filters are derived from this value as well.
intervalToCheckLivestatus = time.Duration(2) * time.Minute
//QueryLivestatusVersion livestatusquery for the livestatus version string.
QueryLivestatusVersion = `GET status
Columns: livestatus_version
OutputFormat: csv
`
//QueryIcinga2ForNotifications livestatusquery for notifications with Icinga2 Livestatus.
//The %d placeholder is replaced with a Unix timestamp by addTimestampToLivestatusQuery.
//NOTE(review): presumably the Negate header inverts the preceding time filter - confirm.
QueryIcinga2ForNotifications = `GET log
Columns: type time contact_name message
Filter: type ~ .*NOTIFICATION
Filter: time < %d
Negate:
OutputFormat: csv
`
//QueryNagiosForNotifications livestatusquery for notifications with nagioslike Livestatus.
//The %d placeholder is replaced with a Unix timestamp by addTimestampToLivestatusQuery.
QueryNagiosForNotifications = `GET log
Columns: type time contact_name message
Filter: type ~ .*NOTIFICATION
Filter: time > %d
OutputFormat: csv
`
//QueryForComments livestatusquery for comments
//The %d placeholder is replaced with a Unix timestamp by addTimestampToLivestatusQuery.
QueryForComments = `GET comments
Columns: host_name service_display_name comment entry_time author entry_type
Filter: entry_time > %d
OutputFormat: csv
`
//QueryForDowntimes livestatusquery for downtimes
//The %d placeholder is replaced with a Unix timestamp by addTimestampToLivestatusQuery.
QueryForDowntimes = `GET downtimes
Columns: host_name service_display_name comment entry_time author end_time
Filter: entry_time > %d
OutputFormat: csv
`
//NOTE: iota is the index of the constant specification within this block, so the
//three livestatus types below evaluate to 6, 7 and 8 and not to 0, 1 and 2.
//Nagios nagioslike Livestatus
Nagios = iota
//Icinga2 icinga2like Livestatus
Icinga2 = iota
//Naemon naemonlike Livestatus
Naemon = iota
)
//NewLivestatusCollector creates a Collector and starts its polling goroutine before returning.
//The collector begins with the Nagios notification query; when detectVersion is set the
//livestatus type is detected first and the notification query is chosen accordingly.
//An undetected type leaves the Nagios query in place.
func NewLivestatusCollector(jobs collector.ResultQueues, livestatusConnector *Connector, detectVersion bool) *Collector {
	live := &Collector{
		quit:                make(chan bool, 2),
		jobs:                jobs,
		livestatusConnector: livestatusConnector,
		log:                 logging.GetLogger(),
		logQuery:            QueryNagiosForNotifications,
	}

	if detectVersion {
		switch livestatusType := getLivestatusVersion(live); livestatusType {
		case Icinga2:
			live.log.Info("Livestatus type: Icinga2")
			live.logQuery = QueryIcinga2ForNotifications
		case Nagios:
			live.log.Info("Livestatus type: Nagios")
			live.logQuery = QueryNagiosForNotifications
		case Naemon:
			live.log.Info("Livestatus type: Naemon")
			live.logQuery = QueryNagiosForNotifications
		}
	}

	go live.run()
	return live
}
//Stop signals the collector to stop.
//It writes a stop request to the quit channel and then reads from the same channel,
//on which run echoes the request back before it returns.
//NOTE(review): NewLivestatusCollector creates quit with a buffer of two, so the read
//below may consume the request that was just written while run is not receiving from
//quit; Stop would then return although run keeps polling - confirm and consider a
//separate acknowledgement channel.
func (live *Collector) Stop() {
live.quit <- true
<-live.quit
live.log.Debug("LivestatusCollector stoped")
}
//Polling loop: queries livestatus, then waits for either a stop request or the next interval.
//A stop request is echoed back on the quit channel before the loop ends.
func (live Collector) run() {
	for {
		live.queryData()

		select {
		case <-live.quit:
			live.quit <- true
			return
		case <-time.After(intervalToCheckLivestatus):
			//interval elapsed, poll again
		}
	}
}
//Runs the notification, comment and downtime queries concurrently and copies every
//printable they produce to all result queues. Returns once all queries reported completion;
//a warning is logged whenever nothing arrives for a whole check interval.
func (live Collector) queryData() {
	results := make(chan collector.Printable)
	done := make(chan bool)

	queries := []string{live.logQuery, QueryForComments, QueryForDowntimes}
	for _, query := range queries {
		go live.requestPrintablesFromLivestatus(query, true, results, done)
	}

	for pending := len(queries); pending > 0; {
		select {
		case printable := <-results:
			for _, queue := range live.jobs {
				queue <- printable
			}
		case <-done:
			pending--
		case <-time.After(intervalToCheckLivestatus):
			live.log.Warn("Livestatus timed out... (Collector.queryData())")
		}
	}
}
//requestPrintablesFromLivestatus sends query to livestatus and converts every returned
//CSV record into a printable.
//
//query must be one of the query constants of this package, any other value is logged as fatal.
//addTimestampToQuery tells whether the %d placeholder of query has to be filled first.
//printables receives one value per successfully converted record.
//outerFinish receives the result reported by the connector once it is done, after which
//the function returns.
//
//A warning is logged whenever neither a record nor the completion signal arrives within a
//third of the check interval; the function keeps waiting afterwards.
func (live Collector) requestPrintablesFromLivestatus(query string, addTimestampToQuery bool, printables chan collector.Printable, outerFinish chan bool) {
	queryWithTimestamp := query
	if addTimestampToQuery {
		queryWithTimestamp = addTimestampToLivestatusQuery(query)
	}

	csv := make(chan []string)
	finished := make(chan bool)
	go live.livestatusConnector.connectToLivestatus(queryWithTimestamp, csv, finished)

	for {
		select {
		case line := <-csv:
			switch query {
			case QueryNagiosForNotifications, QueryIcinga2ForNotifications:
				//Both notification queries share the same record layout. The record is
				//indexed below and in handleQueryForNotifications, so an empty one is
				//rejected here like the other cases reject records of unexpected length.
				if len(line) == 0 {
					live.log.Warn("QueryForNotifications out of range", line)
				} else if printable := live.handleQueryForNotifications(line); printable != nil {
					printables <- printable
				} else {
					live.log.Warn("The notification type is unknown:" + line[0])
				}
			case QueryForComments:
				if len(line) == 6 {
					printables <- CommentData{collector.AllFilterable, Data{line[0], line[1], line[2], line[3], line[4]}, line[5]}
				} else {
					live.log.Warn("QueryForComments out of range", line)
				}
			case QueryForDowntimes:
				if len(line) == 6 {
					printables <- DowntimeData{collector.AllFilterable, Data{line[0], line[1], line[2], line[3], line[4]}, line[5]}
				} else {
					live.log.Warn("QueryForDowntimes out of range", line)
				}
			case QueryLivestatusVersion:
				if len(line) == 1 {
					printables <- collector.SimplePrintable{Filterable: collector.AllFilterable, Text: line[0], Datatype: data.InfluxDB}
				} else {
					live.log.Warn("QueryLivestatusVersion out of range", line)
				}
			default:
				live.log.Fatal("Found unknown query type" + query)
			}
		case result := <-finished:
			outerFinish <- result
			return
		case <-time.After(intervalToCheckLivestatus / 3):
			live.log.Warn("connectToLivestatus timed out")
		}
	}
}
//addTimestampToLivestatusQuery fills the placeholder of query with the Unix timestamp
//lying one and a half check intervals in the past.
func addTimestampToLivestatusQuery(query string) string {
	lookback := intervalToCheckLivestatus / 100 * -150
	since := time.Now().Add(lookback)
	return fmt.Sprintf(query, since.Unix())
}
//handleQueryForNotifications converts one record of a notification query into NotificationData.
//The first field selects host or service handling and the number of fields selects the
//record layout. It returns nil for any other first field and, after logging a warning,
//for a known notification with an unexpected number of fields.
func (live Collector) handleQueryForNotifications(line []string) *NotificationData {
	fieldCount := len(line)

	switch notificationType := line[0]; notificationType {
	case "HOST NOTIFICATION":
		switch fieldCount {
		case 10:
			//Custom: host_name, "", message, timestamp, author, notification_type, state
			return &NotificationData{collector.AllFilterable, Data{line[4], "", line[9], line[1], line[8]}, line[0], line[5]}
		case 9, 8:
			return &NotificationData{collector.AllFilterable, Data{line[4], "", line[7], line[1], line[2]}, line[0], line[5]}
		}
		live.log.Warn("HOST NOTIFICATION, undefinded linelenght: ", fieldCount, " Line:", line)
	case "SERVICE NOTIFICATION":
		switch fieldCount {
		case 11:
			//Custom
			return &NotificationData{collector.AllFilterable, Data{line[4], line[5], line[10], line[1], line[9]}, line[0], line[6]}
		case 10, 9:
			return &NotificationData{collector.AllFilterable, Data{line[4], line[5], line[8], line[1], line[2]}, line[0], line[6]}
		}
		live.log.Warn("SERVICE NOTIFICATION, undefinded linelenght: ", fieldCount, " Line:", line)
	}

	return nil
}
//getLivestatusVersion asks livestatus for its version string and derives the livestatus
//type from it. The first request is made synchronously; afterwards the function waits in
//one minute steps and repeats the request up to the configured number of times.
//It returns Icinga2, Nagios or Naemon, or -1 when the version string matches none of them.
func getLivestatusVersion(live *Collector) int {
	versionResults := make(chan collector.Printable, 1)
	requestDone := make(chan bool, 1)
	version := ""

	requestVersion := func() {
		live.requestPrintablesFromLivestatus(QueryLivestatusVersion, false, versionResults, requestDone)
	}
	requestVersion()

	round := 0
	waitPerRound := time.Duration(1) * time.Minute
	maxRounds := config.GetConfig().Livestatus.MinutesToWait

waitForVersion:
	for maxRounds != 0 {
		select {
		case answer := <-versionResults:
			version = answer.PrintForInfluxDB("0")
			break waitForVersion
		case <-time.After(waitPerRound):
			if round >= maxRounds {
				break waitForVersion
			}
			go requestVersion()
			round++
		case succeeded := <-requestDone:
			if !succeeded {
				live.log.Infof(
					"Could not detect livestatus version, waiting for %s %d times( %d/%d )...",
					waitPerRound, maxRounds, round, maxRounds,
				)
			}
		}
	}
	live.log.Info("Livestatus version: ", version)

	//The patterns are constant, so the error of MatchString is ignored here.
	versionMatches := func(pattern string) bool {
		matched, _ := regexp.MatchString(pattern, version)
		return matched
	}

	switch {
	case versionMatches(`^r[\d\.-]+$`):
		return Icinga2
	case versionMatches(`^[\d\.]+p[\d\.]+$`):
		return Nagios
	case versionMatches(`^[\d\.]+(-naemon)?$`):
		return Naemon
	}

	live.log.Warn("Could not detect livestatus type, with version: ", version, ". Asuming Nagios")
	return -1
}