forked from VolantMQ/volantmq
/
types.go
114 lines (89 loc) · 2.32 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package systree
import (
"strconv"
"sync/atomic"
"time"
"github.com/VolantMQ/vlapi/mqttp"
)
// DynamicValue interface describes states of the dynamic value
type DynamicValue interface {
Topic() string
// Retained used by topics provider to get retained message when there is new subscription to given topic
Retained() *mqttp.Publish
// Publish used by systree update routine to publish new value when on periodic basis
Publish() *mqttp.Publish
}
type dynamicValue struct {
topic string
retained *mqttp.Publish
publish *mqttp.Publish
getValue func() []byte
}
type dynamicValueInteger struct {
val uint64
dynamicValue
}
type dynamicValueUpTime struct {
dynamicValue
startTime time.Time
}
type dynamicValueCurrentTime struct {
dynamicValue
}
func newDynamicValueInteger(topic string) *dynamicValueInteger {
v := &dynamicValueInteger{}
v.topic = topic
v.getValue = v.get
return v
}
func newDynamicValueUpTime(topic string) *dynamicValueUpTime {
v := &dynamicValueUpTime{
startTime: time.Now(),
}
v.topic = topic
v.getValue = v.get
return v
}
func newDynamicValueCurrentTime(topic string) *dynamicValueCurrentTime {
v := &dynamicValueCurrentTime{}
v.topic = topic
v.getValue = v.get
return v
}
func (v *dynamicValueInteger) get() []byte {
val := strconv.FormatUint(atomic.LoadUint64(&v.val), 10)
return []byte(val)
}
func (v *dynamicValueUpTime) get() []byte {
diff := time.Since(v.startTime)
return []byte(diff.String())
}
func (v *dynamicValueCurrentTime) get() []byte {
val := time.Now().Format(time.RFC3339)
return []byte(val)
}
func (m *dynamicValue) Topic() string {
return m.topic
}
func (m *dynamicValue) Retained() *mqttp.Publish {
if m.retained == nil {
np, _ := mqttp.New(mqttp.ProtocolV311, mqttp.PUBLISH)
m.retained, _ = np.(*mqttp.Publish)
m.retained.SetTopic(m.topic) // nolint: errcheck
m.retained.SetQoS(mqttp.QoS0) // nolint: errcheck
m.retained.SetRetain(true)
}
m.retained.SetPayload(m.getValue())
return m.retained
}
func (m *dynamicValue) Publish() *mqttp.Publish {
if m.publish == nil {
np, _ := mqttp.New(mqttp.ProtocolV311, mqttp.PUBLISH)
m.publish, _ = np.(*mqttp.Publish)
m.publish.SetTopic(m.topic) // nolint: errcheck
m.publish.SetQoS(mqttp.QoS0) // nolint: errcheck
m.publish.SetRetain(true)
}
m.publish.SetPayload(m.getValue())
return m.publish
}