forked from Griesbacher/nagflux
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Worker.go
342 lines (316 loc) · 9.96 KB
/
Worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package influx
import (
"bytes"
"crypto/tls"
"errors"
"io/ioutil"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/ConSol/nagflux/collector"
"github.com/ConSol/nagflux/collector/nagflux"
"github.com/ConSol/nagflux/data"
"github.com/ConSol/nagflux/helper"
"github.com/ConSol/nagflux/logging"
"github.com/ConSol/nagflux/statistics"
"github.com/kdar/factorlog"
)
//Worker reads data from the queue and sends them to the influxdb.
type Worker struct {
	workerID              int                      // index of this worker within its target's pool
	quit                  chan bool                // external stop signal; echoed back by run() to ack shutdown
	quitInternal          chan bool                // buffered (cap 1) stop signal consumed by waitForQuitOrGoOn
	jobs                  chan collector.Printable // shared queue of data points to write
	connection            string                   // InfluxDB write endpoint URL
	dumpFile              string                   // file unsent queries are dumped to on failure
	log                   *factorlog.FactorLog
	version               string // InfluxDB version string from the config
	connector             *Connector
	httpClient            http.Client
	IsRunning             bool // NOTE(review): read/written without synchronization — confirm callers tolerate races
	promServer            statistics.PrometheusServer
	target                data.Target
	stopReadingDataIfDown bool // if true, stop draining jobs while InfluxDB is unreachable
}
// dataTimeout is how long run() waits for a new job before flushing the buffer.
const dataTimeout = time.Duration(5) * time.Second

// Sentinel errors used by sendData / sendBuffer / waitForQuitOrGoOn to pick a
// retry strategy; they are compared by identity, not message.
var errorInterrupted = errors.New("Got interrupted")
var errorBadRequest = errors.New("400 Bad Request")
var errorHTTPClient = errors.New("Http Client got an error")
var errorFailedToSend = errors.New("Could not send data")
var error500 = errors.New("Error 500")

