/
net_mqtt_subscribe.go
114 lines (106 loc) · 2.48 KB
/
net_mqtt_subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package elem
import (
"crypto/tls"
"github.com/Bitspark/slang/pkg/core"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
// netMQTTSubscribeCfg is the builtin configuration of the "MQTT subscribe"
// operator.
//
// Once triggered on its main in-port, the operator subscribes to the
// configured topic on the configured broker and emits every received message
// as one item of a single output stream. Each item is a map with the entries
// "messageId" (number), "payload" (binary) and "topic" (string). The stream is
// opened right after the trigger has been pulled and is closed when the
// operator is stopped.
//
// Properties read by the operator function: "broker", "username", "password",
// "topic" and the optional "verifyCertificate".
//
// NOTE(review): "clientCertificate", "clientKey" and "caCertificate" are
// declared in the blueprint but are not evaluated by the operator function;
// whether they are meant to carry PEM data or file paths cannot be told from
// this file — confirm before wiring them into the TLS configuration.
var netMQTTSubscribeCfg = &builtinConfig{
	safe: true,
	blueprint: core.Blueprint{
		Id: uuid.MustParse("fd51e295-3483-4558-9b26-8c16d579c4ef"),
		Meta: core.BlueprintMetaDef{
			Name:             "MQTT subscribe",
			ShortDescription: "subscribes at a given topic, behaves like an MQTT client",
			Icon:             "chart-network",
			Tags:             []string{"network", "mqtt"},
			DocURL:           "https://bitspark.de/slang/docs/operator/mqtt-subscribe",
		},
		ServiceDefs: map[string]*core.ServiceDef{
			core.MAIN_SERVICE: {
				In: core.TypeDef{
					Type: "trigger",
				},
				Out: core.TypeDef{
					Type: "stream",
					Stream: &core.TypeDef{
						Type: "map",
						Map: map[string]*core.TypeDef{
							"messageId": {
								Type: "number",
							},
							"payload": {
								Type: "binary",
							},
							"topic": {
								Type: "string",
							},
						},
					},
				},
			},
		},
		PropertyDefs: core.TypeDefMap{
			"broker": {
				Type: "string",
			},
			"username": {
				Type: "string",
			},
			"password": {
				Type: "string",
			},
			"topic": {
				Type: "string",
			},
			"verifyCertificate": {
				Type:     "boolean",
				Optional: true,
			},
			"clientCertificate": {
				Type:     "string",
				Optional: true,
			},
			"clientKey": {
				Type:     "string",
				Optional: true,
			},
			"caCertificate": {
				Type:     "string",
				Optional: true,
			},
		},
	},
	opFunc: func(op *core.Operator) {
		const (
			// subscribeQoS is the MQTT quality-of-service level requested
			// for the subscription.
			subscribeQoS byte = 2
			// disconnectQuiesceMs is the time in milliseconds the client is
			// given to finish outstanding work before the connection closes.
			disconnectQuiesceMs uint = 250
		)

		// Certificate verification used to be unconditionally disabled. Keep
		// that as the default when the optional property is not set, but
		// honor an explicit "verifyCertificate" value.
		// Assumes Property yields a non-bool value (e.g. nil) for an unset
		// optional property — the checked assertion is safe either way.
		skipVerify := true
		if verify, ok := op.Property("verifyCertificate").(bool); ok {
			skipVerify = !verify
		}

		options := mqtt.NewClientOptions().
			AddBroker(op.Property("broker").(string)).
			SetUsername(op.Property("username").(string)).
			SetPassword(op.Property("password").(string)).
			SetTLSConfig(&tls.Config{
				InsecureSkipVerify: skipVerify,
			})
		topic := op.Property("topic").(string)

		client := mqtt.NewClient(options)
		if token := client.Connect(); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		in := op.Main().In()
		out := op.Main().Out()

		// Nothing to do if the operator was stopped while connecting; close
		// the connection instead of leaking it.
		if op.CheckStop() {
			client.Disconnect(disconnectQuiesceMs)
			return
		}

		// The trigger only starts the subscription; its value is discarded.
		in.Pull()
		out.PushBOS()
		outStream := out.Stream()

		token := client.Subscribe(topic, subscribeQoS, func(_ mqtt.Client, message mqtt.Message) {
			outStream.Map("messageId").Push(float64(message.MessageID()))
			outStream.Map("payload").Push(core.Binary(message.Payload()))
			outStream.Map("topic").Push(message.Topic())
		})
		// A failed subscription would leave the operator idle forever, so it
		// is treated like a failed connection.
		if token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		op.WaitForStop()

		// Shut the client down before closing the stream so that the message
		// handler cannot push further items after the end-of-stream marker.
		client.Disconnect(disconnectQuiesceMs)
		out.PushEOS()
	},
}