forked from evcc-io/evcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
75 lines (59 loc) · 1.27 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package tasks
import (
"errors"
"net"
"strconv"
"time"
"github.com/connectorjs/evcm/util"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Mqtt is the task type identifier under which the MQTT handler factory is
// registered.
const Mqtt TaskType = "mqtt"
// init registers MqttHandlerFactory in the task registry under the Mqtt type.
func init() {
registry.Add(Mqtt, MqttHandlerFactory)
}
// MqttHandlerFactory builds an MqttHandler from the given configuration map.
//
// The handler is pre-populated with port 1883 and the package-level timeout,
// then conf is decoded into it via util.DecodeOther. If decoding succeeds but
// the port is 0 afterwards, a "missing port" error is reported. The handler
// pointer is returned in every case, including alongside a non-nil error.
func MqttHandlerFactory(conf map[string]interface{}) (TaskHandler, error) {
	h := &MqttHandler{
		Port:    1883,
		Timeout: timeout,
	}

	// A decode failure takes precedence over the port check.
	if err := util.DecodeOther(conf, h); err != nil {
		return h, err
	}

	if h.Port == 0 {
		return h, errors.New("missing port")
	}

	return h, nil
}
// MqttHandler is the task handler that probes a host for an MQTT broker.
type MqttHandler struct {
// Port is the TCP port of the broker; MqttHandlerFactory defaults it to 1883
// and rejects 0.
Port int
// Topic is the topic Test subscribes to after connecting. When empty, Test
// does not subscribe.
Topic string
// Timeout is the configured time limit; MqttHandlerFactory defaults it to
// the package-level timeout.
Timeout time.Duration
}
// Test probes the host in.IP for an MQTT broker listening on h.Port.
//
// It connects to the broker and, if h.Topic is set, subscribes to that topic
// and waits for a single message. When a message arrives in time, a clone of
// in with Topic set to h.Topic is returned as the only result. In every other
// case (connection failure, subscription failure, no message before the
// timeout, or no topic configured) the result is nil.
//
// h.Timeout bounds the connection attempt and, separately, the combined
// subscribe-and-receive phase. A non-positive h.Timeout falls back to the
// package-level timeout. The client is disconnected before Test returns.
func (h *MqttHandler) Test(log *util.Logger, in ResultDetails) []ResultDetails {
	wait := h.Timeout
	if wait <= 0 {
		// Guard against a zero value: the MQTT client treats a zero connect
		// timeout as "never time out".
		wait = timeout
	}

	addr := net.JoinHostPort(in.IP, strconv.Itoa(h.Port))

	opt := mqtt.NewClientOptions()
	opt.AddBroker(addr)
	opt.SetConnectTimeout(wait)

	client := mqtt.NewClient(opt)

	if token := client.Connect(); !token.Wait() || token.Error() != nil {
		return nil
	}
	// Release the connection and the client's goroutines once the probe is
	// done; the argument is the quiesce period in milliseconds.
	defer client.Disconnect(250)

	if h.Topic == "" {
		// NOTE(review): a reachable broker yields no result when no topic is
		// configured. This matches the previous behavior; confirm it is
		// intended.
		return nil
	}

	// Subscribing and waiting for the first message share one time budget.
	deadline := time.Now().Add(wait)

	recv := make(chan struct{}, 1)
	sub := client.Subscribe(h.Topic, 1, func(mqtt.Client, mqtt.Message) {
		// Non-blocking send: only the first message matters, and further
		// messages must not stall the client's dispatch goroutine.
		select {
		case recv <- struct{}{}:
		default:
		}
	})
	if !sub.WaitTimeout(wait) || sub.Error() != nil {
		return nil
	}

	timer := time.NewTimer(time.Until(deadline))
	defer timer.Stop()

	select {
	case <-recv:
		out := in.Clone()
		out.Topic = h.Topic
		return []ResultDetails{out}
	case <-timer.C:
		return nil
	}
}