Skip to content

Commit

Permalink
Merge remote-tracking branch 'skrashevich/patch-230328'
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Apr 14, 2023
2 parents 5c657d5 + a6260d0 commit d6259fc
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var defaults = map[string]string{
"rtsp/udp": "-fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -i {input}",

// output
"output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}",
"output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -bufsize 8192k -f rtsp {output}",

// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
// `-tune zerolatency` - for minimal latency
Expand Down
14 changes: 10 additions & 4 deletions cmd/mp4/mp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return
}

exit := make(chan []byte)
exit := make(chan []byte, 1)

cons := &mp4.Segment{OnlyKeyframe: true}
cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok && exit != nil {
exit <- data
select {
case exit <- data:
default:
}
exit = nil
}
})
Expand Down Expand Up @@ -105,7 +108,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
return
}

exit := make(chan error)
exit := make(chan error, 1) // Add buffer to prevent blocking

cons := &mp4.Consumer{
RemoteAddr: tcp.RemoteAddr(r),
Expand All @@ -119,7 +122,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
if _, err := w.Write(data); err != nil && exit != nil {
exit <- err
select {
case exit <- err:
default:
}
exit = nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/rtsp/rtsp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rtsp

import (
"io"
"net"
"net/url"
"strings"
Expand Down Expand Up @@ -213,7 +214,9 @@ func tcpHandler(conn *rtsp.Conn) {
})

if err := conn.Accept(); err != nil {
log.Warn().Err(err).Caller().Send()
if err != io.EOF {
log.Warn().Err(err).Caller().Send()
}
if closer != nil {
closer()
}
Expand Down
35 changes: 15 additions & 20 deletions pkg/core/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Receiver struct {
ID byte // Channel for RTSP, PayloadType for MPEG-TS

senders map[*Sender]chan *rtp.Packet
mu sync.Mutex
mu sync.RWMutex
bytes int
}

Expand All @@ -32,21 +32,21 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
t.mu.Lock()
t.bytes += len(packet.Payload)
for sender, buffer := range t.senders {
if len(buffer) < cap(buffer) {
buffer <- packet
} else {
select {
case buffer <- packet:
default:
sender.overflow++
}
}
t.mu.Unlock()
}

func (t *Receiver) Senders() (senders []*Sender) {
t.mu.Lock()
t.mu.RLock()
for sender := range t.senders {
senders = append(senders, sender)
}
t.mu.Unlock()
t.mu.RUnlock()
return
}

Expand All @@ -73,12 +73,9 @@ func (t *Receiver) Replace(target *Receiver) {

func (t *Receiver) String() string {
s := t.Codec.String() + ", bytes=" + strconv.Itoa(t.bytes)
if t.mu.TryLock() {
s += fmt.Sprintf(", senders=%d", len(t.senders))
t.mu.Unlock()
} else {
s += fmt.Sprintf(", senders=?")
}
t.mu.RLock()
s += fmt.Sprintf(", senders=%d", len(t.senders))
t.mu.RUnlock()
return s
}

Expand All @@ -93,7 +90,7 @@ type Sender struct {
Handler HandlerFunc

receivers []*Receiver
mu sync.Mutex
mu sync.RWMutex
bytes int

overflow int
Expand Down Expand Up @@ -127,15 +124,16 @@ func (s *Sender) HandleRTP(track *Receiver) {
}
track.senders[s] = buffer
track.mu.Unlock()

s.mu.Lock()
s.receivers = append(s.receivers, track)
s.mu.Unlock()

go func() {
// read packets from buffer channel until it will be closed
for packet := range buffer {
s.mu.Lock()
s.bytes += len(packet.Payload)
s.mu.Unlock()
s.Handler(packet)
}

Expand Down Expand Up @@ -171,12 +169,9 @@ func (s *Sender) Close() {

func (s *Sender) String() string {
info := s.Codec.String() + ", bytes=" + strconv.Itoa(s.bytes)
if s.mu.TryLock() {
info += ", receivers=" + strconv.Itoa(len(s.receivers))
s.mu.Unlock()
} else {
info += ", receivers=?"
}
s.mu.RLock()
info += ", receivers=" + strconv.Itoa(len(s.receivers))
s.mu.RUnlock()
if s.overflow > 0 {
info += ", overflow=" + strconv.Itoa(s.overflow)
}
Expand Down

0 comments on commit d6259fc

Please sign in to comment.