Skip to content

Commit

Permalink
muxer: propagate query parameters to media playlist, segments and parts
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jun 11, 2024
1 parent bc060a2 commit 46ea5bc
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 93 deletions.
136 changes: 101 additions & 35 deletions muxer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import (
"github.com/bluenviron/gohlslib/pkg/playlist"
)

const (
multivariantPlaylistMaxAge = "30"
initMaxAge = "30"
segmentMaxAge = "3600"
)

func boolPtr(v bool) *bool {
return &v
}
Expand Down Expand Up @@ -121,6 +127,7 @@ func generateMultivariantPlaylist(
videoTrack *Track,
audioTrack *Track,
segments []muxerSegment,
rawQuery string,
) ([]byte, error) {
maxBandwidth, averageBandwidth := bandwidth(segments)
var resolution string
Expand Down Expand Up @@ -174,6 +181,11 @@ func generateMultivariantPlaylist(
}
}

mediaPlaylistURI := "stream.m3u8"
if rawQuery != "" {
mediaPlaylistURI += "?" + rawQuery
}

pl := &playlist.Multivariant{
Version: func() int {
if variant == MuxerVariantMPEGTS {
Expand All @@ -197,7 +209,7 @@ func generateMultivariantPlaylist(
}(),
Resolution: resolution,
FrameRate: frameRate,
URI: "stream.m3u8",
URI: mediaPlaylistURI,
}},
}

