Skip to content

Commit

Permalink
metrics: add paths_bytes_sent, srt_conns, srt_conns_bytes_received, s…
Browse files Browse the repository at this point in the history
…rt_conns_bytes_sent (#2620) (#2619) (#2629)

* add missing Prometheus exports (#2620, #2619):
paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent

* protect Stream.BytesSent()

* add tests

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
  • Loading branch information
rse and aler9 committed Nov 8, 2023
1 parent 2a6060d commit 4bf0d10
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 44 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,7 @@ Obtaining:
# metrics of every path
paths{name="[path_name]",state="[state]"} 1
paths_bytes_received{name="[path_name]",state="[state]"} 1234
paths_bytes_sent{name="[path_name]",state="[state]"} 1234

# metrics of every HLS muxer
hls_muxers{name="[name]"} 1
Expand Down Expand Up @@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1
rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234
rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187

# metrics of every SRT connection
srt_conns{id="[id]",state="[state]"} 1
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_bytes_sent{id="[id]",state="[state]"} 187

# metrics of every WebRTC session
webrtc_sessions{id="[id]"} 1
webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234
Expand Down
3 changes: 3 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ components:
bytesReceived:
type: integer
format: int64
bytesSent:
type: integer
format: int64
readers:
type: array
items:
Expand Down
2 changes: 2 additions & 0 deletions internal/core/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) {
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

type pathList struct {
Expand Down Expand Up @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) {
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

var pathName string
Expand Down
1 change: 1 addition & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
Expand Down
25 changes: 25 additions & 0 deletions internal/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type metrics struct {
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
srtServer apiSRTServer
hlsManager apiHLSManager
webRTCManager apiWebRTCManager
}
Expand Down Expand Up @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}"
out += metric("paths", tags, 1)
out += metric("paths_bytes_received", tags, int64(i.BytesReceived))
out += metric("paths_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("paths", "", 0)
Expand Down Expand Up @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
}

if !interfaceIsEmpty(m.srtServer) {
data, err := m.srtServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("srt_conns", tags, 1)
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("srt_conns", "", 0)
out += metric("srt_conns_bytes_received", "", 0)
out += metric("srt_conns_bytes_sent", "", 0)
}
}

if !interfaceIsEmpty(m.webRTCManager) {
data, err := m.webRTCManager.apiSessionsList()
if err == nil && len(data.Items) != 0 {
Expand Down Expand Up @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) {
m.rtmpServer = s
}

// srtServerSet is called by srtServer.
func (m *metrics) srtServerSet(s apiSRTServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.srtServer = s
}

// webRTCManagerSet is called by webRTCManager.
func (m *metrics) webRTCManagerSet(s apiWebRTCManager) {
m.mutex.Lock()
Expand Down
197 changes: 154 additions & 43 deletions internal/core/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package core

import (
"bufio"
"context"
"crypto/tls"
"net"
"net/http"
"net/url"
"os"
"sync"
"testing"
"time"

"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/protocols/rtmp"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
)

func TestMetrics(t *testing.T) {
Expand Down Expand Up @@ -60,60 +67,158 @@ rtsps_sessions_bytes_sent 0
rtmp_conns 0
rtmp_conns_bytes_received 0
rtmp_conns_bytes_sent 0
srt_conns 0
srt_conns_bytes_received 0
srt_conns_bytes_sent 0
webrtc_sessions 0
webrtc_sessions_bytes_received 0
webrtc_sessions_bytes_sent 0
`, string(bo))

medi := testMediaH264

source := gortsplib.Client{}
err = source.StartRecording("rtsp://localhost:8554/rtsp_path",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source.Close()

source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source2.StartRecording("rtsps://localhost:8322/rtsps_path",
&description.Session{Medias: []*description.Media{medi}})
require.NoError(t, err)
defer source2.Close()

u, err := url.Parse("rtmp://localhost:1935/rtmp_path")
require.NoError(t, err)

nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()

conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)

videoTrack := &format.H264{
PayloadTyp: 96,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
PacketizationMode: 1,
}

_, err = rtmp.NewWriter(conn, videoTrack, nil)
require.NoError(t, err)
terminate := make(chan struct{})
var wg sync.WaitGroup
wg.Add(5)

go func() {
defer wg.Done()
source := gortsplib.Client{}
err := source.StartRecording("rtsp://localhost:8554/rtsp_path",
&description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{testFormatH264},
}}})
require.NoError(t, err)
defer source.Close()
<-terminate
}()

go func() {
defer wg.Done()
source2 := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err := source2.StartRecording("rtsps://localhost:8322/rtsps_path",
&description.Session{Medias: []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{testFormatH264},
}}})
require.NoError(t, err)
defer source2.Close()
<-terminate
}()

go func() {
defer wg.Done()
u, err := url.Parse("rtmp://localhost:1935/rtmp_path")
require.NoError(t, err)

nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err)
defer nconn.Close()

conn, err := rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)

_, err = rtmp.NewWriter(conn, testFormatH264, nil)
require.NoError(t, err)
<-terminate
}()

go func() {
defer wg.Done()

su, err := url.Parse("http://localhost:8889/webrtc_path/whip")
require.NoError(t, err)

s := &webrtc.WHIPClient{
HTTPClient: &http.Client{Transport: &http.Transport{}},
URL: su,
}

tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil)
require.NoError(t, err)
defer checkClose(t, s.Close)

err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
<-terminate
}()

go func() {
defer wg.Done()

srtConf := srt.DefaultConfig()
address, err := srtConf.UnmarshalURL("srt://localhost:8890?streamid=publish:srt_path")
require.NoError(t, err)

err = srtConf.Validate()
require.NoError(t, err)

publisher, err := srt.Dial("srt", address, srtConf)
require.NoError(t, err)
defer publisher.Close()

track := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}

bw := bufio.NewWriter(publisher)
w := mpegts.NewWriter(bw, []*mpegts.Track{track})
require.NoError(t, err)

err = w.WriteH26x(track, 0, 0, true, [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{ // IDR
0x05, 1,
},
})
require.NoError(t, err)

err = bw.Flush()
require.NoError(t, err)
<-terminate
}()

time.Sleep(500 * time.Millisecond)

bo = httpPullFile(t, hc, "http://localhost:9998/metrics")

require.Regexp(t,
`^paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} [0-9]+`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
Expand All @@ -135,9 +240,15 @@ webrtc_sessions_bytes_sent 0
`rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+
`rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`webrtc_sessions 0`+"\n"+
`webrtc_sessions_bytes_received 0`+"\n"+
`webrtc_sessions_bytes_sent 0`+"\n"+
`srt_conns\{id=".*?",state="publish"\} 1`+"\n"+
`srt_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns_bytes_sent\{id=".*?",state="publish"\} 0`+"\n"+
`webrtc_sessions\{id=".*?"\} 1`+"\n"+
`webrtc_sessions_bytes_received\{id=".*?"\} [0-9]+`+"\n"+
`webrtc_sessions_bytes_sent\{id=".*?"\} [0-9]+`+"\n"+
"$",
string(bo))

close(terminate)
wg.Wait()
}
6 changes: 6 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return pa.stream.BytesReceived()
}(),
BytesSent: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesSent()
}(),
Readers: func() []defs.APIPathSourceOrReader {
ret := []defs.APIPathSourceOrReader{}
for r := range pa.readers {
Expand Down
Loading

0 comments on commit 4bf0d10

Please sign in to comment.