// mutex serializes dump-file writes across all workers (see dumpQueries).
var mutex = &sync.Mutex{}
//WorkerGenerator generates a new Worker and starts it.
func WorkerGenerator(jobs chan collector.Printable, connection, dumpFile, version string,
	connector *Connector, target data.Target, stopReadingDataIfDown bool) func(workerId int) *Worker {
	return func(workerId int) *Worker {
		// Mirror the connector's HTTP client: same timeout, TLS verification
		// disabled (self-signed InfluxDB certificates are accepted).
		transport := &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}
		client := http.Client{
			Timeout:   connector.httpClient.Timeout,
			Transport: transport,
		}
		w := &Worker{
			workerID:              workerId,
			quit:                  make(chan bool),
			quitInternal:          make(chan bool, 1),
			jobs:                  jobs,
			connection:            connection,
			dumpFile:              nagflux.GenDumpfileName(dumpFile, target),
			log:                   logging.GetLogger(),
			version:               version,
			connector:             connector,
			httpClient:            client,
			IsRunning:             true,
			promServer:            statistics.GetPrometheusServer(),
			target:                target,
			stopReadingDataIfDown: stopReadingDataIfDown,
		}
		go w.run()
		return w
	}
}
//Stop stops the worker
func (worker *Worker) Stop() {
	// Signal the retry loop first (quitInternal is buffered with capacity 1,
	// so this never blocks), then the main loop, then wait for run() to echo
	// on quit as the shutdown acknowledgement.
	worker.quitInternal <- true
	worker.quit <- true
	<-worker.quit
	// NOTE(review): unsynchronized write — readers of IsRunning may race.
	worker.IsRunning = false
	worker.log.Debug("InfluxWorker(" + worker.target.Name + ") stopped")
}
//Tries to send data all the time.
func (worker Worker) run() {
	var queries []collector.Printable
	var query collector.Printable
	for {
		testConnector := false
		switch {
		case worker.stopReadingDataIfDown && !worker.connector.IsAlive():
			// InfluxDB is down: remember to re-test the connection, then fall
			// through into the shared wait/retry body below.
			testConnector = true
			fallthrough
		case worker.stopReadingDataIfDown && !worker.connector.DatabaseExists():
			// NOTE: when entered via fallthrough, this case's condition is NOT
			// evaluated — the wait body runs regardless of DatabaseExists().
			// wait for quit or test connector / database every 10 seconds
			select {
			case <-worker.quit:
				worker.log.Debug("InfluxWorker(" + worker.target.Name + ") quitting...")
				// Buffered queries are intentionally NOT flushed here: the
				// target is known to be down in this branch.
				worker.quit <- true
				return
			case <-time.After(time.Duration(10) * time.Second):
				if testConnector {
					//Test Influxdb
					test := worker.connector.TestIfIsAlive(worker.stopReadingDataIfDown)
					worker.log.Trace("Retry TestIfIsAlive InfluxWorker(" + worker.target.Name + "): " + strconv.FormatBool(test))
				} else {
					//Test Database
					test := worker.connector.TestDatabaseExists()
					worker.log.Trace("Retry TestDatabaseExists InfluxWorker(" + worker.target.Name + "): " + strconv.FormatBool(test))
				}
			}
		default:
			// Normal operation: wait for quit or incoming jobs, flushing the
			// buffer every 500 entries or after dataTimeout of inactivity.
			select {
			case <-worker.quit:
				worker.log.Debug("InfluxWorker(" + worker.target.Name + ") quitting...")
				worker.sendBuffer(queries)
				worker.quit <- true
				return
			case query = <-worker.jobs:
				test := query.TestTargetFilter(worker.target.Name)
				worker.log.Trace("TestTargetFilter (" + worker.target.Name + "): " + strconv.FormatBool(test))
				if test {
					queries = append(queries, query)
					if len(queries) == 500 {
						worker.sendBuffer(queries)
						// Reuse the backing array for the next batch.
						queries = queries[:0]
					}
				}
			case <-time.After(dataTimeout):
				worker.sendBuffer(queries)
				queries = queries[:0]
			}
		}
	}
}
//Sends the given queries to the influxdb.
//On failure it retries up to two times, splitting the batch on a 400 to
//isolate the bad lines, and dumping everything to disk on quit or on a
//persistent error.
func (worker Worker) sendBuffer(queries []collector.Printable) {
	if len(queries) == 0 {
		return
	}
	// Convert every job to InfluxDB line protocol; jobs that fail to cast are
	// skipped (castJobToString already logged the problem).
	var lineQueries []string
	for _, query := range queries {
		cast, castErr := worker.castJobToString(query)
		if castErr == nil {
			lineQueries = append(lineQueries, cast)
		}
	}
	var dataToSend []byte
	for _, lineQuery := range lineQueries {
		dataToSend = append(dataToSend, []byte(lineQuery)...)
	}
	startTime := time.Now()
	sendErr := worker.sendData(dataToSend, true)
	if sendErr != nil {
		worker.connector.TestIfIsAlive(worker.stopReadingDataIfDown)
		worker.connector.TestDatabaseExists()
	retry:
		for i := 0; i < 2; i++ {
			switch sendErr {
			case errorBadRequest:
				//Maybe just a few queries are wrong, so send them one by one and find the bad one
				var badQueries []string
				for _, lineQuery := range lineQueries {
					if queryErr := worker.sendData([]byte(lineQuery), false); queryErr != nil {
						badQueries = append(badQueries, lineQuery)
					}
				}
				worker.dumpErrorQueries("\n\nOne of the values is not clean..\n", badQueries)
				sendErr = nil
			case nil:
				//Single point of exit. BUGFIX: a bare "break" only left the
				//switch, so the loop kept spinning; the label exits the loop.
				break retry
			default:
				if err := worker.waitForQuitOrGoOn(); err != nil {
					//Time to terminate: save everything unsent and stop.
					//BUGFIX: previously the code fell through and resent the
					//data anyway, overwriting sendErr and potentially
					//delivering queries that were already dumped to disk.
					worker.dumpRemainingQueries(lineQueries)
					sendErr = nil
					break retry
				}
				//Resend Data
				sendErr = worker.sendData(dataToSend, false)
			}
		}
		if sendErr != nil {
			//if there is still an error dump the queries and go on
			worker.log.Infof("Dumping queries which couldn't be sent to: %s", worker.dumpFile)
			worker.dumpQueries(worker.dumpFile, lineQueries)
		}
	}
	// NOTE(review): this adds the number of queries, not bytes — the metric
	// name suggests bytes; confirm intent before changing.
	worker.promServer.BytesSend.WithLabelValues("InfluxDB").Add(float64(len(lineQueries)))
	timeDiff := float64(time.Since(startTime).Seconds() * 1000)
	if timeDiff >= 0 {
		worker.promServer.SendDuration.WithLabelValues("InfluxDB").Add(timeDiff)
	}
}
//Reads the queries from the global queue and returns them as string.
//Drains jobs matching this worker's target until the queue has been idle for
//200ms, then returns whatever was collected.
func (worker Worker) readQueriesFromQueue() []string {
	var collected []string
	for {
		select {
		case job := <-worker.jobs:
			if job.TestTargetFilter(worker.target.Name) {
				if line, convErr := worker.castJobToString(job); convErr == nil {
					collected = append(collected, line)
				}
			}
		case <-time.After(time.Duration(200) * time.Millisecond):
			return collected
		}
	}
}
//sends the raw data to influxdb and returns an err if given.
//The log flag controls whether the payload and error responses are logged
//(suppressed during the line-by-line bad-query search in sendBuffer).
//Returns nil on 2xx, error500 on 500, errorBadRequest on 400,
//errorHTTPClient on transport/request errors, errorFailedToSend otherwise.
func (worker Worker) sendData(rawData []byte, log bool) error {
	if log {
		worker.log.Debug("sendData (" + worker.target.Name + ")\n" + string(rawData))
	}
	req, err := http.NewRequest("POST", worker.connection, bytes.NewBuffer(rawData))
	if err != nil {
		//BUGFIX: previously only logged and fell through, dereferencing the
		//nil req below. A bad connection URL now fails cleanly.
		worker.log.Warn(err)
		return errorHTTPClient
	}
	req.Header.Set("User-Agent", "Nagflux")
	resp, err := worker.httpClient.Do(req)
	if err != nil {
		worker.log.Warn(err)
		return errorHTTPClient
	}
	defer resp.Body.Close()
	worker.log.Debug(resp.Status)
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		//OK
		return nil
	} else if resp.StatusCode == 500 {
		//Temporarily timeout
		if log {
			worker.logHTTPResponse(resp)
		}
		return error500
	} else if resp.StatusCode == 400 {
		//Bad Request
		if log {
			worker.logHTTPResponse(resp)
		}
		return errorBadRequest
	}
	//HTTP Error
	if log {
		worker.logHTTPResponse(resp)
	}
	return errorFailedToSend
}
//Logs a http response to warn.
//Reads the full body; the read error is deliberately ignored — this is
//best-effort diagnostics only.
func (worker Worker) logHTTPResponse(resp *http.Response) {
	payload, _ := ioutil.ReadAll(resp.Body)
	worker.log.Warnf("Influx status: %s - %s", resp.Status, string(payload))
}
//Waits on an internal quit signal.
//Returns errorInterrupted if a quit arrives within 10 seconds (re-queueing
//the signal so other waiters see it too), nil after the timeout.
func (worker Worker) waitForQuitOrGoOn() error {
	retryDelay := time.After(time.Duration(10) * time.Second)
	select {
	//Got stop signal
	case <-worker.quitInternal:
		worker.log.Debug("Received quit")
		worker.quitInternal <- true
		return errorInterrupted
	//Timeout and retry
	case <-retryDelay:
		return nil
	}
}
//Writes the bad queries to a dumpfile.
//The log message is prepended to the dump so the file records why these
//queries were rejected.
func (worker Worker) dumpErrorQueries(messageForLog string, errorQueries []string) {
	target := worker.dumpFile + "-errors"
	worker.log.Warnf("Dumping queries with errors to: %s", target)
	entries := make([]string, 0, len(errorQueries)+1)
	entries = append(entries, messageForLog)
	entries = append(entries, errorQueries...)
	worker.dumpQueries(target, entries)
}
//Dumps the remaining queries if a quit signal arises.
//Also drains whatever is still sitting in the global job queue so nothing
//is lost across a restart.
func (worker Worker) dumpRemainingQueries(remainingQueries []string) {
	worker.log.Debugf("Global queue %d own queue %d", len(worker.jobs), len(remainingQueries))
	if len(worker.jobs) == 0 && len(remainingQueries) == 0 {
		return
	}
	worker.log.Debug("Saving queries to disk")
	toDump := append(remainingQueries, worker.readQueriesFromQueue()...)
	worker.log.Debugf("dumping %d queries", len(toDump))
	worker.dumpQueries(worker.dumpFile, toDump)
}
//Writes queries to a dumpfile.
//Appends each query string to filename, creating the file if needed.
//Serialized by the package-level mutex because several workers can dump to
//the same target file concurrently.
func (worker Worker) dumpQueries(filename string, queries []string) {
	mutex.Lock()
	defer mutex.Unlock()
	//BUGFIX: O_CREATE replaces the old os.Stat + os.Create dance, which
	//leaked the *os.File returned by os.Create (it was never closed).
	f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		worker.log.Critical(err)
		return
	}
	defer f.Close()
	for _, query := range queries {
		if _, err := f.WriteString(query); err != nil {
			worker.log.Critical(err)
		}
	}
}
//Converts an collector.Printable to a string.
//Only InfluxDB >= 0.9 is supported; an unsupported version is fatal.
//Guarantees the returned line ends with a newline.
func (worker Worker) castJobToString(job collector.Printable) (string, error) {
	var line string
	var convErr error
	if helper.VersionOrdinal(worker.version) < helper.VersionOrdinal("0.9") {
		worker.log.Fatalf("This influxversion [%s] given in the config is not supported", worker.version)
		convErr = errors.New("This influxversion given in the config is not supported")
	} else {
		line = job.PrintForInfluxDB(worker.version)
	}
	if len(line) > 1 && line[len(line)-1] != '\n' {
		line += "\n"
	}
	return line, convErr
}