Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add missing Prometheus exports (#2620, #2619): paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent #2629

Merged
merged 3 commits into from
Nov 8, 2023

Conversation

rse
Copy link
Contributor

@rse rse commented Nov 2, 2023

The following tries to resolve issues #2620 and #2619 by adding the following, missing Prometheus exports: paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent

Fixes #2620
Fixes #2619

@rse rse changed the title dd missing Prometheus exports (#2620, #2619): paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent add missing Prometheus exports (#2620, #2619): paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent Nov 2, 2023
@rse rse force-pushed the prometheus-exporter branch 4 times, most recently from 5ad2d59 to dc0f0f3 Compare November 2, 2023 17:18
Copy link

codecov bot commented Nov 2, 2023

Codecov Report

Merging #2629 (6568891) into main (621a10a) will increase coverage by 0.17%.
Report is 2 commits behind head on main.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main    #2629      +/-   ##
==========================================
+ Coverage   59.31%   59.49%   +0.17%     
==========================================
  Files         144      144              
  Lines       15238    15267      +29     
==========================================
+ Hits         9039     9083      +44     
+ Misses       5559     5545      -14     
+ Partials      640      639       -1     
Files Coverage Δ
internal/core/core.go 85.73% <100.00%> (+0.02%) ⬆️
internal/core/metrics.go 96.93% <100.00%> (+4.28%) ⬆️
internal/core/path.go 63.82% <100.00%> (+0.27%) ⬆️
internal/core/srt_server.go 90.05% <100.00%> (+0.78%) ⬆️

... and 4 files with indirect coverage changes

📣 Codecov offers a browser extension for seamless coverage viewing on GitHub. Try it in Chrome or Firefox today!

@rse
Copy link
Contributor Author

rse commented Nov 2, 2023

I've now also ensured that the commits were squashed and that your tests were successful again, too.

@rse
Copy link
Contributor Author

rse commented Nov 2, 2023

These changes finally allow one to have a more complete MediaMTX dashboard in Grafana:

Screenshot 2023-11-02 at 23 20 27

@aler9
Copy link
Member

aler9 commented Nov 3, 2023

Hello, this is a really useful feature and will surely get merged. paths_bytes_sent needs a little adjustement to correctly include bytes sent to all RTSP sessions. If you take a look at your code, here:

if s.rtspStream != nil {
for _, pkt := range u.GetRTPPackets() {
atomic.AddUint64(s.bytesSent, unitSize(u))
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}

You can notice that s.bytesSent is increased regardless of the number of connected RTSP sessions. RTSP sessions are handled differently from other protocols, because in RTSP one of the underlying transports is multicast, and in multicast, packets are sent once to a multicast address, not to single clients. Therefore, the server calls gortsplib's stream.WritePacketRTPWithNTP, that sends packets to the multicast address and to clients that don't use multicast (unicast clients):

https://github.com/bluenviron/gortsplib/blob/35bf96c5ecf652d801c9dddeb74056e33ce0c284/server_stream_format.go#L39-L62

Therefore, you need to send a PR to gortsplib that adds bytesSent to ServerStream and edits stream.WritePackets in this way:

func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
	sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))

	// send unicast
	for r := range sf.sm.st.activeUnicastReaders {
		sm, ok := r.setuppedMedias[sf.sm.media]
		if ok {
			err := sm.writePacketRTP(byts)
			if err != nil {
				r.onStreamWriteError(err)
			} else {
				s.sentBytes += len(byts)
			}
		}
	}

	// send multicast
	if sf.sm.multicastWriter != nil {
		err := sf.sm.multicastWriter.writePacketRTP(byts)
		if err != nil {
			return err
		}
		s.sentBytes += len(byts)
	}

	return nil
}

After you do this, you can finally obtain the right bytesSent, that is

// BytesSent returns sent bytes.
func (s *Stream) BytesSent() uint64 {
	return atomic.LoadUint64(s.bytesSent) + s.rtspStream.BytesSent() + s.rtspsStream.BytesSent()
}

@rse
Copy link
Contributor Author

rse commented Nov 3, 2023

I understand your point that your gortsplib should help out. I've checked its sources. Although I still do not fully understand its internals, I'm not sure whether the following change would be correct:

func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
    sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))

    // send unicast
    for r := range sf.sm.st.activeUnicastReaders {
        sm, ok := r.setuppedMedias[sf.sm.media]
        if ok {
            err := sm.writePacketRTP(byts)
            if err != nil {
                r.onStreamWriteError(err)
            } else {
                sm.st.sentBytes += len(byts)
            }
        }
    }

    // send multicast
    if sf.sm.multicastWriter != nil {
        err := sf.sm.multicastWriter.writePacketRTP(byts)
        if err != nil {
            return err
        }
        sf.sm.st.sentBytes += len(byts)
    }

    return nil
}

Because it looks like the err := sm.writePacketRTP(byts) in the underlying code already indirectly updates the bytesSent in ServerStream. OTOH, inside the multicast writer code paths I do not see such updates as there is no longer any reference to ServerSession. So, can it be that the correct change would be just this:

func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
    sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))

    // send unicast
    for r := range sf.sm.st.activeUnicastReaders {
        sm, ok := r.setuppedMedias[sf.sm.media]
        if ok { 
            err := sm.writePacketRTP(byts)
            if err != nil {
                r.onStreamWriteError(err)
            }
        }
    }

    // send multicast
    if sf.sm.multicastWriter != nil {
        err := sf.sm.multicastWriter.writePacketRTP(byts)
        if err != nil {
            return err
        }
        sf.sm.st.sentBytes += len(byts)
    }

    return nil
}

Also, a very similar code exists for RTCP, so I guess we also would need:

func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
    // send unicast
    for r := range sm.st.activeUnicastReaders {
        sm, ok := r.setuppedMedias[sm.media]
        if ok { 
            err := sm.writePacketRTCP(byts)
            if err != nil {
                r.onStreamWriteError(err)
            }
        }
    }

    // send multicast
    if sm.multicastWriter != nil {
        err := sm.multicastWriter.writePacketRTCP(byts)
        if err != nil {
            return err
        }
        sm.st.sentBytes += len(byts)
    }

    return nil
}

Can you confirm the following resulting changes?

diff --git a/server_stream_format.go b/server_stream_format.go
index 6afa385..b8ce4e8 100644
--- a/server_stream_format.go
+++ b/server_stream_format.go
@@ -56,6 +56,7 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
        if err != nil {
            return err
        }
+       sf.sm.st.sentBytes += len(byts)
    }

    return nil
diff --git a/server_stream_media.go b/server_stream_media.go
index 5368d9c..2b3d207 100644
--- a/server_stream_media.go
+++ b/server_stream_media.go
@@ -59,6 +59,7 @@ func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
        if err != nil {
            return err
        }
+       sm.st.sentBytes += len(byts)
    }

    return nil

The problem just remains that the above "sf.sm.st" and "sm.st" references are still incorrect, as the "sentBytes" is on "ServerSession" and not "ServerStream". I'm still trying to find out how to get to the "ServerStream" instead...

@aler9
Copy link
Member

aler9 commented Nov 3, 2023

you're confusing ServerSession with ServerStream: the former represents a client session, while the latter represents a video stream. Actually ServerStream doesn't have any sentBytes or receivedBytes field, while ServerSession has. If you want to obtain the overall number of sent bytes per path, you need the overall number of sent bytes per stream, thus you have to add sentBytes to ServerStream and fill it accordingly.

@rse
Copy link
Contributor Author

rse commented Nov 3, 2023

OK, next attempt: how about the following adjustment to your gortsplib?

diff --git a/server_stream.go b/server_stream.go
index 107f0ba..75edc87 100644
--- a/server_stream.go
+++ b/server_stream.go
@@ -2,6 +2,7 @@ package gortsplib

 import (
    "sync"
+   "sync/atomic"
    "time"

    "github.com/pion/rtcp"
@@ -37,6 +38,7 @@ type ServerStream struct {
    activeUnicastReaders map[*ServerSession]struct{}
    streamMedias         map[*description.Media]*serverStreamMedia
    closed               bool
+   bytesSent            *uint64
 }

 // NewServerStream allocates a ServerStream.
@@ -46,6 +48,7 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
        desc:                 desc,
        readers:              make(map[*ServerSession]struct{}),
        activeUnicastReaders: make(map[*ServerSession]struct{}),
+       bytesSent:            new(uint64),
    }

    st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias))
@@ -71,6 +74,11 @@ func (st *ServerStream) Close() {
    }
 }

