forked from pion/webrtc
/
reassembly_queue.go
114 lines (90 loc) · 2.41 KB
/
reassembly_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package sctp
import "sort"
// dataChannelMessageArray is a list of message pointers. Its search method
// runs a binary search on seqNum, so the list is expected to be ordered by
// ascending seqNum, which is the order its sort method produces.
type dataChannelMessageArray []*dataChannelMessage
// search looks up the message whose seqNum equals the given value.
//
// It uses a binary search, so s must already be ordered by ascending seqNum.
// It returns the matching message and true, or nil and false when no entry
// carries that sequence number.
func (s dataChannelMessageArray) search(seqNum uint16) (*dataChannelMessage, bool) {
	// Predicate for the lower bound: true from the first entry whose seqNum is
	// not below the one requested.
	atOrAbove := func(idx int) bool { return s[idx].seqNum >= seqNum }

	pos := sort.Search(len(s), atOrAbove)
	if pos == len(s) || s[pos].seqNum != seqNum {
		return nil, false
	}
	return s[pos], true
}
// sort reorders s in place so that seqNum values ascend. The comparison is a
// plain numeric one on the uint16 values; sequence-number wrap-around is not
// taken into account.
func (s dataChannelMessageArray) sort() {
	ascending := func(a, b int) bool {
		return s[a].seqNum < s[b].seqNum
	}
	sort.Slice(s, ascending)
}
// dataChannelMessage accumulates the fragments of a single message together
// with the total size of their payloads.
type dataChannelMessage struct {
// seqNum identifies the message; push sets it from the chunk's
// streamSequenceNumber when it creates an entry for an ordered chunk.
seqNum uint16
// fragmentQueue lists the fragments in the order they were appended.
fragmentQueue []*chunkPayloadData
// length is the sum of len(userData) over the queued fragments; assemble
// uses it to size the output buffer.
length int
}
// complete reports whether the queued fragments look like a whole message:
// the first fragment is flagged as the beginning and the last one as the end.
// With a single fragment, that fragment must carry both flags.
//
// Only the first and last entries are inspected; the fragments in between are
// not checked for gaps or ordering. An empty queue is never complete.
func (m *dataChannelMessage) complete() bool {
	count := len(m.fragmentQueue)
	if count == 0 {
		// Not expected to happen, but there is nothing to deliver.
		return false
	}

	// When count is 1 both names refer to the same fragment.
	head, tail := m.fragmentQueue[0], m.fragmentQueue[count-1]
	return head.beginingFragment && tail.endingFragment
}
// clear drops every queued fragment and resets the accumulated payload size
// to zero. seqNum is left as it was.
func (m *dataChannelMessage) clear() {
	m.fragmentQueue, m.length = make([]*chunkPayloadData, 0), 0
}
// assemble concatenates the userData of every queued fragment, in queue
// order, into one freshly allocated buffer of m.length bytes.
//
// It returns the buffer and true when complete() holds, and nil and false
// otherwise. The message itself is not modified.
func (m *dataChannelMessage) assemble() ([]byte, bool) {
	if !m.complete() {
		return nil, false
	}

	payload := make([]byte, m.length)
	offset := 0
	for _, frag := range m.fragmentQueue {
		copy(payload[offset:], frag.userData)
		offset += len(frag.userData)
	}
	return payload, true
}
// reassemblyQueue collects pushed chunks and hands back whole message
// payloads through pop.
type reassemblyQueue struct {
// messageQueue holds one entry per streamSequenceNumber for ordered chunks
// that have been pushed but not yet popped; push keeps it ordered by
// ascending seqNum.
messageQueue dataChannelMessageArray
// unorderedMessage gathers every chunk whose unordered flag is set.
unorderedMessage dataChannelMessage
// expectedSeqNum is the seqNum an ordered message must carry for pop to
// return it; pop increments it after each ordered message it returns.
expectedSeqNum uint16
}
// push stores chunk p so that pop can later reassemble the message it
// belongs to.
//
// A chunk with the unordered flag set is appended to the single unordered
// message. Any other chunk is grouped by its streamSequenceNumber: it is
// appended to the existing entry for that number, or a new entry is created
// for it. Fragments are kept in the order they are pushed.
//
// messageQueue is kept ordered by ascending seqNum. A new entry is inserted
// directly at the position found by the binary search instead of being
// appended and followed by a re-sort of the whole queue.
func (r *reassemblyQueue) push(p *chunkPayloadData) {
	if p.unordered {
		u := &r.unorderedMessage
		u.fragmentQueue = append(u.fragmentQueue, p)
		u.length += len(p.userData)
		return
	}

	ssn := p.streamSequenceNumber
	queue := r.messageQueue

	// Lower bound: index of the first entry whose seqNum is not below ssn.
	// That is either the entry for ssn itself or the slot where it belongs.
	at := sort.Search(len(queue), func(i int) bool { return queue[i].seqNum >= ssn })
	if at == len(queue) || queue[at].seqNum != ssn {
		// No entry yet: grow by one, shift the tail right and fill the gap.
		queue = append(queue, nil)
		copy(queue[at+1:], queue[at:])
		queue[at] = &dataChannelMessage{seqNum: ssn}
		r.messageQueue = queue
	}

	m := queue[at]
	m.fragmentQueue = append(m.fragmentQueue, p)
	m.length += len(p.userData)
}
// pop returns the payload of the next deliverable message, or nil and false
// when nothing can be delivered yet.
//
// The unordered message is tried first: if it assembles, it is cleared and
// its payload is returned. Otherwise the ordered message whose seqNum equals
// expectedSeqNum is looked up; if it is present and assembles, it is removed
// from messageQueue, expectedSeqNum is incremented (wrapping from 65535 to 0
// as uint16 arithmetic does) and its payload is returned.
//
// The expected message is searched for anywhere in the queue rather than only
// at index 0. messageQueue is ordered by plain numeric seqNum, so the expected
// message is not the first entry whenever lower-numbered entries are queued:
// for example messages 0, 1, ... received after the sequence number wrapped
// while 65535 is still pending, or a leftover entry with a number below
// expectedSeqNum. Looking only at the head would stall delivery in those cases.
func (r *reassemblyQueue) pop() ([]byte, bool) {
	if b, ok := r.unorderedMessage.assemble(); ok {
		r.unorderedMessage.clear()
		return b, true
	}

	// push keeps at most one entry per seqNum, so the first match is the only one.
	for i, m := range r.messageQueue {
		if m.seqNum != r.expectedSeqNum {
			continue
		}

		b, ok := m.assemble()
		if !ok {
			// The expected message is still missing fragments.
			return nil, false
		}

		// Remove entry i while keeping the remaining entries in order, and
		// nil the vacated tail slot so the message can be garbage collected.
		last := len(r.messageQueue) - 1
		copy(r.messageQueue[i:], r.messageQueue[i+1:])
		r.messageQueue[last] = nil
		r.messageQueue = r.messageQueue[:last]

		r.expectedSeqNum++
		return b, true
	}

	return nil, false
}