/
nagiosSpoolfileCollector.go
106 lines (94 loc) · 3.19 KB
/
nagiosSpoolfileCollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package spoolfile
import (
"github.com/griesbacher/nagflux/collector"
"github.com/griesbacher/nagflux/collector/livestatus"
"github.com/griesbacher/nagflux/config"
"github.com/griesbacher/nagflux/logging"
"github.com/griesbacher/nagflux/statistics"
"io/ioutil"
"path"
"time"
)
const (
//MinFileAge is the duration to wait, before the files are parsed
MinFileAge = time.Duration(10) * time.Second
//IntervalToCheckDirectory the interval to check if there are new files
IntervalToCheckDirectory = time.Duration(5) * time.Second
)
//NagiosSpoolfileCollector scans the nagios spoolfile folder and delegates the files to its workers.
type NagiosSpoolfileCollector struct {
quit chan bool
jobs chan string
spoolDirectory string
workers []*NagiosSpoolfileWorker
}
//NagiosSpoolfileCollectorFactory creates the give amount of Woker and starts them.
func NagiosSpoolfileCollectorFactory(spoolDirectory string, workerAmount int, results collector.ResultQueues,
livestatusCacheBuilder *livestatus.CacheBuilder, fileBufferSize int, defaultTarget collector.Filterable) *NagiosSpoolfileCollector {
s := &NagiosSpoolfileCollector{
quit: make(chan bool),
jobs: make(chan string, 100),
spoolDirectory: spoolDirectory,
workers: make([]*NagiosSpoolfileWorker, workerAmount),
}
gen := NagiosSpoolfileWorkerGenerator(s.jobs, results, livestatusCacheBuilder, fileBufferSize, defaultTarget)
for w := 0; w < workerAmount; w++ {
s.workers[w] = gen()
}
go s.run()
return s
}
//Stop stops his workers and itself.
func (s *NagiosSpoolfileCollector) Stop() {
s.quit <- true
<-s.quit
for _, worker := range s.workers {
worker.Stop()
}
logging.GetLogger().Debug("SpoolfileCollector stopped")
}
//Delegates the files to its workers.
func (s *NagiosSpoolfileCollector) run() {
promServer := statistics.GetPrometheusServer()
for {
select {
case <-s.quit:
s.quit <- true
return
case <-time.After(IntervalToCheckDirectory):
pause := config.IsAnyTargetOnPause()
if pause {
logging.GetLogger().Debugln("NagiosSpoolfileCollector in pause")
continue
}
logging.GetLogger().Debug("Reading Directory: ", s.spoolDirectory)
files, _ := ioutil.ReadDir(s.spoolDirectory)
promServer.SpoolFilesOnDisk.Set(float64(len(files)))
for _, currentFile := range files {
select {
case <-s.quit:
s.quit <- true
return
case s.jobs <- path.Join(s.spoolDirectory, currentFile.Name()):
case <-time.After(time.Duration(1) * time.Minute):
logging.GetLogger().Warn("NagiosSpoolfileCollector: Could not write to buffer")
}
}
}
}
}
//FilesInDirectoryOlderThanX returns a list of file, of a folder, names which are older then a certain duration.
func FilesInDirectoryOlderThanX(folder string, age time.Duration) []string {
files, _ := ioutil.ReadDir(folder)
var oldFiles []string
for _, currentFile := range files {
if IsItTime(currentFile.ModTime(), age) {
oldFiles = append(oldFiles, path.Join(folder, currentFile.Name()))
}
}
return oldFiles
}
//IsItTime checks if the timestamp plus duration is in the past.
func IsItTime(timeStamp time.Time, duration time.Duration) bool {
return time.Now().After(timeStamp.Add(duration))
}