-
Notifications
You must be signed in to change notification settings - Fork 14
/
interceptor.go
237 lines (209 loc) · 6.2 KB
/
interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Copyright 2019 The ebml-go authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mkvcore
import (
"fmt"
"io"
"sync"
)
// BlockInterceptor is an interface of a block stream muxer. The multi-track
// block sorter created by NewMultiTrackBlockSorter is one implementation.
type BlockInterceptor interface {
// Intercept reads blocks of each track from r, filters them, and writes them
// to w.
Intercept(r []BlockReader, w []BlockWriter)
}
// MustBlockInterceptor unwraps the (BlockInterceptor, error) pair returned by
// a constructor such as NewMultiTrackBlockSorter. It returns interceptor
// unchanged when err is nil and panics with err otherwise.
func MustBlockInterceptor(interceptor BlockInterceptor, err error) BlockInterceptor {
	if err == nil {
		return interceptor
	}
	panic(err)
}
// filterWriter is a block writer that forwards every written block, wrapped
// in a frame, to a channel.
type filterWriter struct {
// trackNumber is stamped on every frame produced by Write.
trackNumber uint64
// ch receives the frames produced by Write.
ch chan *frame
}
// filterReader is a block reader that yields the frames received from a
// channel.
type filterReader struct {
// ch is the source of frames for Read. Once it is closed and drained, Read
// reports io.EOF.
ch chan *frame
}
// Write wraps the block in a frame tagged with the writer's track number and
// sends that frame on w.ch. The payload b is passed along without being
// copied. It always returns len(b) and a nil error.
func (w *filterWriter) Write(keyframe bool, timestamp int64, b []byte) (int, error) {
	f := new(frame)
	f.trackNumber = w.trackNumber
	f.keyframe = keyframe
	f.timestamp = timestamp
	f.b = b

	w.ch <- f
	return len(b), nil
}
// Read waits for the next frame on r.ch and returns its payload, keyframe
// flag and timestamp. Once the channel is closed and empty it returns
// (nil, false, 0, io.EOF).
func (r *filterReader) Read() ([]byte, bool, int64, error) {
	if f, open := <-r.ch; open {
		return f.b, f.keyframe, f.timestamp, nil
	}
	return nil, false, 0, io.EOF
}
// close closes the reader's frame channel. Frames already queued can still be
// read; after they are drained Read reports io.EOF.
func (r *filterReader) close() {
	src := r.ch
	close(src)
}
// BlockSorterRule selects how the block sorter treats an outdated frame, that
// is, a frame whose timestamp is older than the frame written most recently.
type BlockSorterRule int
// List of BlockSorterRules.
const (
// BlockSorterDropOutdated discards outdated frames. It is the zero value and
// the default rule of NewMultiTrackBlockSorter.
BlockSorterDropOutdated BlockSorterRule = iota
// BlockSorterWriteOutdated keeps outdated frames so that they are still
// written.
BlockSorterWriteOutdated
)
// MultiTrackBlockSorterOption configures a MultiTrackBlockSorterOptions. A
// non-nil error returned by an option makes NewMultiTrackBlockSorter fail
// with that error.
type MultiTrackBlockSorterOption func(*MultiTrackBlockSorterOptions) error
// MultiTrackBlockSorterOptions stores options for the multi-track block
// sorter created by NewMultiTrackBlockSorter.
type MultiTrackBlockSorterOptions struct {
// maxDelayedPackets, when non-zero, is the number of frames a single track
// may buffer before the sorter is forced to write.
maxDelayedPackets int
// rule selects what happens to outdated frames.
rule BlockSorterRule
// maxTimescaleDelay, when non-zero, is the timestamp difference between
// buffered frames beyond which the sorter is forced to write.
maxTimescaleDelay int64
}
// WithMaxDelayedPackets sets the maximum number of packets that may be
// delayed within each track. The sorter treats a value of 0 as "no limit".
// The returned option never fails.
func WithMaxDelayedPackets(maxDelayedPackets int) MultiTrackBlockSorterOption {
	apply := func(dst *MultiTrackBlockSorterOptions) error {
		dst.maxDelayedPackets = maxDelayedPackets
		return nil
	}
	return apply
}
// WithSortRule sets the rule that decides how outdated packets are treated
// when ordering packets within the webm container. The returned option never
// fails.
func WithSortRule(rule BlockSorterRule) MultiTrackBlockSorterOption {
	apply := func(dst *MultiTrackBlockSorterOptions) error {
		dst.rule = rule
		return nil
	}
	return apply
}
// WithMaxTimescaleDelay sets the maximum allowed delay between tracks for a
// given timescale. The sorter treats a value of 0 as "no limit". The returned
// option never fails.
func WithMaxTimescaleDelay(maxTimescaleDelay int64) MultiTrackBlockSorterOption {
	apply := func(dst *MultiTrackBlockSorterOptions) error {
		dst.maxTimescaleDelay = maxTimescaleDelay
		return nil
	}
	return apply
}
// NewMultiTrackBlockSorter creates a BlockInterceptor that sorts the blocks of
// multiple tracks by timestamp.
//
// The options start from their defaults (no packet limit, no timescale limit,
// BlockSorterDropOutdated) and opts are applied in order, so a later option
// overrides an earlier one. The first option that returns an error aborts
// construction with that error. At least one of WithMaxDelayedPackets and
// WithMaxTimescaleDelay must end up non-zero, otherwise an error is returned.
//
// If both limits are specified, whichever is exceeded first causes packets to
// be written; packets older than the written one may then be discarded,
// depending on the sort rule. Blocks with the same timestamp are ordered by
// the index of their track. Place the audio track before the video track to
// meet the WebM Muxer Guidelines.
func NewMultiTrackBlockSorter(opts ...MultiTrackBlockSorterOption) (BlockInterceptor, error) {
	// Spelled-out defaults; each of them equals the field's zero value.
	cfg := MultiTrackBlockSorterOptions{
		maxDelayedPackets: 0,
		rule:              BlockSorterDropOutdated,
		maxTimescaleDelay: 0,
	}
	for _, apply := range opts {
		if err := apply(&cfg); err != nil {
			return nil, err
		}
	}

	// With both limits disabled the sorter could buffer without bound.
	if cfg.maxDelayedPackets == 0 && cfg.maxTimescaleDelay == 0 {
		return nil, fmt.Errorf("must specify either WithMaxDelayedPackets(...) or WithMaxTimescaleDelay(...) with a non-0 value")
	}
	return &multiTrackBlockSorter{options: cfg}, nil
}
// multiTrackBlockSorter is the BlockInterceptor returned by
// NewMultiTrackBlockSorter.
type multiTrackBlockSorter struct {
// options holds the settings resolved at construction time.
options MultiTrackBlockSorterOptions
}
// Intercept reads frames from every reader in r concurrently, reorders them
// by timestamp across tracks, and writes each frame to the writer in w that
// has the same index as the reader it came from. w must therefore have at
// least len(r) elements. Intercept returns once every reader has returned an
// error (such as io.EOF) and all buffered frames have been written.
//
// Frames are held in one buffer per track. The frame at the head of the
// buffer with the smallest head timestamp is written as long as one of the
// following holds:
//   - every track has at least one buffered frame,
//   - maxDelayedPackets is non-zero and some track buffers more frames,
//   - maxTimescaleDelay is non-zero and the span between the newest tail and
//     the oldest head timestamp over all buffers exceeds it.
//
// On equal timestamps the track with the lower index is written first. Unless
// the rule is BlockSorterWriteOutdated, an incoming frame that is older than
// the frame written last is dropped. Errors returned by the writers are
// ignored because Intercept has no way to report them.
func (s *multiTrackBlockSorter) Intercept(r []BlockReader, w []BlockWriter) {
	var wg sync.WaitGroup
	wg.Add(len(r))

	// ch is unbuffered on purpose: a reader goroutine cannot call Read again,
	// and so cannot finish, before its previous frame has been received. When
	// all readers are done, no frame is left in flight.
	ch := make(chan *frame)
	for i, br := range r {
		go func(i int, br BlockReader) {
			defer wg.Done()
			for {
				var err error
				f := &frame{trackNumber: uint64(i)}
				if f.b, f.keyframe, f.timestamp, err = br.Read(); err != nil {
					return
				}
				ch <- f
			}
		}(i, br)
	}

	// closed is signalled after the last reader has returned.
	closed := make(chan struct{})
	go func() {
		wg.Wait()
		close(closed)
	}()

	// tDone is the timestamp of the frame written most recently.
	var tDone int64
	buf := make([]*frameBuffer, len(r))
	for i := range buf {
		buf[i] = &frameBuffer{}
	}

	maxPackets := s.options.maxDelayedPackets
	maxDelay := s.options.maxTimescaleDelay

	// flush writes buffered frames, oldest head first, for as long as a write
	// condition holds. With all set, a single non-empty track is enough, which
	// drains every buffer.
	flush := func(all bool) {
		nChReq := len(r)
		if all {
			nChReq = 1
		}
		for {
			var tOldest, tNewest int64
			var nCh, nMax int
			var bOldest *frameBuffer
			for _, b := range buf {
				n := b.Size()
				if n == 0 {
					continue
				}
				// Strict comparison keeps the lower track index on ties.
				if f := b.Head(); bOldest == nil || f.timestamp < tOldest {
					tOldest = f.timestamp
					bOldest = b
				}
				if f := b.Tail(); nCh == 0 || f.timestamp > tNewest {
					tNewest = f.timestamp
				}
				nCh++
				if n > nMax {
					nMax = n
				}
			}
			if bOldest == nil {
				// Nothing is buffered. Stop here so that a negative limit
				// cannot request a pop from a non-existent buffer.
				return
			}

			// The span is taken only after every buffer has been inspected, so
			// it does not depend on the order of the tracks.
			tSpan := tNewest - tOldest
			if tSpan < 0 {
				tSpan = 0
			}

			due := nCh >= nChReq ||
				(maxPackets != 0 && nMax > maxPackets) ||
				(maxDelay != 0 && tSpan > maxDelay)
			if !due {
				return
			}

			fOldest := bOldest.Pop()
			// Intercept cannot report errors, so the write result is dropped.
			_, _ = w[fOldest.trackNumber].Write(fOldest.keyframe, fOldest.timestamp, fOldest.b)
			tDone = fOldest.timestamp
		}
	}

	for {
		select {
		case d := <-ch:
			if d.timestamp >= tDone || s.options.rule == BlockSorterWriteOutdated {
				buf[d.trackNumber].Push(d)
				flush(false)
			}
		case <-closed:
			flush(true)
			return
		}
	}
}