/
mqtt.go
120 lines (95 loc) · 2.83 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package mqtt
import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"os"
"strconv"
"time"
"github.com/diwise/service-chassis/pkg/infrastructure/o11y/logging"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
// Client is the handle returned by NewClient for controlling the MQTT
// connection. The concrete implementation is not visible in this part of
// the file, so the method notes below are assumptions to be confirmed.
type Client interface {
	// Start presumably connects to the configured broker and reports any
	// failure to do so — TODO confirm against the implementation.
	Start() error
	// Stop presumably disconnects from the broker — TODO confirm against
	// the implementation.
	Stop()
}
// Config holds the MQTT connection settings. All fields are unexported;
// values are populated by NewConfigFromEnvironment.
type Config struct {
	// enabled is false only when <prefix>MQTT_DISABLED is exactly "true".
	enabled bool
	// host is the broker host name; NewClient connects to ssl://<host>:8883.
	host string
	// keepAlive is the keep-alive interval in seconds (30 unless overridden
	// via <prefix>MQTT_KEEPALIVE).
	keepAlive int64
	// user is the username handed to the broker.
	user string
	// password is the password handed to the broker.
	password string
	// topics lists the topics that are subscribed to every time a
	// connection is established.
	topics []string
}
// NewClient assembles an MQTT client for the broker described by cfg.
//
// This function only builds the client options; nothing in it initiates a
// connection. The options are configured as follows:
//
//   - broker address ssl://<cfg.host>:8883, authenticated with cfg.user and
//     cfg.password;
//   - a client ID of "diwise/iot-agent/" followed by a freshly generated UUID;
//   - a default publish handler created by NewMessageHandler(ctx,
//     forwardingEndpoint) (defined elsewhere; presumably it forwards incoming
//     messages to forwardingEndpoint — confirm against its implementation);
//   - a keep-alive of cfg.keepAlive seconds;
//   - on every (re)connect, each topic in cfg.topics is subscribed with QoS 0.
//     A failed subscription is logged but does not abort the remaining ones;
//   - on connection loss, the error is logged and the process exits with
//     status 1.
//
// The logger is taken from ctx and tagged with the broker host. The returned
// error is currently always nil.
func NewClient(ctx context.Context, cfg Config, forwardingEndpoint string) (Client, error) {
	options := mqtt.NewClientOptions()

	connectionString := fmt.Sprintf("ssl://%s:8883", cfg.host)
	options.AddBroker(connectionString)

	options.SetUsername(cfg.user)
	options.SetPassword(cfg.password)

	options.SetClientID("diwise/iot-agent/" + uuid.NewString())
	options.SetDefaultPublishHandler(NewMessageHandler(ctx, forwardingEndpoint))
	options.SetKeepAlive(time.Duration(cfg.keepAlive) * time.Second)

	log := logging.GetFromContext(ctx).With(slog.String("mqtt-host", cfg.host))

	options.OnConnect = func(mc mqtt.Client) {
		log.Info("connected")

		for _, topic := range cfg.topics {
			log.Info("subscribing to topic", "topic", topic)

			token := mc.Subscribe(topic, 0, nil)
			token.Wait()

			// Wait only blocks until the operation has finished; the outcome
			// must be read from the token, otherwise a rejected subscription
			// would go completely unnoticed.
			if err := token.Error(); err != nil {
				log.Error("failed to subscribe to topic", "topic", topic, "err", err.Error())
			}
		}
	}

	options.OnConnectionLost = func(mc mqtt.Client, err error) {
		log.Error("connection lost", "err", err.Error())
		// Terminates the whole process instead of reconnecting in place.
		// NOTE(review): presumably an external supervisor restarts the
		// service — confirm.
		os.Exit(1)
	}

	// NOTE(review): InsecureSkipVerify disables verification of the broker's
	// certificate chain and host name, which leaves the TLS connection open to
	// man-in-the-middle attacks. It is kept as-is because existing deployments
	// may rely on it (e.g. self-signed broker certificates), but it should be
	// made configurable or removed.
	options.TLSConfig = &tls.Config{
		InsecureSkipVerify: true,
	}

	return &mqttClient{
		cfg:     cfg,
		log:     log,
		options: options,
	}, nil
}
// NewConfigFromEnvironment builds a Config from environment variables whose
// names start with prefix.
//
// The following variables are read (each one prefixed with prefix):
//
//   - MQTT_DISABLED: the client counts as enabled unless this is exactly "true".
//   - MQTT_HOST: broker host name; required when enabled.
//   - MQTT_USER, MQTT_PASSWORD: broker credentials; may be empty.
//   - MQTT_TOPIC_0 .. MQTT_TOPIC_9: topics to subscribe to. MQTT_TOPIC_0 is
//     required when enabled; unset or empty higher indexes are skipped, so
//     gaps in the numbering are tolerated.
//   - MQTT_KEEPALIVE: optional keep-alive in seconds (default 30). It must be
//     a base-10 integer in the range 0-65535, since the MQTT keep-alive is
//     transmitted as a 16-bit value.
//
// When the client is disabled the function returns right away, without
// validating anything and without reading MQTT_KEEPALIVE or any topic beyond
// MQTT_TOPIC_0.
//
// On failure the Config populated so far is returned together with a non-nil
// error (kept for backward compatibility); callers should not use it.
func NewConfigFromEnvironment(prefix string) (Config, error) {
	const (
		topicEnvNamePattern string = "%sMQTT_TOPIC_%d"
		maxTopicCount       int    = 10
		defaultKeepAlive    int64  = 30
		// Largest keep-alive representable in an MQTT CONNECT packet.
		maxKeepAlive int64 = 65535
	)

	cfg := Config{
		enabled:   os.Getenv(fmt.Sprintf("%sMQTT_DISABLED", prefix)) != "true",
		host:      os.Getenv(fmt.Sprintf("%sMQTT_HOST", prefix)),
		keepAlive: defaultKeepAlive,
		user:      os.Getenv(fmt.Sprintf("%sMQTT_USER", prefix)),
		password:  os.Getenv(fmt.Sprintf("%sMQTT_PASSWORD", prefix)),
		topics: []string{
			os.Getenv(fmt.Sprintf(topicEnvNamePattern, prefix, 0)),
		},
	}

	// A disabled client needs no further settings, so skip all validation.
	if !cfg.enabled {
		return cfg, nil
	}

	if cfg.host == "" {
		return cfg, fmt.Errorf("the mqtt host must be specified using the %sMQTT_HOST environment variable", prefix)
	}

	if cfg.topics[0] == "" {
		return cfg, fmt.Errorf("at least one topic (%sMQTT_TOPIC_0) must be added to the configuration", prefix)
	}

	if customKeepAlive := os.Getenv(fmt.Sprintf("%sMQTT_KEEPALIVE", prefix)); customKeepAlive != "" {
		keepAlive, err := strconv.ParseInt(customKeepAlive, 10, 64)
		if err != nil {
			// %w keeps the message text unchanged while letting callers
			// inspect the underlying strconv error.
			return cfg, fmt.Errorf("custom keepalive value %s is not parseable to an int (%w)", customKeepAlive, err)
		}

		// Reject values that cannot be expressed as an MQTT keep-alive rather
		// than letting them be truncated or misinterpreted further down.
		if keepAlive < 0 || keepAlive > maxKeepAlive {
			return cfg, fmt.Errorf("custom keepalive value %d is outside the allowed range 0-%d", keepAlive, maxKeepAlive)
		}

		cfg.keepAlive = keepAlive
	}

	// Topic 0 was handled above; collect whichever of the remaining indexes
	// are set.
	for idx := 1; idx < maxTopicCount; idx++ {
		varName := fmt.Sprintf(topicEnvNamePattern, prefix, idx)
		if value := os.Getenv(varName); value != "" {
			cfg.topics = append(cfg.topics, value)
		}
	}

	return cfg, nil
}