Skip to content

Commit

Permalink
restore video/audio synchronization in HLS muxer and RTMP server
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Nov 2, 2022
1 parent ec9fcf3 commit e9896b9
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 65 deletions.
115 changes: 68 additions & 47 deletions internal/core/hls_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,53 +353,12 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})

writerDone := make(chan error)
go func() {
writerDone <- func() error {
var videoInitialPTS *time.Duration

for {
item, ok := m.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
data := item.(data)

if videoTrack != nil && data.getTrackID() == videoTrackID {
tdata := data.(*dataH264)

if tdata.nalus == nil {
continue
}

if videoInitialPTS == nil {
v := tdata.pts
videoInitialPTS = &v
}
pts := tdata.pts - *videoInitialPTS

err = m.muxer.WriteH264(time.Now(), pts, tdata.nalus)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
} else if audioTrack != nil && data.getTrackID() == audioTrackID {
tdata := data.(*dataMPEG4Audio)

if tdata.aus == nil {
continue
}

for i, au := range tdata.aus {
err = m.muxer.WriteAAC(
time.Now(),
tdata.pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioTrack.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
}
}
}()
writerDone <- m.runWriter(
videoTrack,
videoTrackID,
audioTrack,
audioTrackID,
)
}()

closeCheckTicker := time.NewTicker(closeCheckPeriod)
Expand All @@ -426,6 +385,68 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
}
}

func (m *hlsMuxer) runWriter(
videoTrack *gortsplib.TrackH264,
videoTrackID int,
audioTrack *gortsplib.TrackMPEG4Audio,
audioTrackID int,
) error {
videoStartPTSFilled := false
var videoStartPTS time.Duration
audioStartPTSFilled := false
var audioStartPTS time.Duration

for {
item, ok := m.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
data := item.(data)

if videoTrack != nil && data.getTrackID() == videoTrackID {
tdata := data.(*dataH264)

if tdata.nalus == nil {
continue
}

if !videoStartPTSFilled {
videoStartPTSFilled = true
videoStartPTS = tdata.pts
}
pts := tdata.pts - videoStartPTS

err := m.muxer.WriteH264(time.Now(), pts, tdata.nalus)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
} else if audioTrack != nil && data.getTrackID() == audioTrackID {
tdata := data.(*dataMPEG4Audio)

if tdata.aus == nil {
continue
}

if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tdata.pts
}
pts := tdata.pts - audioStartPTS

for i, au := range tdata.aus {
err := m.muxer.WriteAAC(
time.Now(),
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioTrack.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
}
}
}

func (m *hlsMuxer) handleRequest(req *hlsMuxerRequest) func() *hls.MuxerFileResponse {
atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano())

Expand Down
37 changes: 22 additions & 15 deletions internal/core/rtmp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})

var videoInitialPTS *time.Duration
videoStartPTSFilled := false
var videoStartPTS time.Duration
audioStartPTSFilled := false
var audioStartPTS time.Duration

videoFirstIDRFound := false
var videoStartDTS time.Duration
var videoDTSExtractor *h264.DTSExtractor
Expand All @@ -346,15 +350,11 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
continue
}

// video is decoded in another routine,
// while audio is decoded in this routine:
// we have to sync their PTS.
if videoInitialPTS == nil {
v := tdata.pts
videoInitialPTS = &v
if !videoStartPTSFilled {
videoStartPTSFilled = true
videoStartPTS = tdata.pts
}

pts := tdata.pts - *videoInitialPTS
pts := tdata.pts - videoStartPTS

idrPresent := false
nonIDRPresent := false
Expand Down Expand Up @@ -430,14 +430,21 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
continue
}

if videoTrack != nil && !videoFirstIDRFound {
continue
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tdata.pts
}
pts := tdata.pts - audioStartPTS

pts := tdata.pts
pts -= videoStartDTS
if pts < 0 {
continue
if videoTrack != nil {
if !videoFirstIDRFound {
continue
}

pts -= videoStartDTS
if pts < 0 {
continue
}
}

for i, au := range tdata.aus {
Expand Down
8 changes: 5 additions & 3 deletions internal/hls/muxer_variant_fmp4_segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (m *muxerVariantFMP4Segmenter) writeH264Entry(
}

// put samples into a queue in order to
// - allow to compute sample duration
// - compute sample duration
// - check if next sample is IDR
sample, m.nextVideoSample = m.nextVideoSample, sample
if sample == nil {
Expand Down Expand Up @@ -290,6 +290,9 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a
}

dts -= m.startDTS
if dts < 0 {
return nil
}
}

sample := &augmentedAudioSample{
Expand All @@ -299,8 +302,7 @@ func (m *muxerVariantFMP4Segmenter) writeAAC(now time.Time, dts time.Duration, a
dts: dts,
}

// put samples into a queue in order to
// allow to compute the sample duration
// put samples into a queue in order to compute the sample duration
sample, m.nextAudioSample = m.nextAudioSample, sample
if sample == nil {
return nil
Expand Down

0 comments on commit e9896b9

Please sign in to comment.