Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions backend/routers/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,9 @@ async def deepgram_socket_send(data):
def create_pusher_task_handler():
nonlocal websocket_active

pusher_ws = None
pusher_connect_lock = asyncio.Lock()
pusher_connected = False

# Transcript
pusher_transcript_connected = False
pusher_audio_connected = False
transcript_ws = None
segment_buffers = []
in_progress_memory_id = None
Expand All @@ -398,7 +396,7 @@ async def transcript_consume():
nonlocal segment_buffers
nonlocal in_progress_memory_id
nonlocal transcript_ws
nonlocal pusher_connected
nonlocal pusher_transcript_connected
while websocket_active or len(segment_buffers) > 0:
await asyncio.sleep(1)
if transcript_ws and len(segment_buffers) > 0:
Expand All @@ -412,8 +410,8 @@ async def transcript_consume():
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher transcripts Connection closed: {e}", uid)
transcript_ws = None
pusher_connected = False
await reconnect()
pusher_transcript_connected = False
await connect_transcript()
except Exception as e:
print(f"Pusher transcripts failed: {e}", uid)

Expand All @@ -430,7 +428,7 @@ async def audio_bytes_consume():
nonlocal websocket_active
nonlocal audio_buffers
nonlocal audio_bytes_ws
nonlocal pusher_connected
nonlocal pusher_audio_connected
while websocket_active or len(audio_buffers) > 0:
await asyncio.sleep(1)
if audio_bytes_ws and len(audio_buffers) > 0:
Expand All @@ -444,37 +442,58 @@ async def audio_bytes_consume():
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher audio_bytes Connection closed: {e}", uid)
audio_bytes_ws = None
pusher_connected = False
await reconnect()
pusher_audio_connected = False
await connect_audio()
except Exception as e:
print(f"Pusher audio_bytes failed: {e}", uid)

async def reconnect():
nonlocal pusher_connected
async def connect():
await connect_transcript()
await connect_audio()

async def connect_transcript():
nonlocal pusher_transcript_connected
nonlocal pusher_connect_lock
async with pusher_connect_lock:
if pusher_connected:
if pusher_transcript_connected:
return
await connect()
await _connect_transcript()

async def connect():
nonlocal pusher_ws
async def connect_audio():
nonlocal pusher_audio_connected
nonlocal pusher_connect_lock
async with pusher_connect_lock:
if pusher_audio_connected:
return
await _connect_audio()

async def _connect_transcript():
nonlocal transcript_ws
nonlocal pusher_transcript_connected
try:
transcript_ws = await connect_to_trigger_pusher(uid, sample_rate)
pusher_transcript_connected = True
except Exception as e:
print(f"Exception in connect transcript pusher: {e}")

async def _connect_audio():
nonlocal audio_bytes_ws
nonlocal audio_bytes_enabled
nonlocal pusher_connected
nonlocal pusher_audio_connected

if not audio_bytes_enabled:
return

try:
pusher_ws = await connect_to_trigger_pusher(uid, sample_rate)
pusher_connected = True
transcript_ws = pusher_ws
if audio_bytes_enabled:
audio_bytes_ws = pusher_ws
audio_bytes_ws = await connect_to_trigger_pusher(uid, sample_rate)
pusher_audio_connected = True
except Exception as e:
print(f"Exception in connect: {e}")
print(f"Exception in connect audio pusher: {e}")

async def close(code: int = 1000):
await pusher_ws.close(code)
await transcript_ws.close(code)
if audio_bytes_ws:
await audio_bytes_ws.close(code)

return (connect, close,
transcript_send, transcript_consume,
Expand Down