From b6286b002fb2719bd28d6f54c755c7ba393f464a Mon Sep 17 00:00:00 2001 From: Katrina Date: Tue, 28 Apr 2026 17:17:40 -0400 Subject: [PATCH 01/11] update nvidia websocket code to use nvidia websocket events --- src/eva/assistant/pipeline/nvidia_stt.py | 172 +++++++++++++++++++++-- src/eva/assistant/pipeline/services.py | 2 + 2 files changed, 159 insertions(+), 15 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 1e6b0974..f3cf49c7 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -7,11 +7,14 @@ """ import asyncio +import base64 import json import ssl import time from collections.abc import AsyncGenerator +from urllib.parse import urlparse +import httpx import websockets from loguru import logger from pipecat.frames.frames import ( @@ -37,11 +40,11 @@ class NVidiaWebSocketSTTService(WebsocketSTTService): Provides real-time speech recognition using NVIDIA's Parakeet ASR model via WebSocket. - Server protocol: - - Audio in: 16-bit PCM, 16kHz, mono (raw bytes) - - Reset in: {"type": "reset", "finalize": true} (triggers final transcript) - - Ready out: {"type": "ready"} - - Transcript out: {"type": "transcript", "text": "...", "is_final": true/false} + Server protocol (OpenAI Realtime API): + - Audio in: {"type": "input_audio_buffer.append", "audio": ""} + - Commit in: {"type": "input_audio_buffer.commit"} (triggers final transcript) + - Ready out: {"type": "conversation.created"} + - Transcript out: {"type": "conversation.item.input_audio_transcription.completed", ...} """ def __init__( @@ -51,15 +54,19 @@ def __init__( api_key: str | None = None, sample_rate: int = 16000, verify: bool = True, + model: str | None = None, **kwargs, ): super().__init__(sample_rate=sample_rate, **kwargs) self._url = url self._api_key = api_key self._verify = verify + self._asr_model = None self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False + self._finalize_requested = False + self._transcript_parts: list[str] = [] def can_generate_metrics(self) -> bool: return True @@ -80,26 +87,44 @@ async def cancel(self, frame: CancelFrame): # -- Audio sending -- + _audio_chunk_count: int = 0 + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: if self._websocket and self._ready: try: - await self._websocket.send(audio) + # Send audio chunk then commit, matching the reference client pattern + await self._websocket.send( + json.dumps( + { + "type": "input_audio_buffer.append", + "audio": base64.b64encode(audio).decode("ascii"), + } + ) + ) + await self._websocket.send( + json.dumps( + { + "type": "input_audio_buffer.commit", + } + ) + ) + self._audio_chunk_count += 1 + if self._audio_chunk_count % 50 == 1: + logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") except Exception as e: logger.error(f"{self} failed to send audio: {e}") + elif not self._ready: + logger.warning(f"{self} audio dropped — not ready") yield None - # -- VAD handling (send reset on speech end, like AssemblyAI's ForceEndpoint) -- + # -- VAD handling -- async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if isinstance(frame, VADUserStoppedSpeakingFrame): - if self._websocket and self._ready: - self.request_finalize() - try: - await self._websocket.send(json.dumps({"type": "reset", "finalize": True})) - except Exception as e: - logger.error(f"{self} failed to send reset: {e}") + self._finalize_requested = True + self.request_finalize() await self.start_processing_metrics() # -- Connection management -- @@ -141,11 +166,16 @@ async def _connect_websocket(self): # Wait for ready message from server try: + logger.info(f"Connecting to {self._url}") ready_msg = await asyncio.wait_for(self._websocket.recv(), timeout=5.0) data = json.loads(ready_msg) if data.get("type") == "ready": self._ready = True logger.info(f"{self} connected and ready") + elif data.get("type") == "conversation.created": + logger.info("Conversation created successfully") + await self._configure_session() + self._ready = True else: logger.warning(f"{self} unexpected initial message: {data}") self._ready = True @@ -159,6 +189,62 @@ async def _connect_websocket(self): logger.error(f"{self} connection failed: {e}") raise + async def _initialize_http_session(self) -> dict: + """Initialize session via HTTP POST to get server defaults (model, sample rate, etc.).""" + parsed = urlparse(self._url) + scheme = "https" if parsed.scheme == "wss" else "http" + http_url = f"{scheme}://{parsed.hostname}" + if parsed.port: + http_url += f":{parsed.port}" + http_url += "/v1/realtime/transcription_sessions" + + headers = {"Content-Type": "application/json"} + if self._api_key: + headers["Authorization"] = f"Bearer {self._api_key}" + + async with httpx.AsyncClient(verify=self._verify) as client: + response = await client.post(http_url, headers=headers, json={}) + response.raise_for_status() + session_data = response.json() + logger.info(f"{self} HTTP session initialized: {session_data}") + return session_data + + async def _configure_session(self): + """Get server defaults via HTTP, then send transcription_session.update over WS.""" + # Step 1: Get server defaults (includes correct model name, sample rate, etc.) + try: + session_config = await self._initialize_http_session() + except Exception as e: + logger.warning(f"{self} HTTP session init failed ({e}), using minimal config") + session_config = {} + + # Step 2: Override only audio format for streaming PCM input. + # Keep server defaults for sample_rate (48kHz) and model — the server + # resamples internally if needed. + session_config["input_audio_format"] = "pcm16" + + if self._asr_model: + session_config.setdefault("input_audio_transcription", {}) + session_config["input_audio_transcription"]["model"] = self._asr_model + + session_update = { + "type": "transcription_session.update", + "session": session_config, + } + logger.info(f"{self} sending session update") + await self._websocket.send(json.dumps(session_update)) + + # Wait for transcription_session.updated confirmation + try: + response = await asyncio.wait_for(self._websocket.recv(), timeout=5.0) + data = json.loads(response) + if data.get("type") == "transcription_session.updated": + logger.info(f"{self} session configured: {data.get('session', {})}") + else: + logger.warning(f"{self} unexpected session update response: {data}") + except TimeoutError: + logger.warning(f"{self} timeout waiting for session update confirmation") + async def _disconnect_websocket(self): self._ready = False if self._websocket: @@ -186,7 +272,61 @@ async def _receive_messages(self): elif msg_type == "ready": self._ready = True elif msg_type == "error": - logger.error(f"{self} server error: {data.get('message')}") + logger.error(f"{self} server error: {data}") + elif msg_type == "conversation.item.input_audio_transcription.delta": + delta = data.get("delta", "") + if delta: + logger.info(f"{self} interim delta: {delta}") + await self.push_frame( + InterimTranscriptionFrame(delta, self._user_id, current_time_ms(), language=None) + ) + + elif msg_type == "conversation.item.input_audio_transcription.completed": + transcript = data.get("transcript", "").strip() + # Ignore is_last_result: the server fires it on its own sentence- + # boundary VAD, which can trigger mid-utterance from pipecat's + # perspective. Only finalize when pipecat VAD has fired. + if transcript: + if self._finalize_requested: + # VAD fired — combine accumulated parts with this sentence. + self._transcript_parts.append(transcript) + full_transcript = " ".join(self._transcript_parts) + self._transcript_parts = [] + self._finalize_requested = False + logger.info(f"{self} final transcript: {full_transcript}") + self.confirm_finalize() + await self.push_frame( + TranscriptionFrame( + full_transcript, self._user_id, current_time_ms(), language=None, finalized=True + ) + ) + await self.stop_processing_metrics() + else: + # VAD hasn't fired yet — accumulate server sentence chunks. + logger.debug(f"{self} interim completed (buffered): {transcript}") + self._transcript_parts.append(transcript) + await self.push_frame( + InterimTranscriptionFrame(transcript, self._user_id, current_time_ms(), language=None) + ) + elif self._finalize_requested: + # Empty completed after VAD fired. + if self._transcript_parts: + # Server sent silence audio; finalize with accumulated text. + full_transcript = " ".join(self._transcript_parts) + self._transcript_parts = [] + self._finalize_requested = False + logger.info(f"{self} final transcript (from buffer): {full_transcript}") + self.confirm_finalize() + await self.push_frame( + TranscriptionFrame( + full_transcript, self._user_id, current_time_ms(), language=None, finalized=True + ) + ) + await self.stop_processing_metrics() + else: + logger.debug(f"{self} empty completed transcript (ghost turn)") + self._finalize_requested = False + self.confirm_finalize() except json.JSONDecodeError: logger.warning(f"{self} non-JSON message received") @@ -207,7 +347,9 @@ async def _handle_transcript(self, data: dict): if is_final: self.confirm_finalize() - await self.push_frame(TranscriptionFrame(text, self._user_id, current_time_ms(), language=None)) + await self.push_frame( + TranscriptionFrame(text, self._user_id, current_time_ms(), language=None, finalized=True) + ) await self.stop_processing_metrics() else: await self.push_frame(InterimTranscriptionFrame(text, self._user_id, current_time_ms(), language=None)) diff --git a/src/eva/assistant/pipeline/services.py b/src/eva/assistant/pipeline/services.py index b3fb4f96..70201404 100644 --- a/src/eva/assistant/pipeline/services.py +++ b/src/eva/assistant/pipeline/services.py @@ -185,6 +185,8 @@ def create_stt_service( api_key=api_key, sample_rate=params.get("sample_rate", SAMPLE_RATE), verify=False, + model=params.get("model"), + language=None, ) elif model_lower == "nvidia-baseten": From 98b3b979f31a8aa8c0eed084c99e7aae21064870 Mon Sep 17 00:00:00 2001 From: Katrina Date: Tue, 28 Apr 2026 21:16:00 -0400 Subject: [PATCH 02/11] adjustments for no vad on parakeet side --- src/eva/assistant/pipeline/nvidia_stt.py | 134 +++++++++-------------- 1 file changed, 54 insertions(+), 80 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index f3cf49c7..68cda097 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -1,9 +1,13 @@ """NVIDIA Parakeet streaming speech-to-text service implementation. -Follows the same pattern as Pipecat's built-in AssemblyAI STT service. -The subclass only handles server-specific protocol (connection, audio format, -message parsing). All VAD, TTFB metrics, and finalization are handled by the -WebsocketSTTService base class. +The server streams sentence-level `completed` transcription events as the user +speaks. Finalization is driven entirely by Pipecat VAD: + +- While VAD is active, incoming `completed` events are accumulated in + `_transcript_parts` and forwarded as InterimTranscriptionFrame. +- When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: + - If the buffer is already non-empty (server was ahead of VAD), flush now. + - Otherwise set `_finalize_requested` and emit on the next `completed`. """ import asyncio @@ -42,7 +46,7 @@ class NVidiaWebSocketSTTService(WebsocketSTTService): Server protocol (OpenAI Realtime API): - Audio in: {"type": "input_audio_buffer.append", "audio": ""} - - Commit in: {"type": "input_audio_buffer.commit"} (triggers final transcript) + - Commit in: {"type": "input_audio_buffer.commit"} - Ready out: {"type": "conversation.created"} - Transcript out: {"type": "conversation.item.input_audio_transcription.completed", ...} """ @@ -71,7 +75,7 @@ def __init__( def can_generate_metrics(self) -> bool: return True - # -- Lifecycle (matches AssemblyAI pattern exactly) -- + # -- Lifecycle -- async def start(self, frame: StartFrame): await super().start(frame) @@ -92,22 +96,10 @@ async def cancel(self, frame: CancelFrame): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: if self._websocket and self._ready: try: - # Send audio chunk then commit, matching the reference client pattern await self._websocket.send( - json.dumps( - { - "type": "input_audio_buffer.append", - "audio": base64.b64encode(audio).decode("ascii"), - } - ) - ) - await self._websocket.send( - json.dumps( - { - "type": "input_audio_buffer.commit", - } - ) + json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) ) + await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) self._audio_chunk_count += 1 if self._audio_chunk_count % 50 == 1: logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") @@ -126,6 +118,10 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() + # Flush immediately if the server already sent completed chunks + # before VAD fired — no further completed message will arrive. + if self._transcript_parts: + await self._emit_final_transcript() # -- Connection management -- @@ -164,7 +160,6 @@ async def _connect_websocket(self): ) self._ready = False - # Wait for ready message from server try: logger.info(f"Connecting to {self._url}") ready_msg = await asyncio.wait_for(self._websocket.recv(), timeout=5.0) @@ -211,30 +206,21 @@ async def _initialize_http_session(self) -> dict: async def _configure_session(self): """Get server defaults via HTTP, then send transcription_session.update over WS.""" - # Step 1: Get server defaults (includes correct model name, sample rate, etc.) try: session_config = await self._initialize_http_session() except Exception as e: logger.warning(f"{self} HTTP session init failed ({e}), using minimal config") session_config = {} - # Step 2: Override only audio format for streaming PCM input. - # Keep server defaults for sample_rate (48kHz) and model — the server - # resamples internally if needed. session_config["input_audio_format"] = "pcm16" if self._asr_model: session_config.setdefault("input_audio_transcription", {}) session_config["input_audio_transcription"]["model"] = self._asr_model - session_update = { - "type": "transcription_session.update", - "session": session_config, - } logger.info(f"{self} sending session update") - await self._websocket.send(json.dumps(session_update)) + await self._websocket.send(json.dumps({"type": "transcription_session.update", "session": session_config})) - # Wait for transcription_session.updated confirmation try: response = await asyncio.wait_for(self._websocket.recv(), timeout=5.0) data = json.loads(response) @@ -276,70 +262,58 @@ async def _receive_messages(self): elif msg_type == "conversation.item.input_audio_transcription.delta": delta = data.get("delta", "") if delta: - logger.info(f"{self} interim delta: {delta}") + logger.debug(f"{self} interim delta: {delta}") await self.push_frame( InterimTranscriptionFrame(delta, self._user_id, current_time_ms(), language=None) ) - elif msg_type == "conversation.item.input_audio_transcription.completed": - transcript = data.get("transcript", "").strip() - # Ignore is_last_result: the server fires it on its own sentence- - # boundary VAD, which can trigger mid-utterance from pipecat's - # perspective. Only finalize when pipecat VAD has fired. - if transcript: - if self._finalize_requested: - # VAD fired — combine accumulated parts with this sentence. - self._transcript_parts.append(transcript) - full_transcript = " ".join(self._transcript_parts) - self._transcript_parts = [] - self._finalize_requested = False - logger.info(f"{self} final transcript: {full_transcript}") - self.confirm_finalize() - await self.push_frame( - TranscriptionFrame( - full_transcript, self._user_id, current_time_ms(), language=None, finalized=True - ) - ) - await self.stop_processing_metrics() - else: - # VAD hasn't fired yet — accumulate server sentence chunks. - logger.debug(f"{self} interim completed (buffered): {transcript}") - self._transcript_parts.append(transcript) - await self.push_frame( - InterimTranscriptionFrame(transcript, self._user_id, current_time_ms(), language=None) - ) - elif self._finalize_requested: - # Empty completed after VAD fired. - if self._transcript_parts: - # Server sent silence audio; finalize with accumulated text. - full_transcript = " ".join(self._transcript_parts) - self._transcript_parts = [] - self._finalize_requested = False - logger.info(f"{self} final transcript (from buffer): {full_transcript}") - self.confirm_finalize() - await self.push_frame( - TranscriptionFrame( - full_transcript, self._user_id, current_time_ms(), language=None, finalized=True - ) - ) - await self.stop_processing_metrics() - else: - logger.debug(f"{self} empty completed transcript (ghost turn)") - self._finalize_requested = False - self.confirm_finalize() + await self._handle_completed(data) except json.JSONDecodeError: logger.warning(f"{self} non-JSON message received") except Exception as e: logger.error(f"{self} error processing message: {e}") + async def _handle_completed(self, data: dict): + """Handle a server-side sentence completion event.""" + transcript = data.get("transcript", "").strip() + + if transcript: + self._transcript_parts.append(transcript) + if self._finalize_requested: + await self._emit_final_transcript() + else: + logger.debug(f"{self} buffered: {transcript}") + await self.push_frame( + InterimTranscriptionFrame(transcript, self._user_id, current_time_ms(), language=None) + ) + elif self._finalize_requested: + # Empty completed after VAD fired (silence audio). + if self._transcript_parts: + await self._emit_final_transcript() + else: + logger.debug(f"{self} ghost turn (empty completed)") + self._finalize_requested = False + self.confirm_finalize() + + async def _emit_final_transcript(self): + """Flush accumulated transcript parts and emit a finalized TranscriptionFrame.""" + full_transcript = " ".join(self._transcript_parts) + self._transcript_parts = [] + self._finalize_requested = False + logger.info(f"{self} final transcript: {full_transcript}") + self.confirm_finalize() + await self.push_frame( + TranscriptionFrame(full_transcript, self._user_id, current_time_ms(), language=None, finalized=True) + ) + await self.stop_processing_metrics() + async def _handle_transcript(self, data: dict): + """Handle legacy transcript protocol ({"type": "transcript", "is_final": bool}).""" text = data.get("text", "") is_final = data.get("is_final", False) if not text: - # Empty reset response (ghost turn). Push empty finalized - # TranscriptionFrame so the aggregator resolves immediately. if is_final: logger.debug(f"{self} empty final transcript (ghost turn)") self.confirm_finalize() From 79eec27ef4df0f20e895c269ae23bfeee73c8fc4 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 09:56:01 -0400 Subject: [PATCH 03/11] clean up after removing vad from parakeet side --- src/eva/assistant/pipeline/nvidia_stt.py | 10 ++++++---- src/eva/assistant/pipeline/turn_config.py | 8 +++++--- tests/unit/assistant/test_turn_config.py | 20 ++++++++++++++++++++ 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 68cda097..40d20cb5 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -31,6 +31,7 @@ VADUserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.settings import STTSettings from pipecat.services.stt_service import WebsocketSTTService @@ -61,7 +62,11 @@ def __init__( model: str | None = None, **kwargs, ): - super().__init__(sample_rate=sample_rate, **kwargs) + super().__init__( + sample_rate=sample_rate, + settings=STTSettings(model=None, language=None), + **kwargs, + ) self._url = url self._api_key = api_key self._verify = verify @@ -201,7 +206,6 @@ async def _initialize_http_session(self) -> dict: response = await client.post(http_url, headers=headers, json={}) response.raise_for_status() session_data = response.json() - logger.info(f"{self} HTTP session initialized: {session_data}") return session_data async def _configure_session(self): @@ -218,7 +222,6 @@ async def _configure_session(self): session_config.setdefault("input_audio_transcription", {}) session_config["input_audio_transcription"]["model"] = self._asr_model - logger.info(f"{self} sending session update") await self._websocket.send(json.dumps({"type": "transcription_session.update", "session": session_config})) try: @@ -262,7 +265,6 @@ async def _receive_messages(self): elif msg_type == "conversation.item.input_audio_transcription.delta": delta = data.get("delta", "") if delta: - logger.debug(f"{self} interim delta: {delta}") await self.push_frame( InterimTranscriptionFrame(delta, self._user_id, current_time_ms(), language=None) ) diff --git a/src/eva/assistant/pipeline/turn_config.py b/src/eva/assistant/pipeline/turn_config.py index efe71a0e..e4ec3083 100644 --- a/src/eva/assistant/pipeline/turn_config.py +++ b/src/eva/assistant/pipeline/turn_config.py @@ -111,10 +111,12 @@ def create_turn_stop_strategy( return SpeechTimeoutUserTurnStopStrategy(**strategy_params) elif strategy_type_lower == "turn_analyzer": # TurnAnalyzerUserTurnStopStrategy requires a turn_analyzer instance - # If smart_turn_stop_secs is provided, use it; otherwise let SmartTurnParams use its default - smart_params = SmartTurnParams(stop_secs=smart_turn_stop_secs) if smart_turn_stop_secs is not None else None + # smart_turn_stop_secs can be passed via strategy_params (takes precedence) or the explicit argument + params = dict(strategy_params) + stop_secs = params.pop("smart_turn_stop_secs", smart_turn_stop_secs) + smart_params = SmartTurnParams(stop_secs=stop_secs) if stop_secs is not None else None turn_analyzer = LocalSmartTurnAnalyzerV3(params=smart_params) - return TurnAnalyzerUserTurnStopStrategy(turn_analyzer=turn_analyzer, **strategy_params) + return TurnAnalyzerUserTurnStopStrategy(turn_analyzer=turn_analyzer, **params) elif strategy_type_lower == "external": # ExternalUserTurnStopStrategy has no required parameters return ExternalUserTurnStopStrategy(**strategy_params) diff --git a/tests/unit/assistant/test_turn_config.py b/tests/unit/assistant/test_turn_config.py index 8939a4a8..72e135a6 100644 --- a/tests/unit/assistant/test_turn_config.py +++ b/tests/unit/assistant/test_turn_config.py @@ -185,6 +185,26 @@ def test_turn_analyzer_with_stop_secs(self): assert isinstance(passed_params, SmartTurnParams) assert passed_params.stop_secs == 0.8 + def test_turn_analyzer_smart_turn_stop_secs_via_strategy_params(self): + """smart_turn_stop_secs in strategy_params takes precedence over the function argument.""" + from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams + + mock_analyzer = MagicMock() + with patch( + "eva.assistant.pipeline.turn_config.LocalSmartTurnAnalyzerV3", + return_value=mock_analyzer, + ) as mock_cls: + create_turn_stop_strategy( + "turn_analyzer", + {"smart_turn_stop_secs": 1.5}, + smart_turn_stop_secs=0.8, + ) + + call_args = mock_cls.call_args + passed_params = call_args.kwargs["params"] + assert isinstance(passed_params, SmartTurnParams) + assert passed_params.stop_secs == 1.5 + def test_external_stop_strategy(self): """'external' returns ExternalUserTurnStopStrategy.""" result = create_turn_stop_strategy("external", {}) From 154ba29696ee6f734104e95b7ff93013c90d1fa9 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 11:53:06 -0400 Subject: [PATCH 04/11] Fix Parakeet STT silence flooding and remove legacy protocol handlers --- src/eva/assistant/pipeline/nvidia_stt.py | 45 +++++++----------------- 1 file changed, 12 insertions(+), 33 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 40d20cb5..adf080d1 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -22,6 +22,8 @@ import websockets from loguru import logger from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, CancelFrame, EndFrame, Frame, @@ -74,6 +76,7 @@ def __init__( self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False + self._bot_speaking = False self._finalize_requested = False self._transcript_parts: list[str] = [] @@ -99,7 +102,7 @@ async def cancel(self, frame: CancelFrame): _audio_chunk_count: int = 0 async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - if self._websocket and self._ready: + if self._websocket and self._ready and not self._bot_speaking: try: await self._websocket.send( json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) @@ -119,7 +122,11 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, VADUserStoppedSpeakingFrame): + if isinstance(frame, BotStartedSpeakingFrame): + self._bot_speaking = True + elif isinstance(frame, BotStoppedSpeakingFrame): + self._bot_speaking = False + elif isinstance(frame, VADUserStoppedSpeakingFrame): self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() @@ -169,16 +176,12 @@ async def _connect_websocket(self): logger.info(f"Connecting to {self._url}") ready_msg = await asyncio.wait_for(self._websocket.recv(), timeout=5.0) data = json.loads(ready_msg) - if data.get("type") == "ready": - self._ready = True - logger.info(f"{self} connected and ready") - elif data.get("type") == "conversation.created": + if data.get("type") == "conversation.created": logger.info("Conversation created successfully") await self._configure_session() - self._ready = True else: logger.warning(f"{self} unexpected initial message: {data}") - self._ready = True + self._ready = True except TimeoutError: logger.warning(f"{self} timeout waiting for ready, proceeding") self._ready = True @@ -256,11 +259,7 @@ async def _receive_messages(self): data = json.loads(message) msg_type = data.get("type") - if msg_type == "transcript": - await self._handle_transcript(data) - elif msg_type == "ready": - self._ready = True - elif msg_type == "error": + if msg_type == "error": logger.error(f"{self} server error: {data}") elif msg_type == "conversation.item.input_audio_transcription.delta": delta = data.get("delta", "") @@ -309,23 +308,3 @@ async def _emit_final_transcript(self): TranscriptionFrame(full_transcript, self._user_id, current_time_ms(), language=None, finalized=True) ) await self.stop_processing_metrics() - - async def _handle_transcript(self, data: dict): - """Handle legacy transcript protocol ({"type": "transcript", "is_final": bool}).""" - text = data.get("text", "") - is_final = data.get("is_final", False) - - if not text: - if is_final: - logger.debug(f"{self} empty final transcript (ghost turn)") - self.confirm_finalize() - return - - if is_final: - self.confirm_finalize() - await self.push_frame( - TranscriptionFrame(text, self._user_id, current_time_ms(), language=None, finalized=True) - ) - await self.stop_processing_metrics() - else: - await self.push_frame(InterimTranscriptionFrame(text, self._user_id, current_time_ms(), language=None)) From 0fcd745b39c21b8606e4ee926f65e13b32b91899 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 12:25:34 -0400 Subject: [PATCH 05/11] try to fix silence issues --- src/eva/assistant/pipeline/nvidia_stt.py | 67 ++++++++++++++++-------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index adf080d1..a4393a3f 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -3,8 +3,11 @@ The server streams sentence-level `completed` transcription events as the user speaks. Finalization is driven entirely by Pipecat VAD: -- While VAD is active, incoming `completed` events are accumulated in - `_transcript_parts` and forwarded as InterimTranscriptionFrame. +- Audio is only sent to Parakeet while VAD is active (`_user_vad_active=True`). + A rolling pre-speech buffer (~400ms) is replayed when VAD fires so that the + audio captured just before the VAD threshold is not lost. +- Incoming `completed` events are accumulated in `_transcript_parts` and + forwarded as InterimTranscriptionFrame. - When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: - If the buffer is already non-empty (server was ahead of VAD), flush now. - Otherwise set `_finalize_requested` and emit on the next `completed`. @@ -15,6 +18,7 @@ import json import ssl import time +from collections import deque from collections.abc import AsyncGenerator from urllib.parse import urlparse @@ -22,14 +26,13 @@ import websockets from loguru import logger from pipecat.frames.frames import ( - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, TranscriptionFrame, + VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection @@ -76,7 +79,9 @@ def __init__( self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False - self._bot_speaking = False + self._user_vad_active = False + # Rolling pre-speech buffer: ~400ms at 20ms/chunk + self._pre_speech_buffer: deque[bytes] = deque(maxlen=20) self._finalize_requested = False self._transcript_parts: list[str] = [] @@ -101,20 +106,33 @@ async def cancel(self, frame: CancelFrame): _audio_chunk_count: int = 0 + async def _send_audio(self, audio: bytes): + """Send a single audio chunk to Parakeet (append + commit).""" + try: + await self._websocket.send( + json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) + ) + await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) + self._audio_chunk_count += 1 + if self._audio_chunk_count % 50 == 1: + logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") + except Exception as e: + logger.error(f"{self} failed to send audio: {e}") + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - if self._websocket and self._ready and not self._bot_speaking: - try: - await self._websocket.send( - json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) - ) - await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) - self._audio_chunk_count += 1 - if self._audio_chunk_count % 50 == 1: - logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") - except Exception as e: - logger.error(f"{self} failed to send audio: {e}") - elif not self._ready: - logger.warning(f"{self} audio dropped — not ready") + if not self._websocket or not self._ready: + if not self._ready: + logger.warning(f"{self} audio dropped — not ready") + yield None + return + + if not self._user_vad_active: + # Buffer into rolling pre-speech window; do not send silence to Parakeet + self._pre_speech_buffer.append(audio) + yield None + return + + await self._send_audio(audio) yield None # -- VAD handling -- @@ -122,11 +140,16 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, BotStartedSpeakingFrame): - self._bot_speaking = True - elif isinstance(frame, BotStoppedSpeakingFrame): - self._bot_speaking = False + if isinstance(frame, VADUserStartedSpeakingFrame): + self._user_vad_active = True + # Replay pre-speech buffer so Parakeet gets the audio just before VAD fired + if self._pre_speech_buffer: + logger.debug(f"{self} flushing {len(self._pre_speech_buffer)} pre-speech chunks") + for chunk in list(self._pre_speech_buffer): + await self._send_audio(chunk) + self._pre_speech_buffer.clear() elif isinstance(frame, VADUserStoppedSpeakingFrame): + self._user_vad_active = False self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() From 6203099c9bb69d1279d03bd1d1e4b65b72d71610 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 13:04:41 -0400 Subject: [PATCH 06/11] Revert "try to fix silence issues" This reverts commit 0fcd745b39c21b8606e4ee926f65e13b32b91899. --- src/eva/assistant/pipeline/nvidia_stt.py | 67 ++++++++---------------- 1 file changed, 22 insertions(+), 45 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index a4393a3f..adf080d1 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -3,11 +3,8 @@ The server streams sentence-level `completed` transcription events as the user speaks. Finalization is driven entirely by Pipecat VAD: -- Audio is only sent to Parakeet while VAD is active (`_user_vad_active=True`). - A rolling pre-speech buffer (~400ms) is replayed when VAD fires so that the - audio captured just before the VAD threshold is not lost. -- Incoming `completed` events are accumulated in `_transcript_parts` and - forwarded as InterimTranscriptionFrame. +- While VAD is active, incoming `completed` events are accumulated in + `_transcript_parts` and forwarded as InterimTranscriptionFrame. - When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: - If the buffer is already non-empty (server was ahead of VAD), flush now. - Otherwise set `_finalize_requested` and emit on the next `completed`. @@ -18,7 +15,6 @@ import json import ssl import time -from collections import deque from collections.abc import AsyncGenerator from urllib.parse import urlparse @@ -26,13 +22,14 @@ import websockets from loguru import logger from pipecat.frames.frames import ( + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, TranscriptionFrame, - VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection @@ -79,9 +76,7 @@ def __init__( self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False - self._user_vad_active = False - # Rolling pre-speech buffer: ~400ms at 20ms/chunk - self._pre_speech_buffer: deque[bytes] = deque(maxlen=20) + self._bot_speaking = False self._finalize_requested = False self._transcript_parts: list[str] = [] @@ -106,33 +101,20 @@ async def cancel(self, frame: CancelFrame): _audio_chunk_count: int = 0 - async def _send_audio(self, audio: bytes): - """Send a single audio chunk to Parakeet (append + commit).""" - try: - await self._websocket.send( - json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) - ) - await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) - self._audio_chunk_count += 1 - if self._audio_chunk_count % 50 == 1: - logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") - except Exception as e: - logger.error(f"{self} failed to send audio: {e}") - async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - if not self._websocket or not self._ready: - if not self._ready: - logger.warning(f"{self} audio dropped — not ready") - yield None - return - - if not self._user_vad_active: - # Buffer into rolling pre-speech window; do not send silence to Parakeet - self._pre_speech_buffer.append(audio) - yield None - return - - await self._send_audio(audio) + if self._websocket and self._ready and not self._bot_speaking: + try: + await self._websocket.send( + json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) + ) + await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) + self._audio_chunk_count += 1 + if self._audio_chunk_count % 50 == 1: + logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") + except Exception as e: + logger.error(f"{self} failed to send audio: {e}") + elif not self._ready: + logger.warning(f"{self} audio dropped — not ready") yield None # -- VAD handling -- @@ -140,16 +122,11 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, VADUserStartedSpeakingFrame): - self._user_vad_active = True - # Replay pre-speech buffer so Parakeet gets the audio just before VAD fired - if self._pre_speech_buffer: - logger.debug(f"{self} flushing {len(self._pre_speech_buffer)} pre-speech chunks") - for chunk in list(self._pre_speech_buffer): - await self._send_audio(chunk) - self._pre_speech_buffer.clear() + if isinstance(frame, BotStartedSpeakingFrame): + self._bot_speaking = True + elif isinstance(frame, BotStoppedSpeakingFrame): + self._bot_speaking = False elif isinstance(frame, VADUserStoppedSpeakingFrame): - self._user_vad_active = False self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() From 3dad2b826caecf0acf5464abf10f307ab1776411 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 13:43:30 -0400 Subject: [PATCH 07/11] try new vad handling --- src/eva/assistant/pipeline/nvidia_stt.py | 132 ++++++++++++++++++----- 1 file changed, 107 insertions(+), 25 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index adf080d1..39d8028d 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -3,8 +3,15 @@ The server streams sentence-level `completed` transcription events as the user speaks. Finalization is driven entirely by Pipecat VAD: -- While VAD is active, incoming `completed` events are accumulated in - `_transcript_parts` and forwarded as InterimTranscriptionFrame. +- Audio is only sent to Parakeet while the audio gate is open. The gate opens + on VADUserStartedSpeakingFrame (flushing a rolling pre-speech buffer) and + stays open through VADUserStoppedSpeakingFrame so Parakeet receives the + trailing silence it needs to emit a `completed` event. The gate closes once + the transcript is finalized (or after a safety timeout). +- A keepalive sends silent audio during long idle periods (e.g. bot speaking) + to prevent the Parakeet WebSocket from closing. +- Incoming `completed` events are accumulated in `_transcript_parts` and + forwarded as InterimTranscriptionFrame. - When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: - If the buffer is already non-empty (server was ahead of VAD), flush now. - Otherwise set `_finalize_requested` and emit on the next `completed`. @@ -15,27 +22,33 @@ import json import ssl import time +from collections import deque from collections.abc import AsyncGenerator from urllib.parse import urlparse import httpx import websockets -from loguru import logger from pipecat.frames.frames import ( - BotStartedSpeakingFrame, - BotStoppedSpeakingFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, TranscriptionFrame, + VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.settings import STTSettings from pipecat.services.stt_service import WebsocketSTTService +from eva.utils.logging import get_logger + +logger = get_logger(__name__) +# Maximum seconds of trailing silence sent to Parakeet after VAD fires stop, +# waiting for a `completed` event before force-finalizing. +_FINALIZE_TIMEOUT_SECS = 3.0 + def current_time_ms(): return str(int(round(time.time() * 1000))) @@ -67,6 +80,10 @@ def __init__( super().__init__( sample_rate=sample_rate, settings=STTSettings(model=None, language=None), + # Send a silent keepalive every 10s after 15s of no audio, so the + # Parakeet WebSocket doesn't close during long bot-speech turns. + keepalive_timeout=15.0, + keepalive_interval=10.0, **kwargs, ) self._url = url @@ -76,8 +93,11 @@ def __init__( self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False - self._bot_speaking = False + self._audio_gate_open = False + # Rolling pre-speech buffer: ~400ms at 20ms/chunk + self._pre_speech_buffer: deque[bytes] = deque(maxlen=20) self._finalize_requested = False + self._finalize_timeout_task: asyncio.Task | None = None self._transcript_parts: list[str] = [] def can_generate_metrics(self) -> bool: @@ -101,39 +121,96 @@ async def cancel(self, frame: CancelFrame): _audio_chunk_count: int = 0 + async def _send_audio(self, audio: bytes): + """Send a single audio chunk to Parakeet (append + commit).""" + try: + await self._websocket.send( + json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) + ) + await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) + self._audio_chunk_count += 1 + if self._audio_chunk_count % 50 == 1: + logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") + except Exception as e: + logger.error(f"{self} failed to send audio: {e}") + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: - if self._websocket and self._ready and not self._bot_speaking: - try: - await self._websocket.send( - json.dumps({"type": "input_audio_buffer.append", "audio": base64.b64encode(audio).decode("ascii")}) - ) - await self._websocket.send(json.dumps({"type": "input_audio_buffer.commit"})) - self._audio_chunk_count += 1 - if self._audio_chunk_count % 50 == 1: - logger.debug(f"{self} sent audio chunk #{self._audio_chunk_count} ({len(audio)} bytes)") - except Exception as e: - logger.error(f"{self} failed to send audio: {e}") - elif not self._ready: - logger.warning(f"{self} audio dropped — not ready") + if not self._websocket or not self._ready: + if not self._ready: + logger.warning(f"{self} audio dropped — not ready") + yield None + return + + if not self._audio_gate_open: + # Buffer into rolling pre-speech window; do not send to Parakeet. + self._pre_speech_buffer.append(audio) + yield None + return + + await self._send_audio(audio) yield None + # -- Keepalive -- + + async def _send_keepalive(self, silence: bytes): + """Wrap silent PCM in Parakeet's append+commit protocol.""" + await self._send_audio(silence) + # -- VAD handling -- async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, BotStartedSpeakingFrame): - self._bot_speaking = True - elif isinstance(frame, BotStoppedSpeakingFrame): - self._bot_speaking = False + if isinstance(frame, VADUserStartedSpeakingFrame): + self._audio_gate_open = True + # Replay pre-speech buffer so Parakeet gets audio just before VAD fired. + if self._pre_speech_buffer: + logger.debug(f"{self} flushing {len(self._pre_speech_buffer)} pre-speech chunks") + for chunk in list(self._pre_speech_buffer): + await self._send_audio(chunk) + self._pre_speech_buffer.clear() elif isinstance(frame, VADUserStoppedSpeakingFrame): + # Keep the audio gate OPEN so Parakeet receives trailing silence + # needed to trigger its sentence-completion (`completed`) event. + # The gate is closed in _close_audio_gate() once we finalize. self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() - # Flush immediately if the server already sent completed chunks - # before VAD fired — no further completed message will arrive. if self._transcript_parts: await self._emit_final_transcript() + else: + # Start a safety timeout — if Parakeet doesn't send `completed` + # within a few seconds, force-finalize to stop silence flooding. + self._start_finalize_timeout() + + def _start_finalize_timeout(self): + """Start (or restart) the finalize safety timeout.""" + if self._finalize_timeout_task: + self._finalize_timeout_task.cancel() + self._finalize_timeout_task = self.create_task(self._finalize_timeout_handler()) + + async def _cancel_finalize_timeout(self): + """Cancel any pending finalize timeout.""" + if self._finalize_timeout_task: + await self.cancel_task(self._finalize_timeout_task) + self._finalize_timeout_task = None + + async def _finalize_timeout_handler(self): + """Force-finalize after trailing silence timeout.""" + await asyncio.sleep(_FINALIZE_TIMEOUT_SECS) + if self._finalize_requested: + logger.warning(f"{self} finalize timeout after {_FINALIZE_TIMEOUT_SECS}s — force-finalizing") + if self._transcript_parts: + await self._emit_final_transcript() + else: + # Ghost turn — no transcript arrived. + self._finalize_requested = False + self._close_audio_gate() + self.confirm_finalize() + + def _close_audio_gate(self): + """Close the audio gate so subsequent silence goes to the pre-speech buffer.""" + self._audio_gate_open = False # -- Connection management -- @@ -146,6 +223,7 @@ async def _connect(self): async def _disconnect(self): await super()._disconnect() + await self._cancel_finalize_timeout() if self._receive_task: await self.cancel_task(self._receive_task) @@ -295,6 +373,8 @@ async def _handle_completed(self, data: dict): else: logger.debug(f"{self} ghost turn (empty completed)") self._finalize_requested = False + self._close_audio_gate() + await self._cancel_finalize_timeout() self.confirm_finalize() async def _emit_final_transcript(self): @@ -302,6 +382,8 @@ async def _emit_final_transcript(self): full_transcript = " ".join(self._transcript_parts) self._transcript_parts = [] self._finalize_requested = False + self._close_audio_gate() + await self._cancel_finalize_timeout() logger.info(f"{self} final transcript: {full_transcript}") self.confirm_finalize() await self.push_frame( From d89cdb0377f57541c12dafb7de628ec9d839964a Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 14:01:59 -0400 Subject: [PATCH 08/11] try to fix keepalive and log user/assistant intended text --- src/eva/assistant/pipeline/nvidia_stt.py | 35 +++++++++++++++++++----- src/eva/user_simulator/client.py | 9 +++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 39d8028d..0eee3b85 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -29,6 +29,7 @@ import httpx import websockets from pipecat.frames.frames import ( + AudioRawFrame, CancelFrame, EndFrame, Frame, @@ -117,10 +118,35 @@ async def cancel(self, frame: CancelFrame): await super().cancel(frame) await self._disconnect() - # -- Audio sending -- + # -- Audio processing -- _audio_chunk_count: int = 0 + async def process_audio_frame(self, frame: AudioRawFrame, direction: FrameDirection): + """Override base class to only reset keepalive timer when actually sending. + + The base STTService.process_audio_frame unconditionally resets + ``_last_audio_time`` on every audio frame — including silence during + bot speech. This prevents the keepalive from ever firing, so the + Parakeet WebSocket dies during long bot turns. + + We skip the ``_last_audio_time`` update when the gate is closed and + instead just buffer audio into the pre-speech deque via run_stt. + """ + if self._muted: + return + + if self._audio_gate_open: + # Real user audio — let the base class update _last_audio_time + # and call run_stt normally. + await super().process_audio_frame(frame, direction) + else: + # Gate closed (bot speaking / inter-turn gap). + # Do NOT touch _last_audio_time so the keepalive timer keeps + # ticking. Just buffer into the pre-speech deque. + if frame.audio: + self._pre_speech_buffer.append(frame.audio) + async def _send_audio(self, audio: bytes): """Send a single audio chunk to Parakeet (append + commit).""" try: @@ -141,12 +167,6 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: yield None return - if not self._audio_gate_open: - # Buffer into rolling pre-speech window; do not send to Parakeet. - self._pre_speech_buffer.append(audio) - yield None - return - await self._send_audio(audio) yield None @@ -154,6 +174,7 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: async def _send_keepalive(self, silence: bytes): """Wrap silent PCM in Parakeet's append+commit protocol.""" + logger.debug(f"{self} sending keepalive silence ({len(silence)} bytes)") await self._send_audio(silence) # -- VAD handling -- diff --git a/src/eva/user_simulator/client.py b/src/eva/user_simulator/client.py index bb554300..31be69e5 100644 --- a/src/eva/user_simulator/client.py +++ b/src/eva/user_simulator/client.py @@ -23,7 +23,7 @@ from eva.user_simulator.event_logger import ElevenLabsEventLogger from eva.user_simulator.perturbation import AudioPerturbator from eva.utils.audio_utils import save_pcm_as_wav -from eva.utils.logging import get_logger +from eva.utils.logging import current_record_id, get_logger from eva.utils.prompt_manager import PromptManager logger = get_logger(__name__) @@ -103,6 +103,10 @@ def __init__( self._consecutive_keepalive_count = 0 self._max_consecutive_keepalives = 12 # End call after this many pings without activity (2 minutes) + # Capture the worker's record ID so ElevenLabs callbacks (which run in + # a different thread) can restore it for per-record log routing. + self._record_id = current_record_id.get() + def _on_conversation_end(self, reason: str = "goodbye") -> None: """Signal conversation completion. @@ -444,6 +448,7 @@ def _on_user_speaks(self, response: str) -> None: Args: response: The text that the simulated user said """ + current_record_id.set(self._record_id) self._reset_keepalive_counter() logger.info(f"🎭 User (ElevenLabs): {response}") @@ -462,6 +467,7 @@ def _on_user_response_correction(self, original: str, corrected: str) -> None: original: Original response corrected: Corrected response """ + current_record_id.set(self._record_id) logger.debug(f"User response corrected: {original} -> {corrected}") self.event_logger.log_event( @@ -480,6 +486,7 @@ def _on_assistant_speaks(self, transcript: str) -> None: Args: transcript: The text that the assistant said """ + current_record_id.set(self._record_id) self._reset_keepalive_counter() logger.info(f"🤖 Assistant: {transcript}") From fda9e723f60069f04ef9958ea3ea457d85864704 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 15:05:36 -0400 Subject: [PATCH 09/11] gate fixes --- src/eva/assistant/pipeline/nvidia_stt.py | 90 ++++++++++++------------ 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 0eee3b85..f8e27e0f 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -1,20 +1,28 @@ """NVIDIA Parakeet streaming speech-to-text service implementation. The server streams sentence-level `completed` transcription events as the user -speaks. Finalization is driven entirely by Pipecat VAD: - -- Audio is only sent to Parakeet while the audio gate is open. The gate opens - on VADUserStartedSpeakingFrame (flushing a rolling pre-speech buffer) and - stays open through VADUserStoppedSpeakingFrame so Parakeet receives the - trailing silence it needs to emit a `completed` event. The gate closes once - the transcript is finalized (or after a safety timeout). -- A keepalive sends silent audio during long idle periods (e.g. bot speaking) - to prevent the Parakeet WebSocket from closing. -- Incoming `completed` events are accumulated in `_transcript_parts` and - forwarded as InterimTranscriptionFrame. -- When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: - - If the buffer is already non-empty (server was ahead of VAD), flush now. - - Otherwise set `_finalize_requested` and emit on the next `completed`. +speaks. Finalization is driven by Pipecat VAD: + +Audio gating strategy — bot-speaking gate: + - The audio gate is OPEN by default. All audio (speech + inter-turn silence) + flows to Parakeet so that VAD misses on short utterances don't cause stalls. + - The gate CLOSES when BotStartedSpeakingFrame arrives (no point sending bot + TTS audio or echo to an STT service). + - The gate OPENS when BotStoppedSpeakingFrame arrives, resuming normal flow. + - A keepalive sends silent audio during long bot-speech turns to prevent the + Parakeet WebSocket from closing. + - The keepalive timer is only reset when audio is actually sent to Parakeet + (gate open), not when gated frames arrive — this is achieved by overriding + ``process_audio_frame`` so that gated frames skip the base-class timer + update. + +Finalization: + - Incoming `completed` events are accumulated in ``_transcript_parts`` and + forwarded as InterimTranscriptionFrame. + - When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: + - If the buffer is already non-empty (server was ahead of VAD), flush now. + - Otherwise set ``_finalize_requested`` and emit on the next ``completed``. + - A safety timeout force-finalizes if ``completed`` never arrives. """ import asyncio @@ -22,7 +30,6 @@ import json import ssl import time -from collections import deque from collections.abc import AsyncGenerator from urllib.parse import urlparse @@ -30,6 +37,8 @@ import websockets from pipecat.frames.frames import ( AudioRawFrame, + BotStartedSpeakingFrame, + BotStoppedSpeakingFrame, CancelFrame, EndFrame, Frame, @@ -94,9 +103,9 @@ def __init__( self._websocket = None self._receive_task: asyncio.Task | None = None self._ready = False - self._audio_gate_open = False - # Rolling pre-speech buffer: ~400ms at 20ms/chunk - self._pre_speech_buffer: deque[bytes] = deque(maxlen=20) + # Gate starts OPEN — audio flows to Parakeet by default. + # Only closed while the bot is speaking. + self._audio_gate_open = True self._finalize_requested = False self._finalize_timeout_task: asyncio.Task | None = None self._transcript_parts: list[str] = [] @@ -130,22 +139,18 @@ async def process_audio_frame(self, frame: AudioRawFrame, direction: FrameDirect bot speech. This prevents the keepalive from ever firing, so the Parakeet WebSocket dies during long bot turns. - We skip the ``_last_audio_time`` update when the gate is closed and - instead just buffer audio into the pre-speech deque via run_stt. + When the gate is closed (bot speaking) we skip the base-class call + entirely so the keepalive timer keeps ticking. """ if self._muted: return if self._audio_gate_open: - # Real user audio — let the base class update _last_audio_time + # Gate open — let the base class update _last_audio_time # and call run_stt normally. await super().process_audio_frame(frame, direction) - else: - # Gate closed (bot speaking / inter-turn gap). - # Do NOT touch _last_audio_time so the keepalive timer keeps - # ticking. Just buffer into the pre-speech deque. - if frame.audio: - self._pre_speech_buffer.append(frame.audio) + # Gate closed (bot speaking) — don't touch _last_audio_time so the + # keepalive timer keeps ticking. Audio is intentionally discarded. async def _send_audio(self, audio: bytes): """Send a single audio chunk to Parakeet (append + commit).""" @@ -177,23 +182,23 @@ async def _send_keepalive(self, silence: bytes): logger.debug(f"{self} sending keepalive silence ({len(silence)} bytes)") await self._send_audio(silence) - # -- VAD handling -- + # -- Frame handling (bot-speaking gate + VAD finalization) -- async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, VADUserStartedSpeakingFrame): + # --- Bot-speaking gate --- + if isinstance(frame, BotStartedSpeakingFrame): + self._audio_gate_open = False + logger.debug(f"{self} audio gate CLOSED (bot speaking)") + elif isinstance(frame, BotStoppedSpeakingFrame): self._audio_gate_open = True - # Replay pre-speech buffer so Parakeet gets audio just before VAD fired. - if self._pre_speech_buffer: - logger.debug(f"{self} flushing {len(self._pre_speech_buffer)} pre-speech chunks") - for chunk in list(self._pre_speech_buffer): - await self._send_audio(chunk) - self._pre_speech_buffer.clear() + logger.debug(f"{self} audio gate OPEN (bot stopped)") + + # --- VAD-based finalization --- + elif isinstance(frame, VADUserStartedSpeakingFrame): + pass # Gate is already open; nothing extra needed. elif isinstance(frame, VADUserStoppedSpeakingFrame): - # Keep the audio gate OPEN so Parakeet receives trailing silence - # needed to trigger its sentence-completion (`completed`) event. - # The gate is closed in _close_audio_gate() once we finalize. self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() @@ -201,7 +206,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._emit_final_transcript() else: # Start a safety timeout — if Parakeet doesn't send `completed` - # within a few seconds, force-finalize to stop silence flooding. + # within a few seconds, force-finalize. self._start_finalize_timeout() def _start_finalize_timeout(self): @@ -226,13 +231,8 @@ async def _finalize_timeout_handler(self): else: # Ghost turn — no transcript arrived. self._finalize_requested = False - self._close_audio_gate() self.confirm_finalize() - def _close_audio_gate(self): - """Close the audio gate so subsequent silence goes to the pre-speech buffer.""" - self._audio_gate_open = False - # -- Connection management -- async def _connect(self): @@ -394,7 +394,6 @@ async def _handle_completed(self, data: dict): else: logger.debug(f"{self} ghost turn (empty completed)") self._finalize_requested = False - self._close_audio_gate() await self._cancel_finalize_timeout() self.confirm_finalize() @@ -403,7 +402,6 @@ async def _emit_final_transcript(self): full_transcript = " ".join(self._transcript_parts) self._transcript_parts = [] self._finalize_requested = False - self._close_audio_gate() await self._cancel_finalize_timeout() logger.info(f"{self} final transcript: {full_transcript}") self.confirm_finalize() From 5671bafc0522f5e3c9a4be3733e55886908da658 Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 15:52:21 -0400 Subject: [PATCH 10/11] fix failing issues --- src/eva/assistant/pipeline/nvidia_stt.py | 114 +++++++++++++++++------ 1 file changed, 86 insertions(+), 28 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index f8e27e0f..181ce6c3 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -1,28 +1,23 @@ """NVIDIA Parakeet streaming speech-to-text service implementation. -The server streams sentence-level `completed` transcription events as the user -speaks. Finalization is driven by Pipecat VAD: - -Audio gating strategy — bot-speaking gate: - - The audio gate is OPEN by default. All audio (speech + inter-turn silence) - flows to Parakeet so that VAD misses on short utterances don't cause stalls. - - The gate CLOSES when BotStartedSpeakingFrame arrives (no point sending bot - TTS audio or echo to an STT service). - - The gate OPENS when BotStoppedSpeakingFrame arrives, resuming normal flow. - - A keepalive sends silent audio during long bot-speech turns to prevent the - Parakeet WebSocket from closing. - - The keepalive timer is only reset when audio is actually sent to Parakeet - (gate open), not when gated frames arrive — this is achieved by overriding - ``process_audio_frame`` so that gated frames skip the base-class timer - update. - -Finalization: - - Incoming `completed` events are accumulated in ``_transcript_parts`` and - forwarded as InterimTranscriptionFrame. - - When VAD fires (VADUserStoppedSpeakingFrame), we finalize immediately: - - If the buffer is already non-empty (server was ahead of VAD), flush now. - - Otherwise set ``_finalize_requested`` and emit on the next ``completed``. - - A safety timeout force-finalizes if ``completed`` never arrives. +Audio gating strategy — bot-speaking gate with buffer clearing: + - The audio gate is OPEN by default. Audio flows to Parakeet whenever the + bot is not speaking. + - The gate CLOSES on BotStartedSpeakingFrame. At the same time we send + ``input_audio_buffer.clear`` to Parakeet and discard any buffered + transcript parts so that stale audio from the previous inter-turn gap + does not bleed into the next user turn. + - The gate OPENS on BotStoppedSpeakingFrame, resuming normal audio flow. + - A keepalive sends silent audio during long bot-speech turns to prevent + the Parakeet WebSocket from closing. + +Finalization (VAD-primary, Parakeet-fallback): + - When VAD fires stop, finalize immediately or wait for the next + ``completed`` event (primary path). + - If Parakeet emits a non-empty ``completed`` and VAD has NOT fired, a + fallback timer starts. If VAD still hasn't fired when the timer + expires, we auto-finalize using Parakeet's transcript — this handles + the case where Silero VAD misses a short utterance. """ import asyncio @@ -55,10 +50,15 @@ from eva.utils.logging import get_logger logger = get_logger(__name__) -# Maximum seconds of trailing silence sent to Parakeet after VAD fires stop, -# waiting for a `completed` event before force-finalizing. + +# Seconds after VAD stop to wait for a `completed` before force-finalizing. _FINALIZE_TIMEOUT_SECS = 3.0 +# Seconds after a Parakeet `completed` (with no VAD) before auto-finalizing. +# Gives VAD a chance to catch up; if it doesn't, Parakeet's own sentence +# detection serves as the fallback signal. +_FALLBACK_FINALIZE_SECS = 1.5 + def current_time_ms(): return str(int(round(time.time() * 1000))) @@ -108,6 +108,7 @@ def __init__( self._audio_gate_open = True self._finalize_requested = False self._finalize_timeout_task: asyncio.Task | None = None + self._fallback_finalize_task: asyncio.Task | None = None self._transcript_parts: list[str] = [] def can_generate_metrics(self) -> bool: @@ -182,6 +183,21 @@ async def _send_keepalive(self, silence: bytes): logger.debug(f"{self} sending keepalive silence ({len(silence)} bytes)") await self._send_audio(silence) + # -- Parakeet buffer management -- + + async def _clear_parakeet_buffer(self): + """Send input_audio_buffer.clear to flush stale audio in Parakeet. + + Called when the bot starts speaking so that any accumulated inter-turn + silence doesn't produce stale ``completed`` events on the next turn. + """ + if self._websocket and self._ready: + try: + await self._websocket.send(json.dumps({"type": "input_audio_buffer.clear"})) + logger.debug(f"{self} cleared Parakeet audio buffer") + except Exception as e: + logger.error(f"{self} failed to clear audio buffer: {e}") + # -- Frame handling (bot-speaking gate + VAD finalization) -- async def process_frame(self, frame: Frame, direction: FrameDirection): @@ -190,15 +206,23 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # --- Bot-speaking gate --- if isinstance(frame, BotStartedSpeakingFrame): self._audio_gate_open = False + # Flush stale audio from Parakeet so old transcripts don't bleed + # into the next user turn. + await self._clear_parakeet_buffer() + self._transcript_parts.clear() + await self._cancel_fallback_finalize() logger.debug(f"{self} audio gate CLOSED (bot speaking)") elif isinstance(frame, BotStoppedSpeakingFrame): self._audio_gate_open = True logger.debug(f"{self} audio gate OPEN (bot stopped)") - # --- VAD-based finalization --- + # --- VAD-based finalization (primary path) --- elif isinstance(frame, VADUserStartedSpeakingFrame): - pass # Gate is already open; nothing extra needed. + # VAD detected speech — cancel any fallback timer since VAD is + # now in control of finalization. + await self._cancel_fallback_finalize() elif isinstance(frame, VADUserStoppedSpeakingFrame): + await self._cancel_fallback_finalize() self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() @@ -209,6 +233,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # within a few seconds, force-finalize. self._start_finalize_timeout() + # -- Finalize timeout (VAD fired but no completed from Parakeet) -- + def _start_finalize_timeout(self): """Start (or restart) the finalize safety timeout.""" if self._finalize_timeout_task: @@ -233,6 +259,32 @@ async def _finalize_timeout_handler(self): self._finalize_requested = False self.confirm_finalize() + # -- Fallback finalize (Parakeet completed but VAD never fired) -- + + def _start_fallback_finalize(self): + """Start a fallback timer to auto-finalize if VAD doesn't fire.""" + if self._fallback_finalize_task: + self._fallback_finalize_task.cancel() + self._fallback_finalize_task = self.create_task(self._fallback_finalize_handler()) + + async def _cancel_fallback_finalize(self): + """Cancel the fallback finalize timer.""" + if self._fallback_finalize_task: + await self.cancel_task(self._fallback_finalize_task) + self._fallback_finalize_task = None + + async def _fallback_finalize_handler(self): + """Auto-finalize using Parakeet's transcript when VAD missed the speech.""" + await asyncio.sleep(_FALLBACK_FINALIZE_SECS) + if self._transcript_parts and not self._finalize_requested: + logger.warning( + f"{self} VAD miss — fallback finalizing with Parakeet transcript after {_FALLBACK_FINALIZE_SECS}s" + ) + self._finalize_requested = True + self.request_finalize() + await self.start_processing_metrics() + await self._emit_final_transcript() + # -- Connection management -- async def _connect(self): @@ -245,6 +297,7 @@ async def _connect(self): async def _disconnect(self): await super()._disconnect() await self._cancel_finalize_timeout() + await self._cancel_fallback_finalize() if self._receive_task: await self.cancel_task(self._receive_task) @@ -381,12 +434,16 @@ async def _handle_completed(self, data: dict): if transcript: self._transcript_parts.append(transcript) if self._finalize_requested: + # VAD already fired — finalize immediately. await self._emit_final_transcript() else: - logger.debug(f"{self} buffered: {transcript}") + # VAD hasn't fired yet. Push as interim and start the + # fallback timer so we auto-finalize if VAD never fires. + logger.debug(f"{self} buffered (no VAD yet): {transcript}") await self.push_frame( InterimTranscriptionFrame(transcript, self._user_id, current_time_ms(), language=None) ) + self._start_fallback_finalize() elif self._finalize_requested: # Empty completed after VAD fired (silence audio). if self._transcript_parts: @@ -403,6 +460,7 @@ async def _emit_final_transcript(self): self._transcript_parts = [] self._finalize_requested = False await self._cancel_finalize_timeout() + await self._cancel_fallback_finalize() logger.info(f"{self} final transcript: {full_transcript}") self.confirm_finalize() await self.push_frame( From 2abe709e78e82c72bae92d61589e97b39dc932ab Mon Sep 17 00:00:00 2001 From: Katrina Date: Wed, 29 Apr 2026 16:28:36 -0400 Subject: [PATCH 11/11] fix vad --- src/eva/assistant/pipeline/nvidia_stt.py | 43 +++++++++++------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/src/eva/assistant/pipeline/nvidia_stt.py b/src/eva/assistant/pipeline/nvidia_stt.py index 181ce6c3..50cc920b 100644 --- a/src/eva/assistant/pipeline/nvidia_stt.py +++ b/src/eva/assistant/pipeline/nvidia_stt.py @@ -1,12 +1,11 @@ """NVIDIA Parakeet streaming speech-to-text service implementation. -Audio gating strategy — bot-speaking gate with buffer clearing: +Audio gating strategy — bot-speaking gate: - The audio gate is OPEN by default. Audio flows to Parakeet whenever the bot is not speaking. - - The gate CLOSES on BotStartedSpeakingFrame. At the same time we send - ``input_audio_buffer.clear`` to Parakeet and discard any buffered - transcript parts so that stale audio from the previous inter-turn gap - does not bleed into the next user turn. + - The gate CLOSES on BotStartedSpeakingFrame. Any buffered transcript + parts are discarded so that stale Parakeet completions from the + inter-turn silence period do not bleed into the next user turn. - The gate OPENS on BotStoppedSpeakingFrame, resuming normal audio flow. - A keepalive sends silent audio during long bot-speech turns to prevent the Parakeet WebSocket from closing. @@ -183,21 +182,6 @@ async def _send_keepalive(self, silence: bytes): logger.debug(f"{self} sending keepalive silence ({len(silence)} bytes)") await self._send_audio(silence) - # -- Parakeet buffer management -- - - async def _clear_parakeet_buffer(self): - """Send input_audio_buffer.clear to flush stale audio in Parakeet. - - Called when the bot starts speaking so that any accumulated inter-turn - silence doesn't produce stale ``completed`` events on the next turn. - """ - if self._websocket and self._ready: - try: - await self._websocket.send(json.dumps({"type": "input_audio_buffer.clear"})) - logger.debug(f"{self} cleared Parakeet audio buffer") - except Exception as e: - logger.error(f"{self} failed to clear audio buffer: {e}") - # -- Frame handling (bot-speaking gate + VAD finalization) -- async def process_frame(self, frame: Frame, direction: FrameDirection): @@ -206,9 +190,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # --- Bot-speaking gate --- if isinstance(frame, BotStartedSpeakingFrame): self._audio_gate_open = False - # Flush stale audio from Parakeet so old transcripts don't bleed - # into the next user turn. - await self._clear_parakeet_buffer() + # Discard any stale transcript parts so old Parakeet completions + # from the inter-turn silence period don't bleed into the next turn. self._transcript_parts.clear() await self._cancel_fallback_finalize() logger.debug(f"{self} audio gate CLOSED (bot speaking)") @@ -274,17 +257,29 @@ async def _cancel_fallback_finalize(self): self._fallback_finalize_task = None async def _fallback_finalize_handler(self): - """Auto-finalize using Parakeet's transcript when VAD missed the speech.""" + """Auto-finalize using Parakeet's transcript when VAD missed the speech. + + Because VAD never fired, the downstream LLMUserAggregator has no + active user turn. We push synthetic VAD start/stop frames so the + aggregator sees a proper turn lifecycle and triggers the LLM. + """ await asyncio.sleep(_FALLBACK_FINALIZE_SECS) if self._transcript_parts and not self._finalize_requested: logger.warning( f"{self} VAD miss — fallback finalizing with Parakeet transcript after {_FALLBACK_FINALIZE_SECS}s" ) + # Push synthetic VAD start so the aggregator opens a user turn. + await self.push_frame(VADUserStartedSpeakingFrame()) + self._finalize_requested = True self.request_finalize() await self.start_processing_metrics() await self._emit_final_transcript() + # Push synthetic VAD stop so the aggregator closes the turn + # and triggers the LLM. + await self.push_frame(VADUserStoppedSpeakingFrame()) + # -- Connection management -- async def _connect(self):