/
utils.go
145 lines (128 loc) · 2.81 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package client
import (
"strings"
"github.com/alfrunes/mqttie/packets"
)
type subMap map[string]interface{}
func (s subMap) Add(topic string, c chan<- []byte) bool {
i := strings.Index(topic, "+")
if i < 0 {
s[topic] = c
return true
}
if m, ok := s[topic[:i+1]].(subMap); ok {
return m.Add(topic[i+1:], c)
}
m := make(subMap)
if m.Add(topic[i+1:], c) {
s[topic[:i+1]] = m
return true
}
return false
}
func (s subMap) Get(topic string) chan<- []byte {
if c, ok := s[topic].(chan<- []byte); ok {
return c
}
var i, j int
for {
// Check multi-level wildcard (highest precedence)
if c, ok := s[topic[:i]+"#"].(chan<- []byte); ok {
return c
}
if tmp, ok := s[topic[:i]+"+"].(subMap); ok {
// Carve out and replace scope with wildcard
// and recurse onward.
j = strings.Index(topic[i:], "/")
if c := tmp.Get(topic[i+j:]); c != nil {
return c
}
}
// Advance index
j = strings.Index(topic[i:], "/")
if j < 0 {
break
}
i += j + 1
}
return nil
}
func (s subMap) Del(topic string) {
i := strings.Index(topic, "+")
if i == -1 {
delete(s, topic)
return
}
if m, ok := s[topic[:i+1]].(subMap); ok {
if len(m) <= 1 {
delete(s, topic[:i+1])
} else {
m.Del(topic[i+1:])
}
}
}
type packetMap struct {
packets map[uint16]packets.Packet
mutex chan struct{}
}
func newPacketMap() *packetMap {
return &packetMap{
packets: make(map[uint16]packets.Packet),
mutex: make(chan struct{}, 1),
}
}
func (p *packetMap) Add(packetID uint16, packet packets.Packet) bool {
p.mutex <- struct{}{}
defer func() { <-p.mutex }()
if _, ok := p.packets[packetID]; ok {
return false
}
p.packets[packetID] = packet
return true
}
func (p *packetMap) Set(packetID uint16, packet packets.Packet) {
p.mutex <- struct{}{}
p.packets[packetID] = packet
<-p.mutex
}
func (p *packetMap) Get(packetID uint16) (packets.Packet, bool) {
p.mutex <- struct{}{}
defer func() { <-p.mutex }()
packet, ok := p.packets[packetID]
return packet, ok
}
func (p *packetMap) Del(packetID uint16) {
p.mutex <- struct{}{}
delete(p.packets, packetID)
<-p.mutex
}
type packetChanMap struct {
chans map[uint16]chan packets.Packet
mutex chan struct{}
}
func newPacketChanMap() *packetChanMap {
return &packetChanMap{
chans: make(map[uint16]chan packets.Packet),
mutex: make(chan struct{}, 1),
}
}
func (p *packetChanMap) New(packetID uint16) bool {
p.mutex <- struct{}{}
defer func() { <-p.mutex }()
if _, ok := p.chans[packetID]; ok {
return false
}
p.chans[packetID] = make(chan packets.Packet, 1)
return true
}
func (p *packetChanMap) Get(packetID uint16) (chan packets.Packet, bool) {
p.mutex <- struct{}{}
defer func() { <-p.mutex }()
c, ok := p.chans[packetID]
return c, ok
}
func (p *packetChanMap) Del(packetID uint16) {
p.mutex <- struct{}{}
delete(p.chans, packetID)
<-p.mutex
}