Skip to content

Commit

Permalink
hls muxer: avoid infinite loop when hlsAlwaysRemux is true
Browse files Browse the repository at this point in the history
when hlsAlwaysRemux is true and a muxer fails, add a pause between its
recreation, in order to avoid infinite loops
  • Loading branch information
aler9 committed Jan 8, 2023
1 parent b20abbe commit 7420ef1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 46 deletions.
120 changes: 78 additions & 42 deletions internal/core/hls_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
)

const (
closeCheckPeriod = 1 * time.Second
closeAfterInactivity = 60 * time.Second
closeCheckPeriod = 1 * time.Second
closeAfterInactivity = 60 * time.Second
hlsMuxerRecreatePause = 10 * time.Second
)

//go:embed hls_index.html
Expand Down Expand Up @@ -57,11 +58,12 @@ type hlsMuxer struct {
name string
remoteAddr string
externalAuthenticationURL string
hlsVariant conf.HLSVariant
hlsSegmentCount int
hlsSegmentDuration conf.StringDuration
hlsPartDuration conf.StringDuration
hlsSegmentMaxSize conf.StringSize
alwaysRemux bool
variant conf.HLSVariant
segmentCount int
segmentDuration conf.StringDuration
partDuration conf.StringDuration
segmentMaxSize conf.StringSize
readBufferCount int
wg *sync.WaitGroup
pathName string
Expand All @@ -88,11 +90,12 @@ func newHLSMuxer(
name string,
remoteAddr string,
externalAuthenticationURL string,
hlsVariant conf.HLSVariant,
hlsSegmentCount int,
hlsSegmentDuration conf.StringDuration,
hlsPartDuration conf.StringDuration,
hlsSegmentMaxSize conf.StringSize,
alwaysRemux bool,
variant conf.HLSVariant,
segmentCount int,
segmentDuration conf.StringDuration,
partDuration conf.StringDuration,
segmentMaxSize conf.StringSize,
readBufferCount int,
req *hlsMuxerRequest,
wg *sync.WaitGroup,
Expand All @@ -106,11 +109,12 @@ func newHLSMuxer(
name: name,
remoteAddr: remoteAddr,
externalAuthenticationURL: externalAuthenticationURL,
hlsVariant: hlsVariant,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
hlsPartDuration: hlsPartDuration,
hlsSegmentMaxSize: hlsSegmentMaxSize,
alwaysRemux: alwaysRemux,
variant: variant,
segmentCount: segmentCount,
segmentDuration: segmentDuration,
partDuration: partDuration,
segmentMaxSize: segmentMaxSize,
readBufferCount: readBufferCount,
wg: wg,
pathName: pathName,
Expand Down Expand Up @@ -161,21 +165,34 @@ func (m *hlsMuxer) PathName() string {
func (m *hlsMuxer) run() {
defer m.wg.Done()

innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerReady := make(chan struct{})
innerErr := make(chan error)
go func() {
innerErr <- m.runInner(innerCtx, innerReady)
}()
err := func() error {
var innerReady chan struct{}
var innerErr chan error
var innerCtx context.Context
var innerCtxCancel func()

createInner := func() {
innerReady = make(chan struct{})
innerErr = make(chan error)
innerCtx, innerCtxCancel = context.WithCancel(context.Background())
go func() {
innerErr <- m.runInner(innerCtx, innerReady)
}()
}

isReady := false
createInner()

isReady := false
isRecreating := false
recreateTimer := newEmptyTimer()

err := func() error {
for {
select {
case <-m.ctx.Done():
innerCtxCancel()
<-innerErr
if !isRecreating {
innerCtxCancel()
<-innerErr
}
return errors.New("terminated")

case req := <-m.chRequest:
Expand Down Expand Up @@ -208,13 +225,34 @@ func (m *hlsMuxer) run() {

case err := <-innerErr:
innerCtxCancel()
return err

if m.alwaysRemux {
m.log(logger.Info, "ERR: %v", err)
m.clearQueuedRequests()
isReady = false
isRecreating = true
recreateTimer = time.NewTimer(hlsMuxerRecreatePause)
} else {
return err
}

case <-recreateTimer.C:
isRecreating = false
createInner()
}
}
}()

m.ctxCancel()

m.clearQueuedRequests()

m.parent.muxerClose(m)

m.log(logger.Info, "destroyed (%v)", err)
}

func (m *hlsMuxer) clearQueuedRequests() {
for _, req := range m.requests {
req.res <- hlsMuxerResponse{
muxer: m,
Expand All @@ -223,10 +261,6 @@ func (m *hlsMuxer) run() {
},
}
}

m.parent.muxerClose(m)

m.log(logger.Info, "destroyed (%v)", err)
}

func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
Expand Down Expand Up @@ -267,11 +301,11 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})

var err error
m.muxer, err = hls.NewMuxer(
hls.MuxerVariant(m.hlsVariant),
m.hlsSegmentCount,
time.Duration(m.hlsSegmentDuration),
time.Duration(m.hlsPartDuration),
uint64(m.hlsSegmentMaxSize),
hls.MuxerVariant(m.variant),
m.segmentCount,
time.Duration(m.segmentDuration),
time.Duration(m.partDuration),
uint64(m.segmentMaxSize),
videoFormat,
audioFormat,
)
Expand All @@ -296,11 +330,13 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
for {
select {
case <-closeCheckTicker.C:
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if m.remoteAddr != "" && time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return fmt.Errorf("not used anymore")
if m.remoteAddr != "" {
t := time.Unix(0, atomic.LoadInt64(m.lastRequestTime))
if time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return fmt.Errorf("not used anymore")
}
}

case err := <-writerDone:
Expand Down
5 changes: 1 addition & 4 deletions internal/core/hls_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,6 @@ outer:
}
delete(s.muxers, c.PathName())

if s.alwaysRemux && c.remoteAddr == "" {
s.findOrCreateMuxer(c.PathName(), "", nil)
}

case req := <-s.chAPIMuxerList:
muxers := make(map[string]*hlsMuxer)

Expand Down Expand Up @@ -339,6 +335,7 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h
pathName,
remoteAddr,
s.externalAuthenticationURL,
s.alwaysRemux,
s.variant,
s.segmentCount,
s.segmentDuration,
Expand Down

0 comments on commit 7420ef1

Please sign in to comment.