Skip to content

Commit

Permalink
client: return ErrClientEOS when a playlist ends (#59) (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jan 1, 2024
1 parent 12db622 commit 9c3bf60
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 3 deletions.
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gohlslib

import (
"context"
"errors"
"fmt"
"log"
"net/http"
Expand All @@ -22,6 +23,9 @@ const (
clientMaxDTSRTCDiff = 10 * time.Second
)

// ErrClientEOS is returned by Wait() when the stream has ended.
var ErrClientEOS = errors.New("end of stream")

// ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist.
type ClientOnDownloadPrimaryPlaylistFunc func(url string)

Expand Down
22 changes: 21 additions & 1 deletion client_primary_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type clientPrimaryDownloader struct {

// in
chStreamTracks chan clientStreamProcessor
chStreamEnded chan struct{}

// out
startStreaming chan struct{}
Expand All @@ -122,6 +123,7 @@ type clientPrimaryDownloader struct {
func (d *clientPrimaryDownloader) initialize() {
d.streamProcByTrack = make(map[*Track]clientStreamProcessor)
d.chStreamTracks = make(chan clientStreamProcessor)
d.chStreamEnded = make(chan struct{})
d.startStreaming = make(chan struct{})
d.leadingTimeSyncReady = make(chan struct{})
}
Expand All @@ -148,6 +150,7 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
initialPlaylist: plt,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
Expand Down Expand Up @@ -176,6 +179,7 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
initialPlaylist: nil,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
Expand Down Expand Up @@ -208,6 +212,7 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
onStreamEnded: d.onStreamEnded,
}
d.rp.add(ds)
streamCount++
Expand Down Expand Up @@ -251,7 +256,15 @@ func (d *clientPrimaryDownloader) run(ctx context.Context) error {

close(d.startStreaming)

return nil
for i := 0; i < streamCount; i++ {
select {
case <-d.chStreamEnded:
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

return ErrClientEOS
}

func (d *clientPrimaryDownloader) onStreamTracks(ctx context.Context, streamProc clientStreamProcessor) bool {
Expand All @@ -270,6 +283,13 @@ func (d *clientPrimaryDownloader) onStreamTracks(ctx context.Context, streamProc
return true
}

func (d *clientPrimaryDownloader) onStreamEnded(ctx context.Context) {
select {
case d.chStreamEnded <- struct{}{}:
case <-ctx.Done():
}
}

func (d *clientPrimaryDownloader) onSetLeadingTimeSync(ts clientTimeSync) {
d.leadingTimeSync = ts
close(d.leadingTimeSyncReady)
Expand Down
6 changes: 5 additions & 1 deletion client_stream_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type clientStreamDownloader struct {
initialPlaylist *playlist.Media
rp *clientRoutinePool
onStreamTracks clientOnStreamTracksFunc
onStreamEnded func(context.Context)
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand Down Expand Up @@ -77,6 +78,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
segmentQueue: segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
Expand All @@ -94,6 +96,7 @@ func (d *clientStreamDownloader) run(ctx context.Context) error {
segmentQueue: segmentQueue,
rp: d.rp,
onStreamTracks: d.onStreamTracks,
onStreamEnded: d.onStreamEnded,
onSetLeadingTimeSync: d.onSetLeadingTimeSync,
onGetLeadingTimeSync: d.onGetLeadingTimeSync,
onData: d.onData,
Expand Down Expand Up @@ -231,8 +234,9 @@ func (d *clientStreamDownloader) fillSegmentQueue(
})

if pl.Endlist && pl.Segments[len(pl.Segments)-1] == seg {
segmentQueue.push(nil)
<-ctx.Done()
return fmt.Errorf("stream has ended")
return fmt.Errorf("terminated")
}

return nil
Expand Down
7 changes: 7 additions & 0 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type clientStreamProcessorFMP4 struct {
segmentQueue *clientSegmentQueue
rp *clientRoutinePool
onStreamTracks clientOnStreamTracksFunc
onStreamEnded func(context.Context)
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand Down Expand Up @@ -119,6 +120,12 @@ func (p *clientStreamProcessorFMP4) run(ctx context.Context) error {
}

func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *segmentData) error {
if seg == nil {
p.onStreamEnded(ctx)
<-ctx.Done()
return fmt.Errorf("terminated")
}

var parts fmp4.Parts
err := parts.Unmarshal(seg.payload)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type clientStreamProcessorMPEGTS struct {
segmentQueue *clientSegmentQueue
rp *clientRoutinePool
onStreamTracks clientOnStreamTracksFunc
onStreamEnded func(context.Context)
onSetLeadingTimeSync func(clientTimeSync)
onGetLeadingTimeSync func(context.Context) (clientTimeSync, bool)
onData map[*Track]interface{}
Expand Down Expand Up @@ -85,6 +86,12 @@ func (p *clientStreamProcessorMPEGTS) run(ctx context.Context) error {
}

func (p *clientStreamProcessorMPEGTS) processSegment(ctx context.Context, seg *segmentData) error {
if seg == nil {
p.onStreamEnded(ctx)
<-ctx.Done()
return fmt.Errorf("terminated")
}

if p.switchableReader == nil {
err := p.initializeReader(ctx, seg.payload)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,10 @@ func TestClient(t *testing.T) {
<-audioRecv
<-audioRecv

err = <-c.Wait()
require.Equal(t, ErrClientEOS, err)

c.Close()
<-c.Wait()
})
}
}
Expand Down

0 comments on commit 9c3bf60

Please sign in to comment.