/
influxdb_logger.go
127 lines (104 loc) · 3.11 KB
/
influxdb_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package daemon
import (
"time"
"fmt"
"github.com/Polarishq/middleware/framework/log"
"github.com/declanshanaghy/bbqberry/db/influxdb"
"github.com/declanshanaghy/bbqberry/hardware"
"github.com/declanshanaghy/bbqberry/models"
"github.com/declanshanaghy/bbqberry/framework"
)
// influxDBLogger is the background task that reads temperatures from a set
// of probes and writes them to InfluxDB.
type influxDBLogger struct {
	// reader supplies a temperature reading for a given probe index.
	reader hardware.TemperatureReader
	// probes points at the probe indexes that are read on every tick.
	probes *[]int32
	// period is the duration exposed through getPeriod and setPeriod.
	period time.Duration
}
// newInfluxDBLoggerRunnable builds a new influxDBLogger and wraps it with
// newRunnable, returning the resulting RunnableIFC.
func newInfluxDBLoggerRunnable() RunnableIFC {
	logger := newInfluxDBLogger()
	return newRunnable(logger)
}
// newInfluxDBLogger returns an influxDBLogger wired to a new hardware
// temperature reader and to the probe indexes reported as enabled by
// framework.Config, with its period set to one second.
func newInfluxDBLogger() *influxDBLogger {
	return &influxDBLogger{
		reader: hardware.NewTemperatureReader(),
		probes: framework.Config.GetEnabledProbeIndexes(),
		period: time.Second,
	}
}
// getPeriod returns the period currently stored on the logger.
func (o *influxDBLogger) getPeriod() time.Duration {
	return o.period
}
// setPeriod replaces the period stored on the logger with the given value.
func (o *influxDBLogger) setPeriod(period time.Duration) {
	o.period = period
}
// GetName returns the fixed, human readable name of this background task.
func (o *influxDBLogger) GetName() string {
	const name = "influxDBLogger"
	return name
}
// start runs one tick immediately and returns whatever error that tick
// produced.
func (o *influxDBLogger) start() error {
	err := o.tick()
	return err
}
// stop is the shutdown hook for this task. There is nothing to release, so
// it does no work and always returns nil.
func (o *influxDBLogger) stop() error {
	return nil
}
// tick performs one collection cycle: it gathers a reading for every
// configured probe and then hands the readings to logTemperatureMetrics.
//
// If collection fails, that error is returned and nothing is logged;
// otherwise the result of logTemperatureMetrics is returned.
func (o *influxDBLogger) tick() error {
	readings, err := o.collectTemperatureMetrics()
	if err != nil {
		return err
	}
	return o.logTemperatureMetrics(readings)
}
// collectTemperatureMetrics asks the reader for one temperature reading per
// configured probe index, in the order the indexes are stored.
//
// It returns the readings in that same order. If reading any probe fails, the
// error is returned together with a nil slice, so readings gathered before
// the failure are discarded.
func (o *influxDBLogger) collectTemperatureMetrics() ([]*models.TemperatureReading, error) {
	probes := *o.probes
	log.WithField("nProbes", len(probes)).Debug("collecting temperature readings")

	// The result holds at most one entry per probe, so size the backing
	// array once instead of letting append grow it repeatedly.
	readings := make([]*models.TemperatureReading, 0, len(probes))
	for _, i := range probes {
		reading, err := o.reader.GetTemperatureReading(i)
		if err != nil {
			return nil, err
		}
		readings = append(readings, reading)
	}
	return readings, nil
}
// logTemperatureMetrics writes every reading to InfluxDB, pairing each one
// with the probe configuration found under the reading's probe index in
// framework.Config.Hardware.Probes.
//
// Writes are best effort: a failed write is logged and the remaining
// readings are still attempted, so the returned error is always nil.
func (o *influxDBLogger) logTemperatureMetrics(readings []*models.TemperatureReading) error {
	log.WithField("numReadings", len(readings)).Debug("logging temperature metrics")

	for i := range readings {
		r := readings[i]
		// Looked up per reading (not hoisted) so the configuration is only
		// touched when there is at least one reading to write.
		cfg := framework.Config.Hardware.Probes[*r.Probe]
		err := o.writeToInflux(r, cfg)
		if err == nil {
			continue
		}
		log.WithField("err", err).Error("Unable to write to InfluxDB")
	}
	return nil
}
// writeToInflux sends a single reading to InfluxDB through
// influxdb.WritePoint, passing "temp" as its first argument (presumably the
// measurement name; confirm against the influxdb package).
//
// The point is tagged with the probe index and the probe's label, and carries
// the Celsius, Fahrenheit, Kelvin and Warning values of the reading as
// fields. reading.Probe, reading.Celsius, reading.Fahrenheit, reading.Kelvin
// and probe.Label are dereferenced without nil checks.
//
// It returns the error reported by influxdb.WritePoint, or nil on success.
func (o *influxDBLogger) writeToInflux(reading *models.TemperatureReading, probe *models.TemperatureProbe) error {
	probeIndex := *reading.Probe
	label := *probe.Label
	fahrenheit := *reading.Fahrenheit

	tags := map[string]string{
		"Probe": fmt.Sprintf("%d", probeIndex),
		"Label": label,
	}
	fields := map[string]interface{}{
		"Celsius":    *reading.Celsius,
		"Fahrenheit": fahrenheit,
		"Kelvin":     *reading.Kelvin,
		"Warning":    reading.Warning,
	}

	entry := log.WithFields(log.Fields{
		"Label":      label,
		"Probe":      probeIndex,
		"Fahrenheit": fahrenheit,
	})
	entry.Debugf("Logging temperature to InfluxDB")

	_, err := influxdb.WritePoint("temp", tags, fields)
	if err != nil {
		return err
	}
	return nil
}