+// BytesSent returns the number of written bytes.
+func (ss *ServerStream) BytesSent() uint64 {
+   return atomic.LoadUint64(ss.bytesSent)
+}
+
 // Description returns the description of the stream.
 func (st *ServerStream) Description() *description.Session {
    return st.desc
diff --git a/server_stream_format.go b/server_stream_format.go
index 6afa385..609f992 100644
--- a/server_stream_format.go
+++ b/server_stream_format.go
@@ -2,6 +2,7 @@ package gortsplib

 import (
    "time"
+   "sync/atomic"

    "github.com/pion/rtcp"
    "github.com/pion/rtp"
@@ -46,6 +47,8 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
            err := sm.writePacketRTP(byts)
            if err != nil {
                r.onStreamWriteError(err)
+           } else {
+               atomic.AddUint64(sf.sm.st.bytesSent, uint64(len(byts)))
            }
        }
    }
@@ -56,6 +59,7 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
        if err != nil {
            return err
        }
+       atomic.AddUint64(sf.sm.st.bytesSent, uint64(len(byts)))
    }

    return nil

Followed by an adjustment to my own PR to MediaMTX:

diff --git a/internal/stream/stream.go b/internal/stream/stream.go
index 5cdc8e7..82500d7 100644
--- a/internal/stream/stream.go
+++ b/internal/stream/stream.go
@@ -79,7 +79,7 @@ func (s *Stream) BytesReceived() uint64 {
 
 // BytesSent returns sent bytes.
 func (s *Stream) BytesSent() uint64 {
-       return atomic.LoadUint64(s.bytesSent)
+       return atomic.LoadUint64(s.bytesSent) + s.rtspStream.BytesSent() + s.rtspsStream.BytesSent()
 }
 
 // RTSPStream returns the RTSP stream.
diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go
index 1a64de1..68dca29 100644
--- a/internal/stream/stream_format.go
+++ b/internal/stream/stream_format.go
@@ -89,14 +89,12 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni
 
        if s.rtspStream != nil {
                for _, pkt := range u.GetRTPPackets() {
-                       atomic.AddUint64(s.bytesSent, unitSize(u))
                        s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
                }
        }
 
        if s.rtspsStream != nil {
                for _, pkt := range u.GetRTPPackets() {
-                       atomic.AddUint64(s.bytesSent, unitSize(u))
                        s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
                }
        }

@aler9
Copy link
Member

aler9 commented Nov 3, 2023

@rse this may work, open a PR inside gortsplib in order to check whether tests are successful.

@rse
Copy link
Contributor Author

rse commented Nov 3, 2023

PR filed for gortsplib... Let's see...

@rse rse force-pushed the prometheus-exporter branch 2 times, most recently from 1462b21 to 04aa21d Compare November 4, 2023 22:23
@rse
Copy link
Contributor Author

rse commented Nov 5, 2023

Hmmm.. the new code in gortsplib fails when used from within MediaMTX:

2023/11/04 22:25:45 INF [RTSP] [session 2ef80b74] is publishing to path 'mypath', 2 tracks (H264, MPEG-4 Audio)
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xff6a8a]

goroutine 471 [running]:
github.com/bluenviron/gortsplib/v4.(*ServerStream).BytesSent(...)
	/go/pkg/mod/github.com/bluenviron/gortsplib/v4@v4.4.0/server_stream.go:79
github.com/bluenviron/mediamtx/internal/stream.(*Stream).BytesSent(0xc00063cff0)
	/s/internal/stream/stream.go:82 +0x6a

The references "uint64" is properly initialized in gortsplib AFAIK. Here someone who better knowns Go has to shed some light on me, please. What additional initialization would be needed?

paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent
@rse
Copy link
Contributor Author

rse commented Nov 5, 2023

Ok, I guess I've found the issue. It is one level above gortsplib: directly inside MediaMTX it can be that there are not RTSP channels, of course.

@rse
Copy link
Contributor Author

rse commented Nov 5, 2023

The changes were now adjusted to use the new gortsplib functionality and finally pass all tests. Please review it now once again. Thanks for your support.

@aler9
Copy link
Member

aler9 commented Nov 7, 2023

Before merging, i need to add a mutex around BytesSent() and tests for everything. Please edit the PR and check the box "allow changes from maintainers".

@rse
Copy link
Contributor Author

rse commented Nov 8, 2023

Changes are now allowed.

@aler9 aler9 merged commit 4bf0d10 into bluenviron:main Nov 8, 2023
8 checks passed
@aler9
Copy link
Member

aler9 commented Nov 8, 2023

merged. Thanks again for this relevant contribution.

@rse rse deleted the prometheus-exporter branch November 9, 2023 22:57
Copy link
Contributor

This issue is mentioned in release v1.3.0 🚀
Check out the entire changelog by clicking here

@github-actions github-actions bot locked and limited conversation to collaborators May 17, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
2 participants