Skip to content

Commit

Permalink
Decode streams once and only when needed (#1218)
Browse files Browse the repository at this point in the history
* split data into specialized structs

* move MPEG4-audio decoding into streamTrack

* restore video/audio synchronization in HLS muxer and RTMP server

* log decode errors

* move H264 decoding and re-encoding here from gortsplib

* add tests

* update gortsplib
  • Loading branch information
aler9 committed Nov 2, 2022
1 parent bf14467 commit 0943b26
Show file tree
Hide file tree
Showing 19 changed files with 860 additions and 246 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
code.cloudfoundry.org/bytefmt v0.0.0-20211005130812-5bb3c17173e5
github.com/abema/go-mp4 v0.8.0
github.com/aler9/gortsplib v0.0.0-20221101102023-dbb6934a3c3e
github.com/aler9/gortsplib v0.0.0-20221102164639-d3c23a849c83
github.com/asticode/go-astits v1.10.1-0.20220319093903-4abe66a9b757
github.com/fsnotify/fsnotify v1.4.9
github.com/gin-gonic/gin v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20221101102023-dbb6934a3c3e h1:x+EHN8/YHjG6NQM59WG+fdPmozyIarDZgJZymNbDmFE=
github.com/aler9/gortsplib v0.0.0-20221101102023-dbb6934a3c3e/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/gortsplib v0.0.0-20221102164639-d3c23a849c83 h1:Qn/TL5+Nm4g+IgQ1DODtu6oCve0plBiJsprbnLG3yfQ=
github.com/aler9/gortsplib v0.0.0-20221102164639-d3c23a849c83/go.mod h1:BOWNZ/QBkY/eVcRqUzJbPFEsRJshwxaxBT01K260Jeo=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4=
github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
Expand Down
65 changes: 55 additions & 10 deletions internal/core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,65 @@ import (
)

// data is the data unit routed across the server.
// it must contain one or more of the following:
// - a single RTP packet
// - a group of H264 NALUs (grouped by timestamp)
// - a single AAC AU
type data struct {
trackID int
// data is the interface implemented by every data unit routed across the server.
type data interface {
// getTrackID returns the ID of the track the unit belongs to.
getTrackID() int
// getRTPPackets returns the RTP packets carried by the unit.
getRTPPackets() []*rtp.Packet
// getPTSEqualsDTS reports whether the unit's PTS equals its DTS.
getPTSEqualsDTS() bool
}

// dataGeneric is a data unit that carries only RTP packets and a PTS/DTS
// flag, without any decoded payload.
// NOTE(review): presumably used for tracks that have no dedicated data type — confirm against callers.
type dataGeneric struct {
trackID int
rtpPackets []*rtp.Packet
ptsEqualsDTS bool
}

rtpPacket *rtp.Packet
// getTrackID implements data.
func (d *dataGeneric) getTrackID() int {
return d.trackID
}

// timing
// getRTPPackets implements data.
func (d *dataGeneric) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}

// getPTSEqualsDTS implements data.
func (d *dataGeneric) getPTSEqualsDTS() bool {
return d.ptsEqualsDTS
}

// dataH264 is the data unit of H264 tracks. In addition to the RTP packets
// and the PTS/DTS flag, it carries a presentation timestamp and a group of
// NALUs.
type dataH264 struct {
trackID int
rtpPackets []*rtp.Packet
ptsEqualsDTS bool
pts time.Duration
// nalus may be nil; the HLS muxer writer skips units without NALUs.
nalus [][]byte
}

h264NALUs [][]byte
// getTrackID implements data.
func (d *dataH264) getTrackID() int {
return d.trackID
}

// getRTPPackets implements data.
func (d *dataH264) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}

// getPTSEqualsDTS implements data.
func (d *dataH264) getPTSEqualsDTS() bool {
return d.ptsEqualsDTS
}

// dataMPEG4Audio is the data unit of MPEG4-audio tracks. In addition to the
// RTP packets, it carries a presentation timestamp and a group of access
// units. It stores no PTS/DTS flag.
type dataMPEG4Audio struct {
trackID int
rtpPackets []*rtp.Packet
// pts is the timestamp of the first access unit; the HLS muxer writer
// derives the timestamps of the following ones from their index.
pts time.Duration
// aus may be nil; the HLS muxer writer skips units without access units.
aus [][]byte
}

