/
gateway_polling.go
118 lines (105 loc) · 2.87 KB
/
gateway_polling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package data_sources
import (
"encoding/json"
"io"
"net/http"
"strings"
"time"
"github.com/Scrin/RuuviBridge/config"
"github.com/Scrin/RuuviBridge/parser"
log "github.com/sirupsen/logrus"
)
type gatewayHistoryTag struct {
Rssi int64 `json:"rssi"`
Timestamp int64 `json:"timestamp"`
Data string `json:"data"`
}
// seems to be emitted only if the authentication fails
type gatewayInfo struct {
GatewayName string `json:"gateway_name"`
}
type gatewayHistory struct {
Data struct {
GwMac string `json:"gw_mac"`
Tags map[string]gatewayHistoryTag `json:"tags"`
} `json:"data"`
}
func StartGatewayPolling(conf config.GatewayPolling, measurements chan<- parser.Measurement) chan<- bool {
interval := conf.Interval
if interval == 0 {
interval = 10 * time.Second
}
log := log.WithFields(log.Fields{
"target": conf.GatewayUrl,
"interval": interval,
})
log.Info("Starting gateway polling")
stop := make(chan bool)
go gatewayPoller(conf.GatewayUrl, conf.BearerToken, interval, measurements, stop, log)
return stop
}
func gatewayPoller(url string, bearer_token string, interval time.Duration, measurements chan<- parser.Measurement, stop <-chan bool, log *log.Entry) {
seenTags := make(map[string]int64)
poll(url, bearer_token, measurements, seenTags, log)
for {
select {
case <-stop:
return
case <-time.After(interval):
poll(url, bearer_token, measurements, seenTags, log)
}
}
}
func poll(url string, bearer_token string, measurements chan<- parser.Measurement, seenTags map[string]int64, log *log.Entry) {
req, err := http.NewRequest("GET", url+"/history", nil)
if err != nil {
log.WithError(err).Error("Failed to construct GET request")
return
}
if bearer_token != "" {
req.Header.Add("Authorization", "Bearer "+bearer_token)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.WithError(err).Error("Failed to get history from gateway")
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.WithError(err).Error("Failed to read data from gateway")
return
}
var gatewayInfo gatewayInfo
err = json.Unmarshal(body, &gatewayInfo)
if err != nil {
log.WithError(err).Error("Failed to deserialize gateway data")
return
}
if len(gatewayInfo.GatewayName) > 0 {
log.Error("Failed to authenticate")
return
}
var gatewayHistory gatewayHistory
err = json.Unmarshal(body, &gatewayHistory)
if err != nil {
log.WithError(err).Error("Failed to deserialize gateway data")
return
}
for mac, data := range gatewayHistory.Data.Tags {
mac = strings.ToUpper(mac)
timestamp := data.Timestamp
if seenTags[mac] == timestamp {
continue
}
seenTags[mac] = timestamp
measurement, ok := parser.Parse(data.Data)
if ok {
measurement.Mac = mac
measurement.Rssi = &data.Rssi
measurement.Timestamp = ×tamp
measurements <- measurement
}
}
}