-
Notifications
You must be signed in to change notification settings - Fork 4
/
controller.go
393 lines (347 loc) · 13.2 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package controller
import (
"fmt"
"log/slog"
"math/rand"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/calvinmclean/automated-garden/garden-app/pkg/action"
"github.com/calvinmclean/automated-garden/garden-app/pkg/mqtt"
"github.com/calvinmclean/automated-garden/garden-app/server"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/go-co-op/gocron"
"github.com/rivo/tview"
)
// Config holds all the options and sub-configs for the mock controller
type Config struct {
MQTTConfig mqtt.Config `mapstructure:"mqtt"`
NestedConfig `mapstructure:"controller"`
LogConfig server.LogConfig `mapstructure:"log"`
}
// NestedConfig is an unfortunate struct that I had to create to have this nested under the 'controller' key
// in the YAML config
type NestedConfig struct {
// Configs used only for running mock controller
EnableUI bool `mapstructure:"enable_ui" survey:"enable_ui"`
MoistureStrategy string `mapstructure:"moisture_strategy" survey:"moisture_strategy"`
MoistureValue int `mapstructure:"moisture_value" survey:"moisture_value"`
PublishWaterEvent bool `mapstructure:"publish_water_event" survey:"publish_water_event"`
TemperatureValue float64 `mapstructure:"temperature_value"`
HumidityValue float64 `mapstructure:"humidity_value"`
TemperatureHumidityDisableNoise bool `mapstructure:"temperature_humidity_disable_noise"`
// Configs used for both
TopicPrefix string `mapstructure:"topic_prefix" survey:"topic_prefix"`
NumZones int `mapstructure:"num_zones" survey:"num_zones"`
MoistureInterval time.Duration `mapstructure:"moisture_interval" survey:"moisture_interval"`
PublishHealth bool `mapstructure:"publish_health" survey:"publish_health"`
HealthInterval time.Duration `mapstructure:"health_interval" survey:"health_interval"`
PublishTemperatureHumidity bool `mapstructure:"publish_temperature_humidity" survey:"publish_temperature_humidity"`
TemperatureHumidityInterval time.Duration `mapstructure:"temperature_humidity_interval" survey:"temperature_humidity_interval"`
// Configs only used for generate-config
WifiConfig `mapstructure:"wifi" survey:"wifi"`
Zones []ZoneConfig `mapstructure:"zones" survey:"zones"`
DefaultWaterTime time.Duration `mapstructure:"default_water_time" survey:"default_water_time"`
EnableButtons bool `mapstructure:"enable_buttons" survey:"enable_buttons"`
EnableMoistureSensor bool `mapstructure:"enable_moisture_sensor" survey:"enable_moisture_sensor"`
LightPin string `mapstructure:"light_pin" survey:"light_pin"`
StopButtonPin string `mapstructure:"stop_water_button" survey:"stop_water_button"`
DisableWatering bool `mapstructure:"disable_watering" survey:"disable_watering"`
TemperatureHumidityPin string `mapstructure:"temperature_humidity_pin" survey:"temperature_humidity_pin"`
MQTTAddress string `survey:"mqtt_address"`
MQTTPort int `survey:"mqtt_port"`
}
// Controller struct holds the necessary data for running the mock garden-controller
type Controller struct {
Config
mqttClient mqtt.Client
app *tview.Application
logger *slog.Logger
pubLogger *slog.Logger
subLogger *slog.Logger
quit chan os.Signal
assertionData
}
// NewController creates and initializes everything needed to run a Controller based on config
func NewController(cfg Config) (*Controller, error) {
controller := &Controller{
Config: cfg,
quit: make(chan os.Signal, 1),
}
controller.logger = cfg.LogConfig.NewLogger()
controller.subLogger = cfg.LogConfig.NewLogger()
controller.pubLogger = cfg.LogConfig.NewLogger()
if controller.EnableUI {
controller.app = controller.setupUI()
}
controller.logger.Info("starting controller", "topic_prefix", controller.TopicPrefix)
if cfg.NumZones > 0 {
controller.pubLogger.Info("publishing moisture data for Zones", "num_zones", cfg.NumZones)
}
topics, err := controller.topics()
if err != nil {
return nil, fmt.Errorf("unable to determine topics: %w", err)
}
controller.logger.Debug("subscribing to topics", "topics", topics)
// Build TopicHandlers to handle subscription to each topic
var handlers []mqtt.TopicHandler
for _, topic := range topics {
controller.subLogger.Info("initializing handler for MQTT messages", "topic", topic)
handlers = append(handlers, mqtt.TopicHandler{
Topic: topic,
Handler: controller.getHandlerForTopic(topic),
})
}
// Override configured ClientID with the TopicPrefix from command flags
controller.MQTTConfig.ClientID = fmt.Sprintf(controller.TopicPrefix)
controller.mqttClient, err = mqtt.NewClient(controller.MQTTConfig, mqtt.DefaultHandler(controller.logger), handlers...)
if err != nil {
return nil, fmt.Errorf("unable to initialize MQTT client: %w", err)
}
if err := controller.mqttClient.Connect(); err != nil {
return nil, fmt.Errorf("unable to connect to MQTT broker: %w", err)
}
return controller, nil
}
// Start will run the Controller until it is stopped (blocking)
func (c *Controller) Start() {
// Initialize scheduler and schedule publishing Jobs
c.logger.Debug("initializing scheduler")
scheduler := gocron.NewScheduler(time.Local)
if c.MoistureInterval != 0 {
for p := 0; p < c.NumZones; p++ {
c.logger.With(
"interval", c.MoistureInterval.String(),
"strategy", c.MoistureStrategy,
).Debug("create scheduled job to publish moisture data")
_, err := scheduler.Every(c.MoistureInterval).Do(c.publishMoistureData, p)
if err != nil {
c.logger.Error("error scheduling moisture publishing", "error", err)
return
}
}
}
if c.PublishHealth {
c.logger.Debug("create scheduled job to publish health data", "interval", c.HealthInterval.String())
_, err := scheduler.Every(c.HealthInterval).Do(c.publishHealthData)
if err != nil {
c.logger.Error("error scheduling health publishing", "error", err)
return
}
}
if c.PublishTemperatureHumidity {
c.logger.Debug("create scheduled job to publish temperature and humidity data", "interval", c.TemperatureHumidityInterval.String())
_, err := scheduler.Every(c.TemperatureHumidityInterval).Do(c.publishTemperatureHumidityData)
if err != nil {
c.logger.Error("error scheduling temperature and humidity publishing", "error", err)
return
}
}
scheduler.StartAsync()
// Shutdown gracefully on Ctrl+C
wg := &sync.WaitGroup{}
wg.Add(1)
signal.Notify(c.quit, os.Interrupt, syscall.SIGTERM)
var shutdownStart time.Time
go func() {
<-c.quit
shutdownStart = time.Now()
c.logger.Info("gracefully shutting down controller")
scheduler.Stop()
// Disconnect mqttClient
c.logger.Info("disconnecting MQTT Client")
c.mqttClient.Disconnect(1000)
wg.Done()
}()
if c.EnableUI {
if err := c.app.Run(); err != nil {
panic(err)
}
} else {
wg.Wait()
}
c.logger.Info("controller shutdown gracefully", "time_elapsed", time.Since(shutdownStart))
}
// Stop shuts down the controller
func (c *Controller) Stop() {
c.quit <- os.Interrupt
}
// setupUI configures the two-column view for publish and subscribe logs
func (c *Controller) setupUI() *tview.Application {
app := tview.NewApplication()
left := tview.NewTextView().
SetTextAlign(tview.AlignLeft).
SetText("Subscribe Logs").
SetDynamicColors(true).
SetChangedFunc(func() { app.Draw() })
right := tview.NewTextView().
SetTextAlign(tview.AlignLeft).
SetText("Publish Logs").
SetDynamicColors(true).
SetChangedFunc(func() { app.Draw() })
c.subLogger = c.Config.LogConfig.NewLoggerWithWriter(tview.ANSIWriter(left))
c.pubLogger = c.Config.LogConfig.NewLoggerWithWriter(tview.ANSIWriter(right))
header := tview.NewTextView().
SetTextAlign(tview.AlignCenter).
SetText(c.TopicPrefix)
tview.ANSIWriter(header).Write([]byte(fmt.Sprintf(
"\n%d Zones\nPublishWaterEvent: %t, PublishHealth: %t, MoistureStrategy: %s",
c.NumZones, c.PublishWaterEvent, c.PublishHealth, c.MoistureStrategy),
))
grid := tview.NewGrid().
SetRows(3, 0).
SetBorders(true)
grid.
AddItem(header, 0, 0, 1, 2, 0, 0, false).
AddItem(left, 1, 0, 1, 1, 0, 100, false).
AddItem(right, 1, 1, 1, 1, 0, 100, false)
tview.ANSIWriter(left).Write([]byte("\n"))
tview.ANSIWriter(right).Write([]byte("\n"))
return app.SetRoot(grid, true)
}
// publishMoistureData publishes an InfluxDB line containing moisture data for a Zone
func (c *Controller) publishMoistureData(zone int) {
moisture := c.createMoistureData()
topic := fmt.Sprintf("%s/data/moisture", c.TopicPrefix)
moistureLogger := c.pubLogger.With(
"topic", topic,
"moisture", moisture,
"zone", zone,
)
moistureLogger.Info("publishing moisture data for Zone")
err := c.mqttClient.Publish(
topic,
[]byte(fmt.Sprintf("moisture,zone=%d value=%d", zone, moisture)),
)
if err != nil {
moistureLogger.Error("unable to publish moisture data", "error", err)
}
}
// publishHealthData publishes an InfluxDB line to record that the controller is alive and active
func (c *Controller) publishHealthData() {
topic := fmt.Sprintf("%s/data/health", c.TopicPrefix)
healthLogger := c.pubLogger.With("topic", topic)
healthLogger.Info("publishing health data")
err := c.mqttClient.Publish(topic, []byte(fmt.Sprintf("health garden=\"%s\"", c.TopicPrefix)))
if err != nil {
healthLogger.Error("unable to publish health data", "error", err)
}
}
func (c *Controller) publishTemperatureHumidityData() {
temperatureTopic := fmt.Sprintf("%s/data/temperature", c.TopicPrefix)
humidityTopic := fmt.Sprintf("%s/data/humidity", c.TopicPrefix)
temperature := c.TemperatureValue
humidity := c.HumidityValue
if !c.TemperatureHumidityDisableNoise {
temperature = addNoise(temperature, 3)
humidity = addNoise(humidity, 3)
}
logger := c.pubLogger.With(
"temperature", temperature,
"humidity", humidity,
)
logger.Info("publishing temperature and humidity data")
err := c.mqttClient.Publish(temperatureTopic, []byte(fmt.Sprintf("temperature value=%f", temperature)))
if err != nil {
logger.Error("unable to publish temperature data", "error", err)
}
err = c.mqttClient.Publish(humidityTopic, []byte(fmt.Sprintf("humidity value=%f", humidity)))
if err != nil {
logger.Error("unable to publish humidity data", "error", err)
}
}
// addNoise will take a base value and introduce some += variance based on the provided percentage range. This will
// produce sensor data that is relatively consistent but not totally flat
func addNoise(baseValue float64, percentRange float64) float64 {
// nolint:gosec
diff := percentRange - (rand.Float64() * percentRange * 2)
return baseValue + diff
}
// createMoistureData uses the MoistureStrategy config to create a moisture data point
func (c *Controller) createMoistureData() int {
switch c.MoistureStrategy {
case "random":
// nolint:gosec
return rand.Intn(c.MoistureValue)
case "constant":
return c.MoistureValue
case "increasing":
c.MoistureValue++
if c.MoistureValue > 100 {
c.MoistureValue = 0
}
return c.MoistureValue
case "decreasing":
c.MoistureValue--
if c.MoistureValue < 0 {
c.MoistureValue = 100
}
return c.MoistureValue
default:
return 0
}
}
// publishWaterEvent logs moisture data to InfluxDB via Telegraf and MQTT
func (c *Controller) publishWaterEvent(waterMsg action.WaterMessage, cmdTopic string) {
if !c.PublishWaterEvent {
c.pubLogger.Debug("publishing water events is disabled")
return
}
// Incoming topic is "{{.TopicPrefix}}/command/water" but we need to publish on "{{.TopicPrefix}}/data/water"
dataTopic := strings.ReplaceAll(cmdTopic, "command", "data")
waterEventLogger := c.pubLogger.With(
"topic", dataTopic,
"zone_position", waterMsg.Position,
"duration", waterMsg.Duration,
)
waterEventLogger.Info("publishing watering event for Zone")
err := c.mqttClient.Publish(
dataTopic,
[]byte(fmt.Sprintf("water,zone=%d millis=%d", waterMsg.Position, waterMsg.Duration)),
)
if err != nil {
waterEventLogger.Error("unable to publish watering event", "error", err)
}
}
// getHandlerForTopic provides a different MessageHandler function for each of the expected
// topics to be able to handle them in different ways
func (c *Controller) getHandlerForTopic(topic string) paho.MessageHandler {
switch t := strings.Split(topic, "/")[2]; t {
case "water":
return c.waterHandler(topic)
case "stop":
return c.stopHandler(topic)
case "stop_all":
return c.stopAllHandler(topic)
case "light":
return c.lightHandler(topic)
default:
return paho.MessageHandler(func(_ paho.Client, msg paho.Message) {
c.subLogger.With(
"topic", msg.Topic(),
"message", string(msg.Payload()),
).Info("received message on unexpected topic")
})
}
}
// topics returns a list of topics based on the Config values and provided TopicPrefix
func (c *Controller) topics() ([]string, error) {
topics := []string{}
templateFuncs := []func(string) (string, error){
c.MQTTConfig.WaterTopic,
c.MQTTConfig.StopTopic,
c.MQTTConfig.StopAllTopic,
c.MQTTConfig.LightTopic,
}
for _, templateFunc := range templateFuncs {
topic, err := templateFunc(c.TopicPrefix)
if err != nil {
return topics, err
}
topics = append(topics, topic)
}
return topics, nil
}