Expand Down Expand Up @@ -240,6 +252,7 @@ func generateInitFile(
func generateMediaPlaylistMPEGTS(
segments []muxerSegment,
segmentDeleteCount int,
rawQuery string,
) ([]byte, error) {
pl := &playlist.Media{
Version: 3,
Expand All @@ -250,17 +263,36 @@ func generateMediaPlaylistMPEGTS(

for _, s := range segments {
if seg, ok := s.(*muxerSegmentMPEGTS); ok {
u := seg.name
if rawQuery != "" {
u += "?" + rawQuery
}

pl.Segments = append(pl.Segments, &playlist.MediaSegment{
DateTime: &seg.startNTP,
Duration: seg.getDuration(),
URI: seg.name,
URI: u,
})
}
}

return pl.Marshal()
}

func filterOutHLSParams(rawQuery string) string {
if rawQuery != "" {
if q, err := url.ParseQuery(rawQuery); err == nil {
for k := range q {
if strings.HasPrefix(k, "_HLS_") {
delete(q, k)
}

Check warning on line 288 in muxer_server.go

View check run for this annotation

Codecov / codecov/patch

muxer_server.go#L287-L288

Added lines #L287 - L288 were not covered by tests
}
rawQuery = q.Encode()
}
}
return rawQuery
}

func generateMediaPlaylistFMP4(
isDeltaUpdate bool,
variant MuxerVariant,
Expand All @@ -269,9 +301,11 @@ func generateMediaPlaylistFMP4(
nextPartID uint64,
segmentDeleteCount int,
prefix string,
rawQuery string,
) ([]byte, error) {
targetDuration := targetDuration(segments)
skipBoundary := time.Duration(targetDuration) * 6 * time.Second
rawQuery = filterOutHLSParams(rawQuery)

pl := &playlist.Media{
Version: 10,
Expand All @@ -297,8 +331,13 @@ func generateMediaPlaylistFMP4(
skipped := 0

if !isDeltaUpdate {
u := prefix + "_init.mp4"
if rawQuery != "" {
u += "?" + rawQuery
}

pl.Map = &playlist.MediaMap{
URI: prefix + "_init.mp4",
URI: u,
}
} else {
var curDuration time.Duration
Expand All @@ -324,9 +363,14 @@ func generateMediaPlaylistFMP4(

switch seg := sog.(type) {
case *muxerSegmentFMP4:
u := seg.name
if rawQuery != "" {
u += "?" + rawQuery
}

plse := &playlist.MediaSegment{
Duration: seg.getDuration(),
URI: seg.name,
URI: u,
}

if (len(segments) - i) <= 2 {
Expand All @@ -335,9 +379,14 @@ func generateMediaPlaylistFMP4(

if variant == MuxerVariantLowLatency && (len(segments)-i) <= 2 {
for _, part := range seg.parts {
u = part.getName()
if rawQuery != "" {
u += "?" + rawQuery
}

plse.Parts = append(plse.Parts, &playlist.MediaPart{
Duration: part.finalDuration,
URI: part.getName(),
URI: u,
Independent: part.isIndependent,
})
}
Expand All @@ -356,17 +405,26 @@ func generateMediaPlaylistFMP4(

if variant == MuxerVariantLowLatency {
for _, part := range nextSegmentParts {
u := part.getName()
if rawQuery != "" {
u += "?" + rawQuery
}

Check warning on line 411 in muxer_server.go

View check run for this annotation

Codecov / codecov/patch

muxer_server.go#L408-L411

Added lines #L408 - L411 were not covered by tests

pl.Parts = append(pl.Parts, &playlist.MediaPart{
Duration: part.finalDuration,
URI: part.getName(),
URI: u,

Check warning on line 415 in muxer_server.go

View check run for this annotation

Codecov / codecov/patch

muxer_server.go#L415

Added line #L415 was not covered by tests
Independent: part.isIndependent,
})
}

// preload hint must always be present
// otherwise hls.js goes into a loop
u := partName(prefix, nextPartID)
if rawQuery != "" {
u += "?" + rawQuery
}
pl.PreloadHint = &playlist.MediaPreloadHint{
URI: partName(prefix, nextPartID),
URI: u,
}
}

Expand All @@ -381,11 +439,13 @@ func generateMediaPlaylist(
nextPartID uint64,
segmentDeleteCount int,
prefix string,
rawQuery string,
) ([]byte, error) {
if variant == MuxerVariantMPEGTS {
return generateMediaPlaylistMPEGTS(
segments,
segmentDeleteCount,
rawQuery,
)
}

Expand All @@ -397,6 +457,7 @@ func generateMediaPlaylist(
nextPartID,
segmentDeleteCount,
prefix,
rawQuery,
)
}

Expand All @@ -407,18 +468,17 @@ type muxerServer struct {
audioTrack *Track
prefix string

mutex sync.Mutex
cond *sync.Cond
closed bool
segments []muxerSegment
segmentsByName map[string]muxerSegment
segmentDeleteCount int
partsByName map[string]*muxerPart
nextSegmentID uint64
nextSegmentParts []*muxerPart
nextPartID uint64
multivariantPlaylist []byte
init []byte
mutex sync.Mutex
cond *sync.Cond
closed bool
segments []muxerSegment
segmentsByName map[string]muxerSegment
segmentDeleteCount int
partsByName map[string]*muxerPart
nextSegmentID uint64
nextSegmentParts []*muxerPart
nextPartID uint64
init []byte
}

func (s *muxerServer) initialize() {
Expand Down Expand Up @@ -486,14 +546,14 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) {

switch {
case name == "index.m3u8":
s.handleMultivariantPlaylist(w)
s.handleMultivariantPlaylist(w, r)

case name == "stream.m3u8":
q := r.URL.Query()
msn := queryVal(q, "_HLS_msn")
part := queryVal(q, "_HLS_part")
skip := queryVal(q, "_HLS_skip")
s.handleMediaPlaylist(msn, part, skip, w)
s.handleMediaPlaylist(msn, part, skip, w, r)

case s.variant != MuxerVariantMPEGTS && name == s.prefix+"_init.mp4":
s.handleInitFile(w)
Expand All @@ -504,7 +564,7 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) {
}
}

func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter) {
func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter, r *http.Request) {
buf := func() []byte {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand All @@ -513,7 +573,12 @@ func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter) {
s.cond.Wait()
}

return s.multivariantPlaylist
buf, err := generateMultivariantPlaylist(s.variant, s.videoTrack, s.audioTrack, s.segments, r.URL.RawQuery)
if err != nil {
return nil
}

return buf
}()

if buf == nil {
Expand All @@ -523,13 +588,19 @@ func (s *muxerServer) handleMultivariantPlaylist(w http.ResponseWriter) {

// allow caching but use a small period in order to
// allow a stream to change tracks or bitrate
w.Header().Set("Cache-Control", "max-age=30")
w.Header().Set("Cache-Control", "max-age="+multivariantPlaylistMaxAge)
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.WriteHeader(http.StatusOK)
w.Write(buf)
}

func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string, w http.ResponseWriter) {
func (s *muxerServer) handleMediaPlaylist(
msn string,
part string,
skip string,
w http.ResponseWriter,
r *http.Request,
) {
isDeltaUpdate := false

if s.variant == MuxerVariantLowLatency {
Expand Down Expand Up @@ -578,6 +649,7 @@ func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string,
s.nextPartID,
s.segmentDeleteCount,
s.prefix,
r.URL.RawQuery,

Check warning on line 652 in muxer_server.go

View check run for this annotation

Codecov / codecov/patch

muxer_server.go#L652

Added line #L652 was not covered by tests
)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -626,6 +698,7 @@ func (s *muxerServer) handleMediaPlaylist(msn string, part string, skip string,
s.nextPartID,
s.segmentDeleteCount,
s.prefix,
r.URL.RawQuery,
)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -662,7 +735,7 @@ func (s *muxerServer) handleInitFile(w http.ResponseWriter) {

// allow caching but use a small period in order to
// allow a stream to change track parameters
w.Header().Set("Cache-Control", "max-age=30")
w.Header().Set("Cache-Control", "max-age="+initMaxAge)
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
w.Write(init)
Expand All @@ -686,7 +759,7 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) {
}
defer r.Close()

w.Header().Set("Cache-Control", "max-age=3600")
w.Header().Set("Cache-Control", "max-age="+segmentMaxAge)
w.Header().Set(
"Content-Type",
func() string {
Expand Down Expand Up @@ -728,7 +801,7 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) {
}
defer r.Close()

w.Header().Set("Cache-Control", "max-age=3600")
w.Header().Set("Cache-Control", "max-age="+segmentMaxAge)
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
io.Copy(w, r)
Expand Down Expand Up @@ -791,13 +864,6 @@ func (s *muxerServer) publishSegmentInner(segment muxerSegment) error {
s.segmentDeleteCount++
}

// always regenerate multivariant playlist since it contains bandwidth
buf, err := generateMultivariantPlaylist(s.variant, s.videoTrack, s.audioTrack, s.segments)
if err != nil {
return err
}
s.multivariantPlaylist = buf

// regenerate init.mp4 only if missing or codec parameters have changed
if s.variant != MuxerVariantMPEGTS && (s.init == nil || segment.isForceSwitched()) {
byts, err := generateInitFile(s.videoTrack, s.audioTrack)
Expand Down
Loading

0 comments on commit 46ea5bc

Please sign in to comment.