-
Notifications
You must be signed in to change notification settings - Fork 318
/
packetids.go
87 lines (78 loc) · 1.48 KB
/
packetids.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package common
import (
"sync"
"github.com/256dpi/gomqtt/packet"
)
// AckV2 is the acknowledgement handle stored by PacketIDS for each packet id
// that is in use.
type AckV2 interface {
	// SID reports the sequence id of the message this handle belongs to.
	SID() uint64
	// Ack is called when the packet id assigned to the message is acknowledged.
	Ack()
}
// PacketIDS assigns packet ids to messages based on their sequence ids and
// keeps the pending acknowledgement for every packet id currently in use.
type PacketIDS struct {
	// min and max are the inclusive bounds of the packet ids Set hands out.
	min, max packet.ID
	// index holds the pending acknowledgement for each packet id in use.
	index map[packet.ID]AckV2
	// reindex remembers, per sequence id, the packet id picked by Set when it
	// could not use the direct conversion of the sequence id.
	reindex map[uint64]packet.ID
	// The embedded lock guards index and reindex.
	sync.RWMutex
}
// NewPacketIDS returns an empty PacketIDS whose packet ids range from 1 to
// 65535 inclusive.
func NewPacketIDS() *PacketIDS {
	ids := new(PacketIDS)
	ids.min, ids.max = 1, 65535
	ids.index = make(map[packet.ID]AckV2)
	ids.reindex = make(map[uint64]packet.ID)
	return ids
}
// Ack acknowledges the message registered under the packet id id.
//
// When id is in use, its entry and any remapping recorded for the message's
// sequence id are removed, the message's Ack callback is invoked, and true is
// returned. When id is not in use nothing changes and false is returned.
func (p *PacketIDS) Ack(id packet.ID) bool {
	p.Lock()
	// Deferred so the lock is released even if the callback below panics.
	defer p.Unlock()

	ack, ok := p.index[id]
	if !ok {
		return false
	}
	delete(p.index, id)
	delete(p.reindex, ack.SID())
	// The callback runs while the write lock is held, so it must not call
	// back into p.
	ack.Ack()
	return true
}
// Set registers ack under a free packet id and returns that id.
//
// The first candidate is the sequence id of ack converted to packet.ID. A
// candidate outside [min, max] is replaced by max, and a candidate that is
// already in use is decremented by one, until a free id is found. Whenever the
// candidate had to be changed, the chosen id is recorded under the sequence id
// so that Get can resolve it later.
//
// Set returns 0, without registering ack, when every id in [min, max] is
// already in use.
func (p *PacketIDS) Set(ack AckV2) packet.ID {
	p.Lock()
	defer p.Unlock()

	// With every id taken the probe below can never find a free slot; it
	// would spin forever while holding the lock, which also prevents Ack from
	// ever releasing an id.
	if len(p.index) > int(p.max-p.min) {
		return 0
	}

	sid := ack.SID()
	id := packet.ID(sid)
	remapped := false
	for {
		if id < p.min || id > p.max {
			id = p.max
			remapped = true
		}
		if _, taken := p.index[id]; !taken {
			break
		}
		id--
		remapped = true
	}
	if remapped {
		p.reindex[sid] = id
	}
	p.index[id] = ack
	return id
}
// Get resolves the sequence id sid to the packet id assigned to it.
//
// A remapping recorded by Set takes precedence; otherwise sid converted to
// packet.ID is used. The result is 0 when the resolved id is not in use.
//
// NOTE(review): the entry found under the resolved id is not checked to
// belong to sid, so a sid that converts to an id Set handed to a different
// message resolves to that id. Confirm whether callers can hit this.
func (p *PacketIDS) Get(sid uint64) (pid packet.ID) {
	// The lookup only reads the maps, so a shared lock is sufficient.
	p.RLock()
	defer p.RUnlock()

	pid, remapped := p.reindex[sid]
	if !remapped {
		pid = packet.ID(sid)
	}
	if _, used := p.index[pid]; !used {
		return 0
	}
	return pid
}
// Size returns the number of packet ids currently in use.
func (p *PacketIDS) Size() (i int) {
	// Reading the length does not modify the map, so a shared lock is enough.
	p.RLock()
	defer p.RUnlock()
	return len(p.index)
}