forked from eclipse-ditto/ditto-clients-golang
-
Notifications
You must be signed in to change notification settings - Fork 1
/
client_internal.go
111 lines (96 loc) · 2.81 KB
/
client_internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright (c) 2020 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0
//
// SPDX-License-Identifier: EPL-2.0
package ditto
import (
"encoding/json"
"errors"
"github.com/eclipse/ditto-clients-golang/protocol"
"sync"
"time"
//import the Paho Go MQTT library
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// MQTT topic names used by this client.
const (
// honoMQTTTopicSubscribeCommands is the topic filter the client subscribes to
// once the connection is established (see clientConnectHandler).
honoMQTTTopicSubscribeCommands = "command///req/#"
// honoMQTTTopicPublishTelemetry is presumably the short topic for publishing
// telemetry messages; it is not referenced in this part of the file - confirm
// against its callers.
honoMQTTTopicPublishTelemetry = "t"
// honoMQTTTopicPublishEvents is presumably the short topic for publishing
// event messages; it is not referenced in this part of the file - confirm
// against its callers.
honoMQTTTopicPublishEvents = "e"
)
// clientConnectHandler subscribes the client to honoMQTTTopicSubscribeCommands
// (QoS 1, messages dispatched to client.honoMessageHandler) and then notifies
// the configured connect handler via notifyClientConnected.
//
// The subscription is issued on client.pahoClient; the pahoClient argument is
// not used. A failed or timed-out subscription is only logged - the connect
// notification is sent regardless.
func (client *Client) clientConnectHandler(pahoClient MQTT.Client) {
	// Balanced by the deferred Done() inside notifyClientConnected.
	client.wgConnectHandler.Add(1)

	subToken := client.pahoClient.Subscribe(honoMQTTTopicSubscribeCommands, 1, client.honoMessageHandler)

	var subErr error
	if completed := subToken.WaitTimeout(client.cfg.subscribeTimeout); !completed {
		// The broker did not acknowledge within cfg.subscribeTimeout.
		subErr = ErrSubscribeTimeout
	} else {
		subErr = subToken.Error()
	}

	if subErr != nil {
		ERROR.Printf("error subscribing to root Hono topic %s : %v", honoMQTTTopicSubscribeCommands, subErr)
	}

	client.notifyClientConnected()
}
// notifyClientConnected invokes the configured connect handler (if any) on a
// separate goroutine and waits up to 60 seconds for it to return, logging the
// outcome. It always marks client.wgConnectHandler as done on return, including
// when the client has no configuration.
func (client *Client) notifyClientConnected() {
	defer client.wgConnectHandler.Done()

	if client.cfg == nil {
		return
	}

	// Function-local Once: it guards only this single invocation.
	var once sync.Once
	// Buffer of one so the goroutine can complete its send even if the waiter
	// below has already given up.
	done := make(chan error, 1)

	invokeHandler := func() {
		if client.cfg.connectHandler != nil {
			client.cfg.connectHandler(client)
		}
	}
	go func() {
		once.Do(invokeHandler)
		done <- nil
	}()

	timeout := time.After(60 * time.Second)
	select {
	case <-done:
		DEBUG.Println("notified for client initialization successfully")
	case <-timeout:
		timeoutErr := errors.New("timed out waiting for initialization notification to be handled")
		ERROR.Printf("%v", timeoutErr)
	}
}
// clientConnectionLostHandler forwards the connection-loss error to
// notifyClientConnectionLost. The pahoClient argument is not used.
func (client *Client) clientConnectionLostHandler(pahoClient MQTT.Client, err error) {
client.notifyClientConnectionLost(err)
}
// notifyClientConnectionLost invokes the configured connection-lost handler
// (if any) with err on a separate goroutine and waits up to 60 seconds for it
// to return, logging the outcome. It does nothing when the client has no
// configuration.
func (client *Client) notifyClientConnectionLost(err error) {
	if client.cfg == nil {
		return
	}

	// Function-local Once: it guards only this single invocation.
	var once sync.Once
	// Buffer of one so the goroutine can complete its send even if the waiter
	// below has already given up.
	done := make(chan error, 1)

	invokeHandler := func() {
		if client.cfg.connectionLostHandler != nil {
			client.cfg.connectionLostHandler(client, err)
		}
	}
	go func() {
		once.Do(invokeHandler)
		done <- nil
	}()

	timeout := time.After(60 * time.Second)
	select {
	case <-done:
		DEBUG.Println("notified for client connection lost successfully")
	case <-timeout:
		timeoutErr := errors.New("timed out waiting for connection lost notification to be handled")
		ERROR.Printf("%v", timeoutErr)
	}
}
// publish serializes message to JSON and publishes it on topic through
// client.pahoClient with the given qos and retained flag, then waits up to
// cfg.acknowledgeTimeout for the publish token to complete.
//
// It returns the JSON marshalling error if serialization fails,
// ErrAcknowledgeTimeout if the wait times out, and otherwise the error reported
// by the publish token (nil on success).
func (client *Client) publish(topic string, message *protocol.Envelope, qos byte, retained bool) error {
	data, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return marshalErr
	}

	pubToken := client.pahoClient.Publish(topic, qos, retained, data)
	if acknowledged := pubToken.WaitTimeout(client.cfg.acknowledgeTimeout); acknowledged {
		return pubToken.Error()
	}
	return ErrAcknowledgeTimeout
}