forked from q191201771/lal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunk_divider.go
153 lines (130 loc) · 3.93 KB
/
chunk_divider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright 2019, Chef. All rights reserved.
// https://github.com/forkiss/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtmp
// 将message切割成chunk
import (
"github.com/forkiss/lal/pkg/base"
"github.com/forkiss/naza/pkg/bele"
)
// ChunkDivider splits a message into chunks. Its Message2Chunks method hands
// localChunkSize to message2Chunks as the per-chunk payload size.
type ChunkDivider struct {
// localChunkSize is the maximum number of message bytes placed in one chunk.
localChunkSize int
}
// defaultChunkDivider is the divider used by the package-level Message2Chunks
// function; its chunk size is LocalChunkSize.
var defaultChunkDivider = ChunkDivider{
localChunkSize: LocalChunkSize,
}
// Message2Chunks splits message into chunks using the package's default
// ChunkDivider and returns the serialized chunks.
//
// The returned buffer is allocated internally and does not alias the memory
// of the message argument.
func Message2Chunks(message []byte, header *base.RtmpHeader) []byte {
	divider := &defaultChunkDivider
	chunks := divider.Message2Chunks(message, header)
	return chunks
}
// Message2Chunks splits message into chunks of at most the divider's
// configured chunk size and returns the serialized chunks (headers included).
//
// If the divider has no positive chunk size configured (for example a
// zero-value ChunkDivider), LocalChunkSize is used instead, so the zero value
// is usable rather than triggering a divide-by-zero panic.
//
// TODO chef: the first chunk of a new message always uses the fmt0 format; the
// previous message is not taken into account.
func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte {
	chunkSize := d.localChunkSize
	if chunkSize <= 0 {
		// message2Chunks divides by chunkSize, so it must be positive.
		chunkSize = LocalChunkSize
	}
	return message2Chunks(message, header, nil, chunkSize)
}
// calcHeader serializes the chunk header described by header into out and
// returns the number of bytes written (the header size).
//
// prevHeader is the header of the preceding chunk, or nil if there is none.
// It determines the chunk format:
//   - fmt0: no previous header, or the message stream id differs
//   - fmt1: same message stream id
//   - fmt2: additionally the same message length and message type id
//   - fmt3: additionally the same absolute timestamp
//
// out must be large enough to hold the header; writing past its end panics.
func calcHeader(header *base.RtmpHeader, prevHeader *base.RtmpHeader, out []byte) int {
	// Choose the chunk format and the timestamp value to encode.
	var (
		chunkFmt uint8
		ts       uint32
	)
	switch {
	case prevHeader == nil || header.MsgStreamId != prevHeader.MsgStreamId:
		// Nothing to diff against: fmt0 with the absolute timestamp.
		ts = header.TimestampAbs
	default:
		chunkFmt = 1
		if header.MsgLen == prevHeader.MsgLen && header.MsgTypeId == prevHeader.MsgTypeId {
			chunkFmt = 2
			if header.TimestampAbs == prevHeader.TimestampAbs {
				chunkFmt = 3
			}
		}

		ts = header.TimestampAbs - prevHeader.TimestampAbs
		if header.TimestampAbs > maxTimestampInMessageHeader {
			// When RTMP chunks are sent to vlc and the timestamp exceeds the
			// 3-byte range, vlc expects both fmt0 and fmt3 chunks to carry the
			// extended timestamp field, and that field must hold the absolute
			// timestamp.
			ts = header.TimestampAbs
		}
	}

	// Basic header: fmt in the top two bits, followed by the chunk stream id
	// in its 1-, 2- or 3-byte form.
	pos := 0
	out[pos] = chunkFmt << 6
	switch csid := header.Csid; {
	case csid >= 2 && csid <= 63:
		out[0] |= uint8(csid)
		pos = 1
	case csid >= 64 && csid <= 319:
		// The low 6 bits of the first byte stay 0 to mark the 2-byte form.
		out[1] = uint8(csid - 64)
		pos = 2
	default:
		// Low 6 bits set to 1 mark the 3-byte form.
		out[0] |= 1
		out[1] = uint8(csid - 64)
		out[2] = uint8((csid - 64) >> 8)
		pos = 3
	}

	// Message header: timestamp, then msgLen and msgTypeId, then msgStreamId,
	// depending on the format.
	if chunkFmt <= 2 {
		// The 3-byte field saturates; the real value goes into the extended
		// timestamp written below.
		field := ts
		if field > maxTimestampInMessageHeader {
			field = maxTimestampInMessageHeader
		}
		bele.BePutUint24(out[pos:], field)
		pos += 3

		if chunkFmt <= 1 {
			bele.BePutUint24(out[pos:], header.MsgLen)
			out[pos+3] = header.MsgTypeId
			pos += 4

			if chunkFmt == 0 {
				bele.LePutUint32(out[pos:], uint32(header.MsgStreamId))
				pos += 4
			}
		}
	}

	// Extended timestamp.
	if ts > maxTimestampInMessageHeader {
		bele.BePutUint32(out[pos:], ts)
		pos += 4
	}

	return pos
}
// message2Chunks splits message into chunks whose payload is at most chunkSize
// bytes, writes a chunk header in front of each payload, and returns the
// concatenated result in a newly allocated buffer.
//
// prevHeader is the header to diff the first chunk against (nil for none);
// every later chunk of the message is diffed against header itself.
// chunkSize must be positive: it is used as a divisor. An empty message
// produces an empty result.
func message2Chunks(message []byte, header *base.RtmpHeader, prevHeader *base.RtmpHeader, chunkSize int) []byte {
	//if header.Csid < minCsid || header.Csid > maxCsid {
	//	return nil, ErrRtmp
	//}

	// Number of chunks needed, rounding up for a partial last chunk.
	numOfChunk := len(message) / chunkSize
	if len(message)%chunkSize != 0 {
		numOfChunk++
	}

	// The payload totals exactly len(message) bytes and each chunk adds at
	// most maxHeaderSize bytes of header, so there is no need to reserve a
	// full chunkSize for every chunk.
	out := make([]byte, len(message)+maxHeaderSize*numOfChunk)

	var index int
	// NOTICE: when interoperating with srs, it was found that srs requires
	// that the non-first chunks of a message do not use fmt0.
	// Cut the message into chunk bodies, each preceded by its header.
	for remaining := message; len(remaining) > 0; {
		index += calcHeader(header, prevHeader, out[index:])

		n := chunkSize
		if len(remaining) < n {
			n = len(remaining)
		}
		index += copy(out[index:], remaining[:n])
		remaining = remaining[n:]

		// Later chunks of the same message are diffed against this header.
		prevHeader = header
	}

	return out[:index]
}