/
net_mqtt_publish.go
106 lines (99 loc) · 2.28 KB
/
net_mqtt_publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package elem
import (
"crypto/tls"
"github.com/Bitspark/slang/pkg/core"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
var netMQTTPublishCfg = &builtinConfig{
safe: true,
blueprint: core.Blueprint{
Id: uuid.MustParse("c6b5bef6-e93e-4bc1-8ded-49c90919f39d"),
Meta: core.BlueprintMetaDef{
Name: "MQTT publish",
ShortDescription: "publishes an MQTT message at a given topic",
Icon: "chart-network",
Tags: []string{"network", "mqtt"},
DocURL: "https://bitspark.de/slang/docs/operator/mqtt-publish",
},
ServiceDefs: map[string]*core.ServiceDef{
core.MAIN_SERVICE: {
In: core.TypeDef{
Type: "map",
Map: map[string]*core.TypeDef{
"topic": {
Type: "string",
},
"payload": {
Type: "binary",
},
},
},
Out: core.TypeDef{
Type: "number",
},
},
},
PropertyDefs: map[string]*core.TypeDef{
"broker": {
Type: "string",
},
"username": {
Type: "string",
},
"password": {
Type: "string",
},
"verifyCertificate": {
Type: "boolean",
Optional: true,
},
"clientCertificate": {
Type: "string",
Optional: true,
},
"clientKey": {
Type: "string",
Optional: true,
},
"caCertificate": {
Type: "string",
Optional: true,
},
// "clientId": {
// Type: "string",
// },
},
},
opFunc: func(op *core.Operator) {
options := mqtt.NewClientOptions().
AddBroker(op.Property("broker").(string)).
SetUsername(op.Property("username").(string)).
SetPassword(op.Property("password").(string)).
SetTLSConfig(&tls.Config{
ClientAuth: tls.NoClientCert,
InsecureSkipVerify: true,
})
client := mqtt.NewClient(options)
token := client.Connect().(*mqtt.ConnectToken)
token.Wait()
if token.Error() != nil {
panic(token.Error())
}
in := op.Main().In()
out := op.Main().Out()
for !op.CheckStop() {
i := in.Pull()
if core.IsMarker(i) {
out.Push(i)
continue
}
im := i.(map[string]interface{})
topic := im["topic"].(string)
payload := im["payload"].(core.Binary)
token := client.Publish(topic, 2, false, []byte(payload)).(*mqtt.PublishToken)
token.Wait()
out.Push(float64(token.MessageID()))
}
},
}