From 94a7ea9bf1e4cc883c9a5cd533988c33d0d9891e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?th=E1=BB=8Bnh?= Date: Wed, 12 Mar 2025 14:41:59 +0700 Subject: [PATCH] x2 connections for the external integration --- backend/routers/transcribe.py | 67 ++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/backend/routers/transcribe.py b/backend/routers/transcribe.py index a46b70995ec..940d89e4c97 100644 --- a/backend/routers/transcribe.py +++ b/backend/routers/transcribe.py @@ -378,11 +378,9 @@ async def deepgram_socket_send(data): def create_pusher_task_handler(): nonlocal websocket_active - pusher_ws = None pusher_connect_lock = asyncio.Lock() - pusher_connected = False - - # Transcript + pusher_transcript_connected = False + pusher_audio_connected = False transcript_ws = None segment_buffers = [] in_progress_memory_id = None @@ -398,7 +396,7 @@ async def transcript_consume(): nonlocal segment_buffers nonlocal in_progress_memory_id nonlocal transcript_ws - nonlocal pusher_connected + nonlocal pusher_transcript_connected while websocket_active or len(segment_buffers) > 0: await asyncio.sleep(1) if transcript_ws and len(segment_buffers) > 0: @@ -412,8 +410,8 @@ async def transcript_consume(): except websockets.exceptions.ConnectionClosed as e: print(f"Pusher transcripts Connection closed: {e}", uid) transcript_ws = None - pusher_connected = False - await reconnect() + pusher_transcript_connected = False + await connect_transcript() except Exception as e: print(f"Pusher transcripts failed: {e}", uid) @@ -430,7 +428,7 @@ async def audio_bytes_consume(): nonlocal websocket_active nonlocal audio_buffers nonlocal audio_bytes_ws - nonlocal pusher_connected + nonlocal pusher_audio_connected while websocket_active or len(audio_buffers) > 0: await asyncio.sleep(1) if audio_bytes_ws and len(audio_buffers) > 0: @@ -444,37 +442,58 @@ async def audio_bytes_consume(): except websockets.exceptions.ConnectionClosed as e: print(f"Pusher audio_bytes Connection closed: {e}", uid) audio_bytes_ws = None - pusher_connected = False - await reconnect() + pusher_audio_connected = False + await connect_audio() except Exception as e: print(f"Pusher audio_bytes failed: {e}", uid) - async def reconnect(): - nonlocal pusher_connected + async def connect(): + await connect_transcript() + await connect_audio() + + async def connect_transcript(): + nonlocal pusher_transcript_connected nonlocal pusher_connect_lock async with pusher_connect_lock: - if pusher_connected: + if pusher_transcript_connected: return - await connect() + await _connect_transcript() - async def connect(): - nonlocal pusher_ws + async def connect_audio(): + nonlocal pusher_audio_connected + nonlocal pusher_connect_lock + async with pusher_connect_lock: + if pusher_audio_connected: + return + await _connect_audio() + + async def _connect_transcript(): nonlocal transcript_ws + nonlocal pusher_transcript_connected + try: + transcript_ws = await connect_to_trigger_pusher(uid, sample_rate) + pusher_transcript_connected = True + except Exception as e: + print(f"Exception in connect transcript pusher: {e}") + + async def _connect_audio(): nonlocal audio_bytes_ws nonlocal audio_bytes_enabled - nonlocal pusher_connected + nonlocal pusher_audio_connected + + if not audio_bytes_enabled: + return try: - pusher_ws = await connect_to_trigger_pusher(uid, sample_rate) - pusher_connected = True - transcript_ws = pusher_ws - if audio_bytes_enabled: - audio_bytes_ws = pusher_ws + audio_bytes_ws = await connect_to_trigger_pusher(uid, sample_rate) + pusher_audio_connected = True except Exception as e: - print(f"Exception in connect: {e}") + print(f"Exception in connect audio pusher: {e}") async def close(code: int = 1000): - await pusher_ws.close(code) + await transcript_ws.close(code) + if audio_bytes_ws: + await audio_bytes_ws.close(code) return (connect, close, transcript_send, transcript_consume,