/
weaviate_broker.go
66 lines (55 loc) · 1.93 KB
/
weaviate_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* _ _
*__ _____ __ ___ ___ __ _| |_ ___
*\ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
* \ V V / __/ (_| |\ V /| | (_| | || __/
* \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
*
* Copyright © 2016 - 2019 Weaviate. All rights reserved.
* LICENSE: https://github.com/creativesoftwarefdn/weaviate/blob/develop/LICENSE.md
* DESIGN & CONCEPT: Bob van Luijt (@bobvanluijt)
* CONTACT: hello@creativesoftwarefdn.org
*/
// Package restapi with all rest API functions.
package weaviateBroker
import (
"github.com/creativesoftwarefdn/weaviate/messages"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var MqttEnabled bool
var MqttClient MQTT.Client
var messaging *messages.Messaging
// ConnectToMqtt connects to Weaviate Mqtt Broker
func ConnectToMqtt(Host string, Port int32) {
// Validate if host is set, if not, don't connect
if Host != "" {
messaging.InfoMessage("Connecting to MQTT broker...")
opts := MQTT.NewClientOptions()
opts.AddBroker("tcp://127.0.0.1:1883")
opts.SetUsername("b9b908ee-0039-4f6a-a2c0-b51ebbd982f1")
opts.SetPassword("05d64e63-e464-40ac-8e55-4ccedb0a60f7")
opts.SetClientID("WEAVIATE-SERVER")
// Connect client
MqttClient = MQTT.NewClient(opts)
if token := MqttClient.Connect(); token.Wait() && token.Error() != nil {
messaging.ErrorMessage("Could not connect to MQTT broker. Server needs to be restarted to reconnect to broker.")
messaging.ErrorMessage(token.Error())
} else {
messaging.InfoMessage("Connected to MQTT broker.")
}
// Mqtt enabled
MqttEnabled = true
} else {
// Mqtt disabled
MqttEnabled = false
}
}
// Publish a message to the right channel
func Publish(channel string, message string) {
if MqttEnabled == true {
if token := MqttClient.Publish(channel, 0, false, message); token.Wait() && token.Error() != nil {
messaging.ErrorMessage(token.Error())
} else {
messaging.DebugMessage("Published message to MQTT broker.")
}
}
}