// getTrackID implements data.
func (d *dataMPEG4Audio) getTrackID() int {
return d.trackID
}

// getRTPPackets implements data.
func (d *dataMPEG4Audio) getRTPPackets() []*rtp.Packet {
return d.rtpPackets
}

mpeg4AudioAU []byte
// getPTSEqualsDTS implements data. It always reports true, regardless of the
// unit's content.
// NOTE(review): presumably because audio access units are never reordered — confirm.
func (d *dataMPEG4Audio) getPTSEqualsDTS() bool {
return true
}
126 changes: 69 additions & 57 deletions internal/core/hls_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtpmpeg4audio"
"github.com/gin-gonic/gin"

"github.com/aler9/rtsp-simple-server/internal/conf"
Expand Down Expand Up @@ -295,7 +294,6 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
videoTrackID := -1
var audioTrack *gortsplib.TrackMPEG4Audio
audioTrackID := -1
var aacDecoder *rtpmpeg4audio.Decoder

for i, track := range res.stream.tracks() {
switch tt := track.(type) {
Expand All @@ -314,13 +312,6 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})

audioTrack = tt
audioTrackID = i
aacDecoder = &rtpmpeg4audio.Decoder{
SampleRate: tt.Config.SampleRate,
SizeLength: tt.SizeLength,
IndexLength: tt.IndexLength,
IndexDeltaLength: tt.IndexDeltaLength,
}
aacDecoder.Init()
}
}

Expand Down Expand Up @@ -362,53 +353,12 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})

writerDone := make(chan error)
go func() {
writerDone <- func() error {
var videoInitialPTS *time.Duration

for {
item, ok := m.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
data := item.(*data)

if videoTrack != nil && data.trackID == videoTrackID {
if data.h264NALUs == nil {
continue
}

if videoInitialPTS == nil {
v := data.pts
videoInitialPTS = &v
}
pts := data.pts - *videoInitialPTS

err = m.muxer.WriteH264(time.Now(), pts, data.h264NALUs)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
} else if audioTrack != nil && data.trackID == audioTrackID {
aus, pts, err := aacDecoder.Decode(data.rtpPacket)
if err != nil {
if err != rtpmpeg4audio.ErrMorePacketsNeeded {
m.log(logger.Warn, "unable to decode audio track: %v", err)
}
continue
}

for i, au := range aus {
err = m.muxer.WriteAAC(
time.Now(),
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioTrack.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
}
}
}()
writerDone <- m.runWriter(
videoTrack,
videoTrackID,
audioTrack,
audioTrackID,
)
}()

closeCheckTicker := time.NewTicker(closeCheckPeriod)
Expand All @@ -435,6 +385,68 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
}
}

// runWriter pulls data units from the muxer's ring buffer and writes them to
// the HLS muxer, looping until Pull reports that no more items are available
// or a write fails. It never returns nil.
//
// Units whose track ID equals videoTrackID (when videoTrack is not nil) are
// treated as H264 units; units whose track ID equals audioTrackID (when
// audioTrack is not nil) are treated as MPEG4-audio units; everything else is
// dropped. Units without a payload are skipped. The timestamps of each track
// are rebased on the PTS of the first unit of that track carrying a payload.
func (m *hlsMuxer) runWriter(
	videoTrack *gortsplib.TrackH264,
	videoTrackID int,
	audioTrack *gortsplib.TrackMPEG4Audio,
	audioTrackID int,
) error {
	var (
		videoBase    time.Duration
		videoBaseSet bool
		audioBase    time.Duration
		audioBaseSet bool
	)

	for {
		item, ok := m.ringBuffer.Pull()
		if !ok {
			return fmt.Errorf("terminated")
		}
		unit := item.(data)

		switch {
		case videoTrack != nil && unit.getTrackID() == videoTrackID:
			frame := unit.(*dataH264)
			if frame.nalus == nil {
				continue
			}

			// the first written frame defines the origin of the video timeline.
			if !videoBaseSet {
				videoBase = frame.pts
				videoBaseSet = true
			}

			if err := m.muxer.WriteH264(time.Now(), frame.pts-videoBase, frame.nalus); err != nil {
				return fmt.Errorf("muxer error: %v", err)
			}

		case audioTrack != nil && unit.getTrackID() == audioTrackID:
			group := unit.(*dataMPEG4Audio)
			if group.aus == nil {
				continue
			}

			// the first written group defines the origin of the audio timeline.
			if !audioBaseSet {
				audioBase = group.pts
				audioBaseSet = true
			}
			first := group.pts - audioBase

			for idx, au := range group.aus {
				// each access unit lasts SamplesPerAccessUnit samples at the
				// track clock rate, so the idx-th one is shifted accordingly.
				if err := m.muxer.WriteAAC(
					time.Now(),
					first+time.Duration(idx)*mpeg4audio.SamplesPerAccessUnit*
						time.Second/time.Duration(audioTrack.ClockRate()),
					au,
				); err != nil {
					return fmt.Errorf("muxer error: %v", err)
				}
			}
		}
	}
}

