This repository has been archived by the owner on Dec 14, 2021. It is now read-only.
/
events.go
81 lines (70 loc) · 3.68 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package mqtt
import (
"encoding/json"
"fmt"
"github.com/TheThingsNetwork/ttn/core/types"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// AppEventHandler is the callback type invoked for application events.
// It receives the client the subscription was made on, the application ID and
// event type taken from the topic of the received message, and the message
// payload as raw bytes (the payload is passed through without being decoded).
type AppEventHandler func(client Client, appID string, eventType types.EventType, payload []byte)
// DeviceEventHandler is the callback type invoked for device events.
// It receives the client the subscription was made on, the application ID,
// device ID and event type taken from the topic of the received message, and
// the message payload as raw bytes (the payload is passed through without
// being decoded).
type DeviceEventHandler func(client Client, appID string, devID string, eventType types.EventType, payload []byte)
// PublishAppEvent JSON-encodes payload and publishes the result on the
// application events topic built from appID and eventType.
//
// When the payload cannot be marshaled nothing is published; the returned
// Token is a simpleToken holding the marshaling error instead.
func (c *DefaultClient) PublishAppEvent(appID string, eventType types.EventType, payload interface{}) Token {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return &simpleToken{fmt.Errorf("Unable to marshal the message payload: %s", err)}
	}

	// Kept in a local variable so String() is called on an addressable value.
	dest := ApplicationTopic{appID, AppEvents, string(eventType)}
	return c.publish(dest.String(), encoded)
}
// PublishDeviceEvent JSON-encodes payload and publishes the result on the
// device events topic built from appID, devID and eventType.
//
// When the payload cannot be marshaled nothing is published; the returned
// Token is a simpleToken holding the marshaling error instead.
func (c *DefaultClient) PublishDeviceEvent(appID string, devID string, eventType types.EventType, payload interface{}) Token {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return &simpleToken{fmt.Errorf("Unable to marshal the message payload: %s", err)}
	}

	// Kept in a local variable so String() is called on an addressable value.
	dest := DeviceTopic{appID, devID, DeviceEvents, string(eventType)}
	return c.publish(dest.String(), encoded)
}
// SubscribeAppEvents subscribes to events of the given type for the given
// application and forwards each of them to handler. To subscribe to
// application events from all applications the user has access to, pass an
// empty string as appID.
//
// The topic of every incoming message is parsed with ParseApplicationTopic.
// Messages whose topic does not parse are logged as a warning and dropped
// without calling handler.
func (c *DefaultClient) SubscribeAppEvents(appID string, eventType types.EventType, handler AppEventHandler) Token {
	// The underlying MQTT client argument is not needed; handler gets c instead.
	onMessage := func(_ MQTT.Client, m MQTT.Message) {
		parsed, err := ParseApplicationTopic(m.Topic())
		if err != nil {
			c.ctx.Warnf("mqtt: received message on invalid events topic: %s", m.Topic())
			return
		}
		handler(c, parsed.AppID, types.EventType(parsed.Field), m.Payload())
	}

	filter := ApplicationTopic{appID, AppEvents, string(eventType)}
	return c.subscribe(filter.String(), onMessage)
}
// SubscribeDeviceEvents subscribes to events of the given type for the given
// device and forwards each of them to handler. To subscribe to events from all
// devices within an application, pass an empty string as devID. To subscribe
// to events from all devices in all applications the user has access to, pass
// an empty string as appID.
//
// The topic of every incoming message is parsed with ParseDeviceTopic.
// Messages whose topic does not parse are logged as a warning and dropped
// without calling handler.
func (c *DefaultClient) SubscribeDeviceEvents(appID string, devID string, eventType types.EventType, handler DeviceEventHandler) Token {
	// The underlying MQTT client argument is not needed; handler gets c instead.
	onMessage := func(_ MQTT.Client, m MQTT.Message) {
		parsed, err := ParseDeviceTopic(m.Topic())
		if err != nil {
			c.ctx.Warnf("mqtt: received message on invalid events topic: %s", m.Topic())
			return
		}
		handler(c, parsed.AppID, parsed.DevID, types.EventType(parsed.Field), m.Payload())
	}

	filter := DeviceTopic{appID, devID, DeviceEvents, string(eventType)}
	return c.subscribe(filter.String(), onMessage)
}
// UnsubscribeAppEvents cancels a subscription made with SubscribeAppEvents.
// The topic is rebuilt from appID and eventType the same way
// SubscribeAppEvents builds it, so pass the same values used when subscribing.
func (c *DefaultClient) UnsubscribeAppEvents(appID string, eventType types.EventType) Token {
	filter := ApplicationTopic{appID, AppEvents, string(eventType)}
	return c.unsubscribe(filter.String())
}
// UnsubscribeDeviceEvents cancels a subscription made with
// SubscribeDeviceEvents. The topic is rebuilt from appID, devID and eventType
// the same way SubscribeDeviceEvents builds it, so pass the same values used
// when subscribing.
func (c *DefaultClient) UnsubscribeDeviceEvents(appID string, devID string, eventType types.EventType) Token {
	filter := DeviceTopic{appID, devID, DeviceEvents, string(eventType)}
	return c.unsubscribe(filter.String())
}