Skip to content

Commit

Permalink
playback: adjust speed to compensate discrepancies between NTP and DTS
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Apr 7, 2024
1 parent 0a8e626 commit f248cce
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 118 deletions.
6 changes: 3 additions & 3 deletions internal/playback/muxer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package playback

import "github.com/bluenviron/mediacommon/pkg/formats/fmp4"

type muxer interface {
writeInit(init []byte)
setTrack(trackID int)
writeSample(normalizedElapsed int64, sample *fmp4.PartSample)
writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte)
writeFinalDTS(dts int64)
flush() error
finalFlush() error
}
123 changes: 92 additions & 31 deletions internal/playback/muxer_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
)

type muxerFMP4Track struct {
started bool
fmp4.PartTrack
started bool
id int
firstDTS uint64
lastDTS int64
samples []*fmp4.PartSample
}

func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track {
for _, track := range tracks {
if track.ID == id {
if track.id == id {
return track
}
}
Expand All @@ -39,49 +42,105 @@ func (w *muxerFMP4) setTrack(trackID int) {
w.curTrack = findTrack(w.tracks, trackID)
if w.curTrack == nil {
w.curTrack = &muxerFMP4Track{
PartTrack: fmp4.PartTrack{
ID: trackID,
},
id: trackID,
}
w.tracks = append(w.tracks, w.curTrack)
}
}

func (w *muxerFMP4) writeSample(normalizedElapsed int64, sample *fmp4.PartSample) {
func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) {
if !w.curTrack.started {
if normalizedElapsed >= 0 {
if dts >= 0 {
w.curTrack.started = true
w.curTrack.BaseTime = uint64(normalizedElapsed)

if !sample.IsNonSyncSample {
w.curTrack.Samples = []*fmp4.PartSample{sample}
w.curTrack.firstDTS = uint64(dts)

if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}

Check warning on line 62 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L58-L62

Added lines #L58 - L62 were not covered by tests
} else {
w.curTrack.Samples = append(w.curTrack.Samples, sample)
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
}
w.curTrack.lastDTS = dts
} else {
sample.Duration = 0
sample.PTSOffset = 0

if !sample.IsNonSyncSample {
w.curTrack.Samples = []*fmp4.PartSample{sample}
ptsOffset = 0

if !isNonSyncSample {
w.curTrack.samples = []*fmp4.PartSample{{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
}}
} else {
w.curTrack.Samples = append(w.curTrack.Samples, sample)
w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})

Check warning on line 85 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L81-L85

Added lines #L81 - L85 were not covered by tests
}
}
} else {
if w.curTrack.Samples == nil {
w.curTrack.BaseTime = uint64(normalizedElapsed)
if w.curTrack.samples == nil {
w.curTrack.firstDTS = uint64(dts)

Check warning on line 90 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L90

Added line #L90 was not covered by tests
} else {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

Check warning on line 95 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L94-L95

Added lines #L94 - L95 were not covered by tests

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
w.curTrack.Samples = append(w.curTrack.Samples, sample)

w.curTrack.samples = append(w.curTrack.samples, &fmp4.PartSample{
PTSOffset: ptsOffset,
IsNonSyncSample: isNonSyncSample,
Payload: payload,
})
w.curTrack.lastDTS = dts
}
}

func (w *muxerFMP4) flush() error {
func (w *muxerFMP4) writeFinalDTS(dts int64) {
if w.curTrack.started && w.curTrack.samples != nil {
diff := dts - w.curTrack.lastDTS
if diff < 0 {
diff = 0
}

Check warning on line 114 in internal/playback/muxer_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/muxer_fmp4.go#L113-L114

Added lines #L113 - L114 were not covered by tests

w.curTrack.samples[len(w.curTrack.samples)-1].Duration = uint32(diff)
}
}

func (w *muxerFMP4) flush2(final bool) error {
var part fmp4.Part

for _, track := range w.tracks {
if track.started && track.Samples != nil {
part.Tracks = append(part.Tracks, &track.PartTrack)
if track.started && (len(track.samples) > 1 || (final && len(track.samples) != 0)) {
var samples []*fmp4.PartSample
if !final {
samples = track.samples[:len(track.samples)-1]
} else {
samples = track.samples
}

part.Tracks = append(part.Tracks, &fmp4.PartTrack{
ID: track.id,
BaseTime: track.firstDTS,
Samples: samples,
})

if !final {
track.samples = track.samples[len(track.samples)-1:]
track.firstDTS = uint64(track.lastDTS)
} else {
track.samples = nil
}
}
}

Expand Down Expand Up @@ -110,11 +169,13 @@ func (w *muxerFMP4) flush() error {
w.outBuf.Reset()
}

for _, track := range w.tracks {
if track.started {
track.Samples = nil
}
}

return nil
}

func (w *muxerFMP4) flush() error {
return w.flush2(false)
}

func (w *muxerFMP4) finalFlush() error {
return w.flush2(true)
}
38 changes: 20 additions & 18 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,11 @@ func seekAndMux(
segments []*Segment,
start time.Time,
duration time.Duration,
w muxer,
m muxer,
) error {
if recordFormat == conf.RecordFormatFMP4 {
minTime := start.Sub(segments[0].Start)
maxTime := minTime + duration
var init []byte
var maxElapsed time.Duration
var firstInit []byte
var segmentEnd time.Time

err := func() error {
f, err := os.Open(segments[0].Fpath)
Expand All @@ -60,29 +58,28 @@ func seekAndMux(
}
defer f.Close()

init, err = segmentFMP4ReadInit(f)
firstInit, err = segmentFMP4ReadInit(f)
if err != nil {
return err
}

w.writeInit(init)
m.writeInit(firstInit)

maxElapsed, err = segmentFMP4SeekAndMuxParts(f, minTime, maxTime, w)
segmentStartOffset := start.Sub(segments[0].Start)

segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, m)
if err != nil {
return err
}

segmentEnd = start.Add(segmentMaxElapsed)

return nil
}()
if err != nil {
return err
}

duration -= maxElapsed
overallElapsed := maxElapsed
prevInit := init
prevEnd := start.Add(maxElapsed)

for _, seg := range segments[1:] {
err := func() error {
f, err := os.Open(seg.Fpath)
Expand All @@ -96,15 +93,19 @@ func seekAndMux(
return err
}

if !segmentFMP4CanBeConcatenated(prevInit, prevEnd, init, seg.Start) {
if !segmentFMP4CanBeConcatenated(firstInit, segmentEnd, init, seg.Start) {
return errStopIteration
}

maxElapsed, err = segmentFMP4WriteParts(f, overallElapsed, duration, w)
segmentStartOffset := seg.Start.Sub(start)

segmentMaxElapsed, err := segmentFMP4WriteParts(f, segmentStartOffset, duration, m)
if err != nil {
return err
}

segmentEnd = start.Add(segmentMaxElapsed)

return nil
}()
if err != nil {
Expand All @@ -114,10 +115,11 @@ func seekAndMux(

return err
}
}

duration -= maxElapsed
overallElapsed += maxElapsed
prevEnd = seg.Start.Add(maxElapsed)
err = m.finalFlush()
if err != nil {
return err

Check warning on line 122 in internal/playback/on_get.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/on_get.go#L122

Added line #L122 was not covered by tests
}

return nil
Expand Down
Loading

0 comments on commit f248cce

Please sign in to comment.