func (m *hlsMuxer) handleRequest(req *hlsMuxerRequest) func() *hls.MuxerFileResponse {
atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano())

Expand Down Expand Up @@ -558,7 +570,7 @@ func (m *hlsMuxer) apiHLSMuxersList(req hlsServerAPIMuxersListSubReq) {
}

// onReaderData implements reader.
func (m *hlsMuxer) onReaderData(data *data) {
// onReaderData implements reader. It pushes the data unit into the muxer's
// ring buffer, from which runWriter pulls it.
func (m *hlsMuxer) onReaderData(data data) {
m.ringBuffer.Push(data)
}

Expand Down
19 changes: 12 additions & 7 deletions internal/core/hls_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,26 @@ func (s *hlsSource) run(ctx context.Context) error {
}

onVideoData := func(pts time.Duration, nalus [][]byte) {
stream.writeData(&data{
err := stream.writeData(&dataH264{
trackID: videoTrackID,
ptsEqualsDTS: h264.IDRPresent(nalus),
pts: pts,
h264NALUs: nalus,
nalus: nalus,
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
}

onAudioData := func(pts time.Duration, au []byte) {
stream.writeData(&data{
trackID: audioTrackID,
ptsEqualsDTS: true,
pts: pts,
mpeg4AudioAU: au,
err := stream.writeData(&dataMPEG4Audio{
trackID: audioTrackID,
pts: pts,
aus: [][]byte{au},
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
}

c, err := hls.NewClient(
Expand Down
26 changes: 21 additions & 5 deletions internal/core/hls_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aler9/gortsplib/pkg/url"
"github.com/asticode/go-astits"
"github.com/gin-gonic/gin"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -136,11 +137,26 @@ func TestHLSSource(t *testing.T) {

c := gortsplib.Client{
OnPacketRTP: func(ctx *gortsplib.ClientOnPacketRTPCtx) {
require.Equal(t, [][]byte{
{0x07, 0x01, 0x02, 0x03},
{0x08},
{0x05},
}, ctx.H264NALUs)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: ctx.Packet.SequenceNumber,
Timestamp: ctx.Packet.Timestamp,
SSRC: ctx.Packet.SSRC,
CSRC: []uint32{},
},
Payload: []byte{
0x18,
0x00, 0x04,
0x07, 0x01, 0x02, 0x03, // SPS
0x00, 0x01,
0x08, // PPS
0x00, 0x01,
0x05, // ODR
},
}, ctx.Packet)
close(frameRecv)
},
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package core
// reader is an entity that can read a stream.
type reader interface {
close()
onReaderData(*data)
onReaderData(data)
apiReaderDescribe() interface{}
}
7 changes: 5 additions & 2 deletions internal/core/rpicamera_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ func (s *rpiCameraSource) run(ctx context.Context) error {
stream = res.stream
}

stream.writeData(&data{
err := stream.writeData(&dataH264{
trackID: 0,
ptsEqualsDTS: h264.IDRPresent(nalus),
pts: dts,
h264NALUs: nalus,
nalus: nalus,
})
if err != nil {
s.Log(logger.Warn, "%v", err)
}
}

cam, err := rpicamera.New(s.params, onData)
Expand Down

0 comments on commit 0943b26

Please sign in to comment.