Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use AioRTC with just Async / Loop / Threading calls. #792

Open
R0NAM1 opened this issue Nov 2, 2022 · 22 comments
Open

How to use AioRTC with just Async / Loop / Threading calls. #792

R0NAM1 opened this issue Nov 2, 2022 · 22 comments

Comments

@R0NAM1
Copy link

R0NAM1 commented Nov 2, 2022

Before filing an issue please verify the following:

  • Check whether there is already an existing issue for the same topic.
    (Not from what I could find)
  • Ensure you are actually reporting an issue related to aiortc. The goal
    of the issue tracker is not to provide general guidance about WebRTC or free
    debugging of your code.
    (Yes, it's explicitly related on how to call AioRTC properly when not using AioHTTP,)
  • Clearly state whether the issue you are reporting can be reproduced with one
    of the examples provided with aiortc without any changes.
    (I was able to take the SERVER example and convert it to Flask, the only difference seems to be how it's called.)
  • Be considerate to the maintainers. aiortc is provided on a best-effort,
    there is no guarantee your issue will be addressed.

I've been attempting to use AioRTC with Flask to stream video to HTML5 Clients, so far everything works correctly EXCEPT the server refuses to accept the User Bind Request from the client (Verified via Wireshark). I believe the issue is with how I'm executing some of the async code, it seems examples using AioHTTP can do this no problem, it takes care of it for them, but for Flask (Which is what I'm using), you need to make the async calls manually.

In the logs, running the loop until completion the final debug messages for ICE are:
REQUEST
RESPONSE

But if after I put loop.run_forever, then I get the following:
REQUEST
RESPONSE
WAITING
IN_PROGRESS
SUCCEEDED

The issue with running it forever is that I don't get my SDP return for the client, so even though the server can now actually get ICE candidates, now the client can't.

How can this be addressed?

async def webRtcStart():
    loop = asyncio.get_event_loop()
    
    global rtspCredString

    player = MediaPlayer(rtspCredString)    
    params = ast.literal_eval((request.data).decode("UTF-8"))
    
    # print(params)

    offer = RTCSessionDescription(sdp=params.get("sdp"), type=params.get("type"))
    webRtcPeer = RTCPeerConnection(configuration=RTCConfiguration(
    iceServers=[RTCIceServer(
        urls=['stun:stun.internal.my.domain'])]))

    logger = logging.getLogger("webRtcPeer")

    logging.basicConfig(level=logging.DEBUG)

    



    if (player.video):
        webRtcPeer.addTrack(player.video)

    await webRtcPeer.setRemoteDescription(offer)

    print(offer)


    answer = await webRtcPeer.createAnswer()

    await webRtcPeer.setLocalDescription(answer)

    final = ('''
        {0}
    ''').format(json.dumps(
            {"sdp": (webRtcPeer.localDescription.sdp), "type": webRtcPeer.localDescription.type}
        ))


    return final

@app.route('/rtcoffer', methods=['GET', 'POST'])
@login_required
def webRTCOFFER():
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        
    # remaining_work_not_depends_on_webRtcStart()
    t = loop.run_until_complete(webRtcStart())
    # loop.run_forever()
    
    print(t)
    return t.encode()

@R0NAM1
Copy link
Author

R0NAM1 commented Nov 2, 2022

I've been looking everywhere as well for any code examples of people using AioRTC with Flask, haven't been able to.

@R0NAM1
Copy link
Author

R0NAM1 commented Nov 3, 2022

I was able to solve this by setting up threading as such:
t = loop.run_until_complete(webRtcStart())
Thread(target=loop.run_forever).start()

@R0NAM1 R0NAM1 closed this as completed Nov 3, 2022
@R0NAM1
Copy link
Author

R0NAM1 commented Nov 3, 2022

@app.route('/rtcoffer', methods=['GET', 'POST'])
@login_required
def webRTCOFFER():

    # Get Event Loop If It Exists, Create It If Not.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        
    # Run an event into that loop until it's complete and returns a value
    t = loop.run_until_complete(webRtcStart())
    
    # Continue running that loop forever to keep AioRTC Objects In Memory Executing, while shifting it to
    # Another thread so we don't block the code.
    Thread(target=loop.run_forever).start()
    
    # Return Our Parsed SDP
    return t.encode()

@namoshizun
Copy link

namoshizun commented Mar 20, 2024

@R0NAM1 could you reopen this issue?

In the Flask sync mode, manually create and manage an event loop works. Now Flask also supports async views but I still can't make it work with aiortc properly...

Following is my offer view adjusted from the aiohttp example:

async def _establish_peer_connection(sdp: str, type: str):
    # global rtc_relay, webcam
    global rtc_relay
    offer = RTCSessionDescription(sdp=sdp, type=type)

    pc = RTCPeerConnection()
    conn_id = rt_connection_pool.add(pc)

    with logger.contextualize(connection_id=conn_id):
        track = rtc_relay.subscribe(CameraVideoTrack())
        # track = rtc_relay.subscribe(webcam.video)
        pc.addTrack(track)
        logger.info("Added relay track")

        await pc.setRemoteDescription(offer)
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
        logger.info("Set local and remote descriptions")

        return {
            "sdp": pc.localDescription.sdp,
            "type": pc.localDescription.type,
            "connection_id": conn_id,
        }


@webrtc_bp.route("/offer", methods=["POST"])
@validate(payload_model_class=WebRTCOfferRequest)
async def accept_offer(*, payload: WebRTCOfferRequest):
    global rtc_relay
    conn_detail = await _establish_peer_connection(payload.sdp, payload.type)
    return jsonify(conn_detail)

ICE connection state stuck at "WAITING":

2024-03-20 09:16:34.107 | INFO     | service.views.rtc_signalling:_establish_peer_connection:29 - Added relay track
INFO:aioice.ice:Connection(0) Remote candidate "878d6563-7030-4ee0-9757-a5846c0aac7d.local" resolved to 192.168.31.20
2024-03-20 09:16:34.309 | INFO     | service.views.rtc_signalling:_establish_peer_connection:34 - Set local and remote descriptions
INFO:aioice.ice:Connection(0) Check CandidatePair(('192.168.31.250', 54441) -> ('192.168.31.20', 60505)) State.FROZEN -> State.WAITING
INFO:aioice.ice:Connection(0) Check CandidatePair(('172.17.0.1', 47757) -> ('192.168.31.20', 60505)) State.FROZEN -> State.WAITING
INFO:aioice.ice:Connection(0) Check CandidatePair(('10.42.65.0', 56703) -> ('192.168.31.20', 60505)) State.FROZEN -> State.WAITING

@ZoroLH
Copy link

ZoroLH commented Apr 30, 2024

Same quesion here, seams like flask thread stop when return response. "Thread(target=loop.run_forever).start()" solved my problem, but question is should I manually call loop.stop() to avoid keeping creat threads in backend?

@R0NAM1 R0NAM1 reopened this May 1, 2024
@R0NAM1
Copy link
Author

R0NAM1 commented May 1, 2024

Turn's out I am also still having an issue,

@namoshizun I don't know how to fix that off that bat, sorry. I might see if I can restructure my rtc code to be like it though, in case that works.

@ZoroLH I've noticed the issue seems to but not with stopping loogs for threads, they usually close fine, I have AioRTC tasks being left behind for some reason, that's the real issue because that will take up memory and such.

My current issue is that when I call rtcPeer.close() it just sits there and never does anything, and for some reason AioRTC objects never seem to stop or go out of memory into a garbage collector. I have a github discussion here about it:

#1088

@R0NAM1
Copy link
Author

R0NAM1 commented May 1, 2024

@namoshizun I don't know what I was talking about before with trying some code from yours, I think I misread how you do your asyncio stuff.

My guess is either that your STUN server is unreachable / down or that something is just running forever, try putting print lines between everything that's important, and see if the SDP is ever returned to the client, if so then it just might need to run forever or something else is wrong.

@hoainamken
Copy link

Hi @R0NAM1 ,
Sorry for asking in your thread. I've managed to stream video using Flask backend with aiortc. I'm using loop.run_forever to keep the player running in the background. Everything works fine until I use Gunicorn. The ICE state changes from 'checking' to 'connected' to 'disconnected'. In UI, the video displays loading forever.

Have you experienced the same issue?

@R0NAM1
Copy link
Author

R0NAM1 commented May 17, 2024

Hi @R0NAM1 , Sorry for asking in your thread. I've managed to stream video using Flask backend with aiortc. I'm using loop.run_forever to keep the player running in the background. Everything works fine until I use Gunicorn. The ICE state changes from 'checking' to 'connected' to 'disconnected'. In UI, the video displays loading forever.

Have you experienced the same issue?

I don't use Gunicorn, I use Waitress and I know the Webrtc Streams run fine, but my issue is with closing WebRTC objects and Threads, right now I've figured out how to close objects, but that only works if the client streams for a short time.

Got a question, using threading.active_count() could you see if your version of the AioRTC code with Flask closes everything properly, even after, say a 24 hour stream? If you application permits this kind of testing, of course. If you think it does, could you share your source so I can learn how to do it properly?

@hoainamken
Copy link

@R0NAM1
Thanks for clarifying. I switched from using Gunicorn to Hypercorn, and now it works. I suppose Gunicorn does not work well with async tasks.
Regarding your question, I haven't tested streaming for a long time, so I couldn't answer. I'll get back to you if I have a chance to test it.

@R0NAM1
Copy link
Author

R0NAM1 commented May 18, 2024

@R0NAM1 Thanks for clarifying. I switched from using Gunicorn to Hypercorn, and now it works. I suppose Gunicorn does not work well with async tasks. Regarding your question, I haven't tested streaming for a long time, so I couldn't answer. I'll get back to you if I have a chance to test it.

Thanks, it's entirely possible part of my issue is the server I'm using with flask, I'll move back to the inbuilt dev server just to test.
Could you also share the closing part of your code? Whatever calls RTCPeerConnection.close().

@hoainamken
Copy link

pc is my RTCPeerConnection, when it closes on the frontend, the backend site will automatically stop

disconnect() {
      if (this.pc) {
        this.pc.close();
      } 
}

@R0NAM1
Copy link
Author

R0NAM1 commented May 18, 2024

pc is my RTCPeerConnection, when it closes on the frontend, the backend site will automatically stop

disconnect() {
      if (this.pc) {
        this.pc.close();
      } 
}

I mean more the context it is called in. For instance, (I haven't committed it yet) my code is called when a client is offering SDP to start a WebRTC session:

# WEBRTC ===================================
# When we grab a WebRTC offer from out browser client for single camera.
@app.route('/rtcoffer/<cam>', methods=['GET', 'POST'])
@login_required
def webRTCOFFER(cam):

...
...

 # Get SDP from client and set set objects
    print("Attmepting SDP")
    print("Running Threads: " + str(active_count())) 
    parsedSDP, dataChannel = userUUIDAssociations[thisUUID][2].run_until_complete(singleWebRtcStart(thisUUID, dockerIPString, cam, request))
    
    # parsedSDP = asyncio.run(singleWebRtcStart(thisUUID, dockerIPString, cam, request))
    
    print("Got Parsed SDP")
    print("Running Threads: " + str(active_count())) 

    # print("After parsed SDP: " + str(userUUIDAssociations[thisUUID][2]))
    
    # Continue running that loop forever to keep AioRTC Objects In Memory Executing, while shifting it to
    # Another thread so we don't block the code.
    # webRTCThread = Thread(target=userUUIDAssociations[thisUUID][2].run_forever)
    print("Running Threads Before Thread: " + str(active_count())) 
    webRTCThread = Thread(target=userUUIDAssociations[thisUUID][2].run_until_complete, args=(rtcWatchdog(thisUUID, dataChannel), ))
    webRTCThread.start()
    print("Running Threads After Thread: " + str(active_count())) 

    # Return Our Parsed SDP to the client
    return parsedSDP.encode()

That's where flask comes in, below is singleWebRtcStart (Everything relevant)

# Single View WebRtc Start
async def singleWebRtcStart(thisUUID, dockerIP, cameraName, request):
    # Pingtime holds the UUID and the current ping time (Time since last ping pong message from client, if over 5 seconds or so get rid of)
    global userUUIDAssociations
    
    # Set params from SDP Client Request to set objects
    params = ast.literal_eval((request.data).decode("UTF-8"))
    # Set offer to params parsed.
    offer = RTCSessionDescription(sdp=params.get("sdp"), type=params.get("type"))

    # Set RTCConfig Option Ice STUN Server, should be a local one incase internet is down!
    # Set ICE Server to local server, hardcoded to be NVR, change at install!
    webRtcPeer = RTCPeerConnection(configuration=RTCConfiguration(
    iceServers=[RTCIceServer(
        urls=[stunServer])]))

    # I need to call requestCameraPlayer to get a player to make readers from
    cameraPlayer = requestCameraPlayer(dockerIP)

    # Create tracks to tie to transceivers
    camAudioTrack = AudioCameraPlayerTrack(cameraPlayer, thisUUID)
    camVideoTrack = VideoCameraPlayerTrack(cameraPlayer, thisUUID)
    
    if (camVideoTrack):
        webRtcPeer.addTransceiver(camVideoTrack, direction='sendonly')
    if (camAudioTrack):
        webRtcPeer.addTransceiver(camAudioTrack, direction='sendrecv')

...
...

dataChannelToSend = None

    # Create Event Watcher On Data Channel To Know If Client Is Still Alive, AKA Ping - Pong
    # Also process messages from client
    @webRtcPeer.on("datachannel")
    def on_datachannel(channel):
        dataChannelToSend = channel
        # Check if camera supports PTZ and/or TWA
        if (hasPTZ == True):
            ptzcoords = 'Supported' #PTZ Coords will be part of WebRTC Communication, send every 0.5 seconds.
            update_task = asyncio.create_task(updatePTZReadOut(webRtcPeer, cameraName, channel)) # Update forever until cancelled  

        if (hasTWA == True):
            if webRtcPeer.sctp.state == 'connected':
                channel.send("truetwa") # Allows Remote TWA Toggle to be clicked and processed.

        tmpCamTuple = False
        
        userUUIDAssociations[thisUUID][1] = time.time()

        # Data channel created, on message sent from peer.
        @channel.on("message")
        async def on_message(message):
            global userUUIDAssociations
            # Won't always be connected when sending a message, try and pass here
            try:

                # If ping, send heartbeat pong
                if isinstance(message, str) and message.startswith("ping"):
                    userUUIDAssociations[thisUUID][1] = time.time()
                    if webRtcPeer.sctp.state == 'connected':
                        channel.send("pong" + message[4:])

...
...

                   
    # Wait to Set Remote Description
    await webRtcPeer.setRemoteDescription(offer)


    # Generate Answer to Give To Peer
    answer = await webRtcPeer.createAnswer()


    # Set Description of Peer to answer.
    await webRtcPeer.setLocalDescription(answer)


    # Set response to client from the generated objects
    final = ("{0}").format(json.dumps(
            {"sdp": (webRtcPeer.localDescription.sdp), "type": webRtcPeer.localDescription.type}
        ))
    
    # Append webrtcpeer
    userUUIDAssociations[thisUUID].append(webRtcPeer)
    
    # Return response to client
    return final, dataChannelToSend

(requestCameraPlayer is a modified AioRTC MediaPlayer that can support multiple WebRTC consumers, the buffer is a single object instead of a queue, so no retransmissions!)

How the RTCPeerClient is CLOSED (The context I want you to provide the most) in my code right now:

async def rtcWatchdog(uuid, dataChannel):
    global userUUIDAssociations
    print("Starting Watchdog, waiting 5 seconds")
    await asyncio.sleep(5)
    # While kill signal is false check uuid and if time goes over 5 seconds, then stop RTCPeer.
    while sigint == False:
        # print("Check UUID: " + uuid)
        iPingtime = userUUIDAssociations[uuid][1]
        diffTime = float(time.time()) - float(iPingtime)
        
        await asyncio.sleep(1)
        # print("Found Pingtime: " + str(iPingtime))
        # print("Diff time:" + str(diffTime))
                        
        if (diffTime > 5.0 and iPingtime > 0):
            print("Broken Watchdog, killing loop uuid: " + str(uuid))                  
            print("Running Threads Now: " + str(threading.active_count()))
                
            try:    
                time.sleep(3)
                
                print("Is Running: " + str(userUUIDAssociations[uuid][2].is_running()))
                print("Is Closed: " + str(userUUIDAssociations[uuid][2].is_closed()))
                print("RtcConnectionState: ")
                print((userUUIDAssociations[uuid][5]).connectionState)
                        
                # =========================================================================================
                
                print("== Trying to stop RTC Peer== ")
                await (userUUIDAssociations[uuid][5]).close()                
                print("== Is stopped? ==")
                
                await asyncio.sleep(3)

                if dataChannel is not None:
                    print("== Trying to stop RTC Peer Datachannel == ")
                    await dataChannel.close()                
                    print("== Is stopped? ==")
                else:
                    print("!!! Couldn't close datachannel, was None!")
                
                print("Is Running: " + str(userUUIDAssociations[uuid][2].is_running()))
                print("Is Closed: " + str(userUUIDAssociations[uuid][2].is_closed()))
                print("RtcConnectionState: ")
                print((userUUIDAssociations[uuid][5]).connectionState)
                                    
                await asyncio.sleep(2)

                

                print("== Trying to stop event loop ==")
                (userUUIDAssociations[uuid][2].stop()) # This tells the Event Loop to stop executing code, but it does not CLOSE the loop! (Which leaves 1 cascading thread!)
                print("== Event loop stopped?, waiting 5 seconds ==")
                                
                # print("== Trying to close event loop ==")
                # userUUIDAssociations[uuid][2].close() #  Close event loop, which reduces thread count back to what it was originally. <---- THIS WAS A BIG FIX
                # print("== Event loop closed ==")
                
                print("Is Running: " + str(userUUIDAssociations[uuid][2].is_running()))
                print("Is Closed: " + str(userUUIDAssociations[uuid][2].is_closed()))
                print("RtcConnectionState: ")
                print((userUUIDAssociations[uuid][5]).connectionState)
                
                print("Running Threads After No RTC: " + str(threading.active_count()))
                # Remove from userUUIDAssociations
                del userUUIDAssociations[uuid]
                
                print("Watchdog is over, should have quit!")
                break
            
            except Exception as e:
                print("Failed in stopping event loop")
                print(e)

The issue right now is that when a client is connected for a short time when RTC.close() runs two or three threads stay open for about 5 minutes then close, but when I run it for a long time (Above an hour) it gets stuck on RTC.close(), never progressing. It used to always do that, so I guess progress.

Thanks!

@R0NAM1
Copy link
Author

R0NAM1 commented May 18, 2024

A good thing an LLM is used for is querying large datasets in a human way, basically talking with data. This is useful for coding especially because the LLM can reword things very well and provide very good examples.

I bring this up because I just asked one 'Show me how to close an AioRTC RTCPeerConnection with a flask application' and it mentions that I need to close all the tracks it's connected to, and then close the RTCPeer! I'll try that, but that is interesting, isn't it?

from phind.com:

To close an `RTCPeerConnection` within a Flask application using aiortc, you need to ensure that all media tracks are properly stopped and then call the `close()` method on the `RTCPeerConnection` object. This process involves stopping any active media streams, such as audio or video, and then gracefully shutting down the connection.

Here's a step-by-step guide based on the provided sources:

1. **Stop All Media Tracks**: Before closing the `RTCPeerConnection`, ensure that all media tracks (e.g., audio, video) are stopped. This can be done by calling the `stop()` method on each track associated with the peer connection.

2. **Close the Peer Connection**: After stopping all tracks, call the `close()` method on the `RTCPeerConnection` object. This will initiate the closure of the connection.

3. **Handle Exceptions**: Ensure that your code handles exceptions that might occur during these operations, especially since network operations can fail due to various reasons.

Here's an example implementation within a Flask route that handles the shutdown process:

```python
from flask import Flask, jsonify
from aiortc import RTCPeerConnection, RTCSessionDescription
import asyncio

app = Flask(__name__)

@app.route('/shutdown', methods=['POST'])
async def shutdown():
    # Assuming pc is your RTCPeerConnection instance
    if pc is not None:
        # Stop all tracks
        for track in pc.getTracks():
            track.stop()
        
        # Close the peer connection
        await pc.close()
    
    return jsonify({'status': 'success'})

if __name__ == '__main__':
    app.run(debug=True)

In this example, when a POST request is made to /shutdown, the application stops all tracks associated with the RTCPeerConnection (pc) and then closes the connection. Make sure to replace pc with your actual RTCPeerConnection instance variable.

Remember, the RTCPeerConnection class is asynchronous, so you need to use await when calling its methods inside an asynchronous function. Also, ensure that your Flask application is set up to handle asynchronous routes correctly, possibly using aiohttp instead of Flask's built-in routing system for full asynchronous support.

@R0NAM1
Copy link
Author

R0NAM1 commented May 18, 2024

Realization!
I have the CameraPlayer object, but I create an Audio and Video track that are the parasites allowing this RTC session to use it, and I never close those tracks! I need to! I'll try it soon.

@R0NAM1
Copy link
Author

R0NAM1 commented May 19, 2024

Turns out that's what pc.close() does anyway, but I'm delving deeper, for some reason it's the RTPSender not closing, something in there isn't working correctly but I need to wait a few hours, maybe overnight to let the code run

@R0NAM1
Copy link
Author

R0NAM1 commented May 19, 2024

I think I found the issue, the code gets stuck because the RTCP loop never exits:

async def _run_rtcp(self) -> None:
        self.__log_debug("- RTCP started")
        self.__rtcp_started.set()

        try:
            while True:
                # The interval between RTCP packets is varied randomly over the
                # range [0.5, 1.5] times the calculated interval.
                await asyncio.sleep(0.5 + random.random())

                # RTCP SR
                packets: List[AnyRtcpPacket] = [
                    RtcpSrPacket(
                        ssrc=self._ssrc,
                        sender_info=RtcpSenderInfo(
                            ntp_timestamp=self.__ntp_timestamp,
                            rtp_timestamp=self.__rtp_timestamp,
                            packet_count=self.__packet_count,
                            octet_count=self.__octet_count,
                        ),
                    )
                ]
                self.__lsr = ((self.__ntp_timestamp) >> 16) & 0xFFFFFFFF
                self.__lsr_time = time.time()

                # RTCP SDES
                if self.__cname is not None:
                    packets.append(
                        RtcpSdesPacket(
                            chunks=[
                                RtcpSourceInfo(
                                    ssrc=self._ssrc,
                                    items=[(1, self.__cname.encode("utf8"))],
                                )
                            ]
                        )
                    )

                await self._send_rtcp(packets)
        except asyncio.CancelledError:
            pass

        print("RTCP Loop Broken")

        # RTCP BYE
        packet = RtcpByePacket(sources=[self._ssrc])
        await self._send_rtcp([packet])

        print("RTCP Finished")
        self.__log_debug("- RTCP finished")
        self.__rtcp_exited.set()

I'm very sure it gets stuck at await self._send_rtcp(packets) and does NOT continue because it will wait forever, if it continued it would exit. I'm testing this with

await asyncio.wait_for(self._send_rtcp(packets), timeout=5.0)

to see if I can fix this issue, if this does I might be able to open a pull request and contribute to open source, I guess that's naturally when bending how a library is suppost to work.

@R0NAM1
Copy link
Author

R0NAM1 commented May 20, 2024

(It could also be me not closing the datachannel, as I haven't tried that, I will later... I don't think RTCPeerConnection.close() does from what I looked at)

@ZoroLH
Copy link

ZoroLH commented May 20, 2024

Are you experiencing performance issues with Flask? I switched to aiohttp because, with Flask, the video stream FPS would drop significantly when multiple video clients were opened. I haven't encountered this issue with aiohttp.

@R0NAM1
Copy link
Author

R0NAM1 commented May 20, 2024

Are you experiencing performance issues with Flask? I switched to aiohttp because, with Flask, the video stream FPS would drop significantly when multiple video clients were opened. I haven't encountered this issue with aiohttp.

I don't believe so, just some normal performance issues when using a mid gaming PC and a bloated copy of Debian running 2 4k 24/7 cameras recordings and testing webrtc streaming all day.

I also found out where my issues lie, RTCP never stops!


    async def _run_rtcp(self) -> None:
        self.__log_debug("- RTCP started")
        self.__rtcp_started.set()

        try:
            while True:
                # The interval between RTCP packets is varied randomly over the
                # range [0.5, 1.5] times the calculated interval.
                # await asyncio.sleep(0.5 + random.random())
                await asyncio.wait_for(asyncio.sleep(0.5 + random.random()), timeout=5.0)

                # RTCP SR
                packets: List[AnyRtcpPacket] = [
                    RtcpSrPacket(
                        ssrc=self._ssrc,
                        sender_info=RtcpSenderInfo(
                            ntp_timestamp=self.__ntp_timestamp,
                            rtp_timestamp=self.__rtp_timestamp,
                            packet_count=self.__packet_count,
                            octet_count=self.__octet_count,
                        ),
                    )
                ]
                self.__lsr = ((self.__ntp_timestamp) >> 16) & 0xFFFFFFFF
                self.__lsr_time = time.time()

                # RTCP SDES
                if self.__cname is not None:
                    packets.append(
                        RtcpSdesPacket(
                            chunks=[
                                RtcpSourceInfo(
                                    ssrc=self._ssrc,
                                    items=[(1, self.__cname.encode("utf8"))],
                                )
                            ]
                        )
                    )

                #await self._send_rtcp(packets)
                await asyncio.wait_for(self._send_rtcp(packets), timeout=5.0)
                print("Send RTCP")
                
        except asyncio.CancelledError:
            pass

Even after everything closes 'Send RTCP' still prints, so something is keeping it alive.

@R0NAM1
Copy link
Author

R0NAM1 commented May 20, 2024

I may have just fixed it, it looks like asyncio cancelling isn't reliable in this context, causing RTCP to never exit. Switching over to an asyncio.Event() fixes this:

self.__rtcp_wantExit = asyncio.Event()
...
    async def stop(self):
        """
        Irreversibly stop the sender.
        """
        if self.__started:
            print("Stopping RTP Sender, unregistering.")
            self.__transport._unregister_rtp_sender(self)

            # shutdown RTP and RTCP tasks
            print("Awaiting rtp and rtcp waiting started")
            await asyncio.gather(self.__rtp_started.wait(), self.__rtcp_started.wait())
            print("Canceling RTP")
            self.__rtp_task.cancel()
            print("Canceling RTCP")
            self.__rtcp_task.cancel()
            self.__rtcp_wantExit.set()
            print("Awaiting rtp and rtcp exit waiting started")
            await asyncio.gather(self.__rtp_exited.wait(), self.__rtcp_exited.wait())
...
                if self.__rtcp_wantExit.is_set():
                    break

Resulting output:

Running Threads: 15
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Send RTCP
Broken Watchdog, killing loop uuid: 7733602c-a3d2-4d08-be12-79e863a3927a
Running Threads Now: 15
Is Running: True
Is Closed: False
RtcConnectionState: 
connected
== Trying to stop RTC Peer Datachannel == 
== Is stopped? ==
== Attempting to close all tracks associated with RTCPeer ==
Closing track: <aiortc.rtcrtptransceiver.RTCRtpTransceiver object at 0x7fc4d8f44110>
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
Send RTCP
RTCP Loop Broken
Send RTCP
Send RTCP
Send RTCP
RTCP Finished
AIORTC: Stopped Sender
Closing track: <aiortc.rtcrtptransceiver.RTCRtpTransceiver object at 0x7fc4d834e8d0>
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
RTCP Loop Broken
RTCP Finished
AIORTC: Stopped Sender
Closing track: <aiortc.rtcrtptransceiver.RTCRtpTransceiver object at 0x7fc4d836b510>
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
RTCP Loop Broken
RTCP Finished
AIORTC: Stopped Sender
Closing track: <aiortc.rtcrtptransceiver.RTCRtpTransceiver object at 0x7fc4d8388150>
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
RTCP Loop Broken
RTCP Finished
AIORTC: Stopped Sender
All closed?
== Trying to stop RTC Peer== 
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
AIORTC: Stopped Sender
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
AIORTC: Stopped Sender
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
AIORTC: Stopped Sender
AIORTC: Stopping Receiver
AIORTC: Stopped Receiver
AIORTC: Stopping Sender
Stopping RTP Sender, unregistering.
Awaiting rtp and rtcp waiting started
Canceling RTP
Canceling RTCP
Awaiting rtp and rtcp exit waiting started
AIORTC: Stopped Sender
== Is stopped? ==


Is Running: True
Is Closed: False
RtcConnectionState: 
closed


== Trying to stop event loop ==
== Event loop stopped?, waiting 5 seconds ==
Is Running: True
Is Closed: False
RtcConnectionState: 
closed
Running Threads After No RTC: 15
Watchdog is over, should have quit!


Running Threads: 14

I'll let this run until I go to bed and see what the behavior is, hope is this fixes it.

@R0NAM1
Copy link
Author

R0NAM1 commented May 22, 2024

I think it works, eventually the threads always stop and if they don't they seem to be reused by other clients (?). I'll make a pull request soon and see if it gets approved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants