Conversation
📝 Walkthrough

A new Mistral Voxtral STT (Speech-to-Text) plugin is introduced for the vision-agents ecosystem, featuring real-time WebSocket-based transcription with language detection and partial transcript support. The PR includes the STT implementation, documentation, an example integration combining Mistral STT with Deepgram TTS and Gemini LLM, project configurations, and integration tests.

Changes
Sequence Diagram

sequenceDiagram
participant User
participant Agent as Agent (Gemini LLM)
participant STT as Mistral STT
participant Mistral as Mistral Voxtral API
participant TTS as Deepgram TTS
User->>Agent: Audio input stream
Agent->>STT: process_audio(pcm_data, participant)
STT->>STT: Resample PCM to target rate
STT->>Mistral: Stream audio via WebSocket
Mistral-->>STT: TranscriptionStreamTextDelta (partial)
STT->>STT: Emit partial transcript
STT->>Agent: Partial transcript event
Mistral-->>STT: TranscriptionStreamDone (final)
STT->>STT: Emit final transcript
STT->>Agent: Final transcript event
Agent->>Agent: Process transcript & generate response
Agent->>TTS: generate_speech(response_text)
TTS-->>Agent: Audio response
Agent->>User: Audio output stream
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py`:
- Around line 225-228: The current bare "except Exception as e" around await
self._connection.end_audio() should be replaced with specific exception
handlers: catch network/timeouts (e.g., ConnectionError and
asyncio.TimeoutError) and the Mistral library's specific exception (e.g.,
mistralai.MistralError or the appropriate error class from mistralai.errors),
logging the exception details via logger.warning; update imports (add asyncio
and the correct mistralai error symbol) and ensure the logger.warning message
includes the caught exception object for context instead of a broad catch-all.
- Around line 242-245: The cleanup currently uses a bare "except Exception as e"
around await self._connection.close(); replace it with handling of specific
exception types and ensure asyncio.CancelledError is not swallowed: import
asyncio, await self._connection.close() inside try, then except
asyncio.CancelledError: raise, and catch only likely concrete exceptions (e.g.,
OSError, RuntimeError, ConnectionError or the specific client library exception)
and log via logger.warning(f"Error closing Mistral connection: {e}"). Reference
the await self._connection.close() call and the logger.warning call when making
the change.
- Around line 155-159: In _handle_done (handling TranscriptionStreamDone), avoid
calling event.text.strip() directly because event.text may be None; first guard
for a falsy value (e.g., text = event.text or "" / use a ternary like event.text
if event.text else "") or check if event.text is truthy before calling .strip(),
then proceed to .strip() and return early if the resulting text is empty—update
the _handle_done implementation to mirror the safe checks used in
_handle_text_delta/_handle_error.
- Around line 114-117: Replace the bare "except Exception as e:" handlers in
_receive_loop and in close() with specific exception clauses: first catch
asyncio.CancelledError and re-raise (e.g., in _receive_loop), then catch
mistralai.models.MistralError to log "Mistral API error" and emit the error
event via _emit_error_event, and finally catch httpx.RequestError (covers
ConnectError/Timeout) to log "Network error" and emit the error event; ensure
you import mistralai.models.MistralError and httpx.RequestError and use those
exception types in the except blocks for the receive loop and both places in
close(); also, in the event handling inside _receive_loop, check "if
event.text:" before calling event.text.strip() to avoid calling strip on None.
🧹 Nitpick comments (2)
plugins/mistral/example/mistral_stt_example.py (2)
42-43: Commented-out code references an unimported module.

The comment mentions `smart_turn.TurnDetection()` but `smart_turn` isn't imported. Users trying to uncomment this line will hit a `NameError`. Consider either adding the import in a comment or removing this line entirely and documenting the turn detection requirement elsewhere.

📝 Suggested fix
     llm=gemini.LLM("gemini-2.0-flash"),
-    # turn_detection=smart_turn.TurnDetection(),  # Required since Mistral STT has no turn detection
+    # Note: Mistral STT has no built-in turn detection. Consider adding:
+    #   from vision_agents.plugins import smart_turn
+    #   turn_detection=smart_turn.TurnDetection(),
 )
47-59: Example hardcodes a 5-second sleep before finishing.

The example joins a call, sleeps for 5 seconds, then immediately finishes. For a demonstration, this is rather brief and doesn't show any actual voice interaction. Consider adding a comment explaining this is just a minimal scaffolding example, or increasing the duration for a more realistic demo.
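If the goal is a slightly more realistic demo, one hedged option (purely illustrative; the agent/call wiring stays whatever the example already sets up, and `DEMO_DURATION_S` is a made-up knob, not part of the plugin):

```python
import asyncio

# NOTE: this example is minimal scaffolding and does not demo a full conversation.
DEMO_DURATION_S = 60  # long enough for a user to actually talk to the agent


async def hold_call_open() -> None:
    """Keep the joined call alive for the demo window instead of a fixed 5-second sleep."""
    try:
        await asyncio.sleep(DEMO_DURATION_S)
    except asyncio.CancelledError:
        # Let Ctrl+C (task cancellation under asyncio.run) end the demo promptly.
        raise
```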
except Exception as e:
    logger.error(f"Error in Mistral receive loop: {e}")
    if not self.closed:
        self._emit_error_event(e, context="receive_loop")
🧩 Analysis chain
🌐 Web query:
mistralai python library WebSocket exceptions realtime API
💡 Result:
The official mistralai Python SDK does not use WebSockets for “realtime”. Its streaming features are Server‑Sent Events (SSE) exposed as a Python generator (and also a context manager)—so you won’t see “WebSocket exceptions” coming from mistralai itself. [1]
What to catch in mistralai (streaming or non-streaming)
1) API/HTTP errors (non‑2xx responses): catch models.MistralError (base class). It exposes message, status_code, headers, body, etc. [2]
2) Network/transport errors: the SDK uses httpx, so catch httpx.RequestError and common subclasses like httpx.ConnectError and httpx.TimeoutException. [2]
Pattern for SSE streaming with robust exception handling
import os
import httpx
from mistralai import Mistral, models
with Mistral(api_key=os.environ["MISTRAL_API_KEY"]) as client:
    try:
        stream = client.beta.conversations.start_stream(...)
        with stream as events:  # ensures the stream is closed [1]
            for event in events:
                handle(event)
    except models.MistralError as e:  # HTTP status / response problems [2]
        print(e.status_code, e.message)
    except httpx.RequestError as e:  # network/timeout/connect errors [2]
        print(type(e).__name__, str(e))

If you are seeing WebSocket exceptions
That’s likely from a different “Realtime API” (e.g., OpenAI Realtime over WebSocket), not from the Mistral mistralai SDK—because mistralai streaming is SSE, not WebSocket. [1]
Sources:
[1] mistralai/client-python README — “Server-sent event streaming” section (generator + context manager)
[2] mistralai/client-python README — “Error Handling” section (MistralError, plus httpx network exceptions)
🏁 Script executed:
# Find the specific file and check for all except Exception instances
fd "stt.py" | grep mistral
# Check the exact lines mentioned in the review
if [ -f "plugins/mistral/vision_agents/plugins/mistral/stt.py" ]; then
echo "=== Lines 110-120 (around line 114-117) ==="
sed -n '110,120p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== Lines 150-160 (around line 157) ==="
sed -n '150,160p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== Lines 220-230 (around line 227) ==="
sed -n '220,230p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== Lines 240-250 (around line 244) ==="
sed -n '240,250p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== All 'except Exception' instances ==="
rg "except Exception" "plugins/mistral/vision_agents/plugins/mistral/stt.py" -n
fi

Repository: GetStream/Vision-Agents
Length of output: 2171
🏁 Script executed:
# Check imports to understand what exceptions might be available from mistralai
fd "stt.py" --path "*mistral*" -x head -50 {} \;Repository: GetStream/Vision-Agents
Length of output: 282
🏁 Script executed:
# Get more context around each exception block to understand what exceptions might be raised
echo "=== Full _receive_loop method (lines 100-125) ==="
sed -n '100,125p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== Full close method (lines 215-260) ==="
sed -n '215,260p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
echo -e "\n=== Imports section (first 40 lines) ==="
sed -n '1,40p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"

Repository: GetStream/Vision-Agents
Length of output: 3814
🏁 Script executed:
# Check if event.text can be None in the TranscriptionStreamDone event
# Look at how it's used elsewhere and what the event type definition might indicate
echo "=== Check _handle_done implementation and context ==="
sed -n '155,165p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"
# Check if there are type hints or documentation about TranscriptionStreamDone.text
echo -e "\n=== Search for TranscriptionStreamDone usage in codebase ==="
rg "TranscriptionStreamDone" --type py -A 3 -B 1
# Check mistralai package docs or type stubs if available
echo -e "\n=== Check for mistralai realtime module files ==="
find . -type f -name "*.py" | xargs grep -l "TranscriptionStreamDone\|RealtimeConnection" | head -10

Repository: GetStream/Vision-Agents
Length of output: 2167
🏁 Script executed:
# Check the mistralai package for what exceptions RealtimeConnection might raise
# Look at the imports and see if we can find any exception handling patterns elsewhere
echo "=== Check for exception handling patterns in other plugins ==="
rg "except.*Error" plugins/ --type py | head -20
echo -e "\n=== Check elevenlabs plugin for comparison ==="
sed -n '1,50p' "plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py" | grep -A 10 "except"
# Search for imports of exceptions in the codebase
echo -e "\n=== Search for imported exceptions ==="
rg "from.*import.*Error\|from.*import.*Exception" plugins/mistral/ --type pyRepository: GetStream/Vision-Agents
Length of output: 2112
🏁 Script executed:
# Let's check if mistralai is installed in the environment and inspect its exception types
pip show mistralai 2>/dev/null && python3 << 'EOF'
try:
    from mistralai.models import MistralError
    from mistralai.extra.realtime import RealtimeConnection
    import httpx
    print("=== Mistralai exception types ===")
    print(f"MistralError: {MistralError.__doc__}")
    print(f"\nhttpx exceptions available:")
    # List common httpx exceptions
    httpx_exceptions = [
        'ConnectError',
        'ReadError',
        'WriteError',
        'PoolTimeout',
        'TimeoutException',
        'RequestError',
        'HTTPError'
    ]
    for exc in httpx_exceptions:
        if hasattr(httpx, exc):
            print(f" - httpx.{exc}")
    # Check RealtimeConnection methods for clues about what they raise
    print(f"\nRealtimeConnection methods:")
    for method in dir(RealtimeConnection):
        if not method.startswith('_'):
            print(f" - {method}")
except ImportError as e:
    print(f"Import error: {e}")
except Exception as e:
    print(f"Error: {e}")
EOF

Repository: GetStream/Vision-Agents
Length of output: 49
🏁 Script executed:
# Let's check what the actual exceptions might be by looking at the context
# The _receive_loop is iterating over connection.stream events
# Let's see if we can find what exceptions might be raised during event iteration
echo "=== Check async context for _receive_loop ==="
sed -n '90,120p' "plugins/mistral/vision_agents/plugins/mistral/stt.py"

Repository: GetStream/Vision-Agents
Length of output: 1386
Replace bare except Exception as e: with specific exception handling per coding guidelines.
The code violates the guideline requiring specific exception handling (lines 114, 227, 244). Based on the mistralai library, catch:
- `mistralai.models.MistralError` for API/HTTP errors
- `httpx.RequestError` (and subclasses like `httpx.ConnectError`, `httpx.TimeoutException`) for network/transport errors
Example for the _receive_loop() method:
except asyncio.CancelledError:
    logger.debug("Mistral receive loop cancelled")
    raise
except MistralError as e:
    logger.error(f"Mistral API error: {e}")
    if not self.closed:
        self._emit_error_event(e, context="receive_loop")
except httpx.RequestError as e:
    logger.error(f"Network error in Mistral receive loop: {e}")
    if not self.closed:
        self._emit_error_event(e, context="receive_loop")

Apply the same specificity to lines 227 and 244 in the close() method.
Additionally, line 157 calls event.text.strip() before checking for None; guard with if event.text: first.
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 114 - 117,
Replace the bare "except Exception as e:" handlers in _receive_loop and in
close() with specific exception clauses: first catch asyncio.CancelledError and
re-raise (e.g., in _receive_loop), then catch mistralai.models.MistralError to
log "Mistral API error" and emit the error event via _emit_error_event, and
finally catch httpx.RequestError (covers ConnectError/Timeout) to log "Network
error" and emit the error event; ensure you import mistralai.models.MistralError
and httpx.RequestError and use those exception types in the except blocks for
the receive loop and both places in close(); also, in the event handling inside
_receive_loop, check "if event.text:" before calling event.text.strip() to avoid
calling strip on None.
def _handle_done(self, event: TranscriptionStreamDone):
    """Handle end-of-stream event with full transcript."""
    text = event.text.strip()
    if not text:
        return
Potential AttributeError if event.text is None.
Unlike _handle_text_delta which checks if not text: return before operating on the text, and _handle_error which uses a ternary guard, this method calls .strip() directly on event.text. If the API ever returns None, this will raise an AttributeError.
🛡️ Proposed fix
 def _handle_done(self, event: TranscriptionStreamDone):
     """Handle end-of-stream event with full transcript."""
-    text = event.text.strip()
-    if not text:
+    if not event.text:
+        return
+    text = event.text.strip()
+    if not text:
         return

🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 155 - 159,
In _handle_done (handling TranscriptionStreamDone), avoid calling
event.text.strip() directly because event.text may be None; first guard for a
falsy value (e.g., text = event.text or "" / use a ternary like event.text if
event.text else "") or check if event.text is truthy before calling .strip(),
then proceed to .strip() and return early if the resulting text is empty—update
the _handle_done implementation to mirror the safe checks used in
_handle_text_delta/_handle_error.
try:
    await self._connection.end_audio()
except Exception as e:
    logger.warning(f"Error signaling end of audio: {e}")
Avoid bare except Exception as e per coding guidelines.
This catch-all may mask specific issues with signaling end-of-audio. Consider catching more specific exceptions from the mistralai library or at minimum, network-related exceptions like ConnectionError or asyncio.TimeoutError.
As per coding guidelines: "Never write except Exception as e - use specific exception handling."
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 225 - 228,
The current bare "except Exception as e" around await
self._connection.end_audio() should be replaced with specific exception
handlers: catch network/timeouts (e.g., ConnectionError and
asyncio.TimeoutError) and the Mistral library's specific exception (e.g.,
mistralai.MistralError or the appropriate error class from mistralai.errors),
logging the exception details via logger.warning; update imports (add asyncio
and the correct mistralai error symbol) and ensure the logger.warning message
includes the caught exception object for context instead of a broad catch-all.
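A stdlib-only sketch of what a narrower handler around `end_audio()` could look like; the helper name is illustrative and the mistralai-specific error class is deliberately left as a comment until it is confirmed against the installed SDK:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def end_audio_safely(connection) -> None:
    """Signal end-of-audio without hiding unexpected failures behind a broad except."""
    try:
        await connection.end_audio()
    except asyncio.CancelledError:
        raise  # never swallow task cancellation
    except (ConnectionError, OSError, asyncio.TimeoutError) as e:
        # Add the mistralai realtime client's concrete error class to this tuple
        # once it has been verified; don't guess at the import path.
        logger.warning(f"Error signaling end of audio: {e}")
```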
try:
    await self._connection.close()
except Exception as e:
    logger.warning(f"Error closing Mistral connection: {e}")
Avoid bare except Exception as e per coding guidelines.
Same issue here with connection close. Consider specific exception types for connection cleanup failures.
As per coding guidelines: "Never write except Exception as e - use specific exception handling."
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 242 - 245,
The cleanup currently uses a bare "except Exception as e" around await
self._connection.close(); replace it with handling of specific exception types
and ensure asyncio.CancelledError is not swallowed: import asyncio, await
self._connection.close() inside try, then except asyncio.CancelledError: raise,
and catch only likely concrete exceptions (e.g., OSError, RuntimeError,
ConnectionError or the specific client library exception) and log via
logger.warning(f"Error closing Mistral connection: {e}"). Reference the await
self._connection.close() call and the logger.warning call when making the
change.
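A matching sketch for the `close()` path, again with the library-specific exception left to be confirmed (the helper is illustrative, not part of the plugin):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def close_connection_safely(connection) -> None:
    """Close the realtime connection without swallowing cancellation."""
    try:
        await connection.close()
    except asyncio.CancelledError:
        raise  # cleanup must not absorb cancellation
    except (ConnectionError, OSError, RuntimeError) as e:
        # Extend with the concrete mistralai exception type once verified.
        logger.warning(f"Error closing Mistral connection: {e}")
```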
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@plugins/mistral/example/mistral_stt_example.py`:
- Around line 33-48: Update the docstrings for the public helper functions
create_agent and join_call to follow Google-style docstrings: for create_agent
describe Args (kwargs: any configurable creation parameters) and Returns (Agent:
the created Agent instance), and for join_call describe Args (agent: Agent,
call_type: str, call_id: str, **kwargs) and Returns (None), plus a short
one-line summary retained at top; keep wording concise and consistent with
existing project docstring conventions. Ensure the docstrings are placed
immediately under each function signature and include types for each parameter
and the return value.
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py`:
- Around line 119-185: Update the three handler docstrings to Google-style by
adding an Args section that documents the single parameter `event` for each
function: in _handle_text_delta describe event as a TranscriptionStreamTextDelta
containing partial text deltas and any relevant attributes used (e.g., text); in
_handle_done describe event as a TranscriptionStreamDone containing the final
transcript, language, and model; and in _handle_error describe event as a
RealtimeTranscriptionError containing the error details; keep the one-line
summary and add a short Returns/Raises only if the handler returns or raises
(these handlers do not), matching other Google-style docstrings in the codebase.
- Around line 187-217: The code sets self._current_participant in process_audio
but shares self._accumulated_text across all inputs, allowing cross-participant
mixing; update process_audio to detect participant changes and either (a)
enforce a single active participant by rejecting/ignoring audio when
self._current_participant is set to a different Participant, or (b) flush/reset
per-participant buffers by moving _accumulated_text into a per-participant map
(e.g., dict keyed by participant id) and use that map when appending
audio/transcripts; specifically modify process_audio (and related Done/flush
handlers) to check self._current_participant vs incoming participant, and on
mismatch either reset the global _accumulated_text and set the new participant
or switch to per-participant storage to avoid mixing transcripts.
- Around line 223-245: Replace the broad Exception handlers around
self._connection.end_audio() and self._connection.close() with specific
exceptions (e.g., ConnectionError, OSError, asyncio.TimeoutError and any
mistralai-specific exceptions you find) so unexpected errors surface; update the
try/except blocks in the cleanup section that reference
self._connection.end_audio and self._connection.close to catch only those
concrete exception types (and log the error) after verifying the exact
exceptions raised by RealtimeConnection in the mistralai client library or by
running tests.
🧹 Nitpick comments (1)
plugins/mistral/tests/test_mistral_stt.py (1)
18-31: Consider using a context manager or try/finally for cleanup.

If an assertion or exception occurs before `stt.close()` is reached (e.g., during `process_audio`), the WebSocket connection may remain open, potentially causing resource leaks or test interference.

♻️ Proposed fix using try/finally
 async def test_transcribe_chunked_audio(self, mia_audio_48khz_chunked, participant):
     """Test transcription with chunked audio (simulates real-time streaming)."""
     stt = mistral.STT()
     await stt.start()
     session = STTSession(stt)

-    # Send audio in chunks like real-time streaming
-    for chunk in mia_audio_48khz_chunked:
-        await stt.process_audio(chunk, participant)
-        await asyncio.sleep(
-            0.001
-        )  # Simulate real-time pacing, allow receive task to run
-
-    # Close signals end of audio and triggers final transcript
-    await stt.close()
+    try:
+        # Send audio in chunks like real-time streaming
+        for chunk in mia_audio_48khz_chunked:
+            await stt.process_audio(chunk, participant)
+            await asyncio.sleep(
+                0.001
+            )  # Simulate real-time pacing, allow receive task to run
+    finally:
+        # Close signals end of audio and triggers final transcript
+        await stt.close()
async def create_agent(**kwargs) -> Agent:
    """Create the agent with Mistral STT."""
    agent = Agent(
        edge=getstream.Edge(),
        agent_user=User(name="Mistral Agent", id="agent"),
        instructions="You're a helpful voice AI assistant. Keep replies short and conversational.",
        tts=deepgram.TTS(),  # Uses Deepgram for text-to-speech
        stt=mistral.STT(),  # Uses Mistral Voxtral for speech-to-text
        llm=gemini.LLM("gemini-2.0-flash"),
        # turn_detection=smart_turn.TurnDetection(),  # Required since Mistral STT has no turn detection
    )
    return agent


async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None:
    """Join the call and start the agent."""
Add Google-style Args/Returns to the public example helpers.
Both create_agent and join_call accept parameters, but their docstrings don’t include Args/Returns. Please expand them to match the project docstring standard.
✍️ Proposed docstring updates
 async def create_agent(**kwargs) -> Agent:
-    """Create the agent with Mistral STT."""
+    """Create the agent with Mistral STT.
+
+    Args:
+        **kwargs: Optional agent configuration overrides.
+
+    Returns:
+        Agent: Configured agent instance.
+    """
@@
 async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None:
-    """Join the call and start the agent."""
+    """Join the call and start the agent.
+
+    Args:
+        agent: The agent instance to run.
+        call_type: Stream call type.
+        call_id: Stream call identifier.
+        **kwargs: Optional join parameters.
+    """

As per coding guidelines, Docstrings should follow the Google style guide for docstrings.
🤖 Prompt for AI Agents
In `@plugins/mistral/example/mistral_stt_example.py` around lines 33 - 48, Update
the docstrings for the public helper functions create_agent and join_call to
follow Google-style docstrings: for create_agent describe Args (kwargs: any
configurable creation parameters) and Returns (Agent: the created Agent
instance), and for join_call describe Args (agent: Agent, call_type: str,
call_id: str, **kwargs) and Returns (None), plus a short one-line summary
retained at top; keep wording concise and consistent with existing project
docstring conventions. Ensure the docstrings are placed immediately under each
function signature and include types for each parameter and the return value.
def _handle_text_delta(self, event: TranscriptionStreamTextDelta):
    """Handle text delta - emit word-by-word partials, full text on complete."""
    text = event.text
    if not text:
        return

    participant = self._current_participant
    if participant is None:
        logger.warning("Received transcript but no participant set")
        return

    # Accumulate text for complete events
    self._accumulated_text += text

    processing_time_ms: Optional[float] = None
    if self._audio_start_time is not None:
        processing_time_ms = (time.perf_counter() - self._audio_start_time) * 1000

    response = TranscriptResponse(
        model_name=self.model,
        processing_time_ms=processing_time_ms,
    )

    # Emit partial with just the new word/delta (not accumulated)
    text_stripped = text.strip()
    if text_stripped:
        self._emit_partial_transcript_event(text_stripped, participant, response)

    # Check for sentence-ending punctuation - emit complete transcript
    if text.rstrip().endswith((".", "?", "!")):
        accumulated_stripped = self._accumulated_text.strip()
        if accumulated_stripped:
            self._emit_transcript_event(accumulated_stripped, participant, response)
            self._accumulated_text = ""
            self._audio_start_time = None

def _handle_done(self, event: TranscriptionStreamDone):
    """Handle end-of-stream event with full transcript."""
    text = event.text.strip()
    if not text:
        return

    participant = self._current_participant
    if participant is None:
        logger.warning("Received done event but no participant set")
        return

    response = TranscriptResponse(
        language=event.language,
        model_name=event.model,
    )

    self._emit_transcript_event(text, participant, response)
    self._accumulated_text = ""
    self._audio_start_time = None
    self._done_received.set()

def _handle_error(self, event: RealtimeTranscriptionError):
    """Handle error event."""
    error_msg = str(event.error) if event.error else "Unknown Mistral error"
    logger.error(f"Mistral transcription error: {error_msg}")

    error = Exception(error_msg)
    self._emit_error_event(
        error, context="transcription", participant=self._current_participant
    )
    self._audio_start_time = None
Add Google-style Args sections to the handler docstrings.
These handlers accept event parameters but the docstrings omit Args. Please expand them for consistency.
✍️ Docstring alignment
 def _handle_text_delta(self, event: TranscriptionStreamTextDelta):
-    """Handle text delta - emit word-by-word partials, full text on complete."""
+    """Handle text delta - emit word-by-word partials, full text on complete.
+
+    Args:
+        event: The text delta event from Mistral.
+    """
@@
 def _handle_done(self, event: TranscriptionStreamDone):
-    """Handle end-of-stream event with full transcript."""
+    """Handle end-of-stream event with full transcript.
+
+    Args:
+        event: The end-of-stream event from Mistral.
+    """
@@
 def _handle_error(self, event: RealtimeTranscriptionError):
-    """Handle error event."""
+    """Handle error event.
+
+    Args:
+        event: The error event from Mistral.
+    """

As per coding guidelines, Docstrings should follow the Google style guide for docstrings.
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 119 - 185,
Update the three handler docstrings to Google-style by adding an Args section
that documents the single parameter `event` for each function: in
_handle_text_delta describe event as a TranscriptionStreamTextDelta containing
partial text deltas and any relevant attributes used (e.g., text); in
_handle_done describe event as a TranscriptionStreamDone containing the final
transcript, language, and model; and in _handle_error describe event as a
RealtimeTranscriptionError containing the error details; keep the one-line
summary and add a short Returns/Raises only if the handler returns or raises
(these handlers do not), matching other Google-style docstrings in the codebase.
async def process_audio(
    self,
    pcm_data: PcmData,
    participant: Optional[Participant] = None,
):
    """
    Process audio data through Mistral for transcription.

    Args:
        pcm_data: The PCM audio data to process.
        participant: Optional participant metadata.
    """
    if self.closed:
        logger.warning("Mistral STT is closed, ignoring audio")
        return

    await self._connection_ready.wait()

    if self._connection is None or self._connection.is_closed:
        logger.warning("Mistral connection not available")
        return

    resampled = pcm_data.resample(self.sample_rate, 1)
    audio_bytes = resampled.samples.tobytes()

    self._current_participant = participant

    if self._audio_start_time is None:
        self._audio_start_time = time.perf_counter()

    await self._connection.send_audio(audio_bytes)
Guard against cross‑participant transcript mixing.
_current_participant and _accumulated_text are shared across all audio input. If multiple participants interleave audio before a Done event, transcripts can be attributed to the wrong speaker. Consider per‑participant state or enforce a single active participant.
🛡️ Minimal guard (prefer per‑participant state if multi‑speaker is required)
-    self._current_participant = participant
+    if (
+        self._current_participant is not None
+        and participant is not None
+        and participant != self._current_participant
+        and not self._done_received.is_set()
+    ):
+        logger.warning(
+            "Received audio for a new participant while previous transcript is active"
+        )
+        return
+
+    self._current_participant = participant

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async def process_audio(
    self,
    pcm_data: PcmData,
    participant: Optional[Participant] = None,
):
    """
    Process audio data through Mistral for transcription.

    Args:
        pcm_data: The PCM audio data to process.
        participant: Optional participant metadata.
    """
    if self.closed:
        logger.warning("Mistral STT is closed, ignoring audio")
        return

    await self._connection_ready.wait()

    if self._connection is None or self._connection.is_closed:
        logger.warning("Mistral connection not available")
        return

    resampled = pcm_data.resample(self.sample_rate, 1)
    audio_bytes = resampled.samples.tobytes()

    if (
        self._current_participant is not None
        and participant is not None
        and participant != self._current_participant
        and not self._done_received.is_set()
    ):
        logger.warning(
            "Received audio for a new participant while previous transcript is active"
        )
        return

    self._current_participant = participant

    if self._audio_start_time is None:
        self._audio_start_time = time.perf_counter()

    await self._connection.send_audio(audio_bytes)
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 187 - 217,
The code sets self._current_participant in process_audio but shares
self._accumulated_text across all inputs, allowing cross-participant mixing;
update process_audio to detect participant changes and either (a) enforce a
single active participant by rejecting/ignoring audio when
self._current_participant is set to a different Participant, or (b) flush/reset
per-participant buffers by moving _accumulated_text into a per-participant map
(e.g., dict keyed by participant id) and use that map when appending
audio/transcripts; specifically modify process_audio (and related Done/flush
handlers) to check self._current_participant vs incoming participant, and on
mismatch either reset the global _accumulated_text and set the new participant
or switch to per-participant storage to avoid mixing transcripts.
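For option (b), a self-contained sketch of per-participant buffering (illustrative only; the real STT class would own this state, and keying by a `participant.id`-style string is an assumption about the Participant type):

```python
from collections import defaultdict
from typing import Dict, Optional


class PerParticipantTranscripts:
    """Accumulate transcript text per participant so interleaved speakers cannot mix."""

    def __init__(self) -> None:
        self._buffers: Dict[str, str] = defaultdict(str)

    def append(self, participant_id: str, delta: str) -> None:
        # Each speaker's deltas land in their own buffer.
        self._buffers[participant_id] += delta

    def flush(self, participant_id: str) -> Optional[str]:
        # Called from the Done / sentence-end handler; returns the finished text, if any.
        text = self._buffers.pop(participant_id, "").strip()
        return text or None
```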
# Signal end of audio to trigger Done event with full transcript
if self._connection and not self._connection.is_closed:
    try:
        await self._connection.end_audio()
    except Exception as e:
        logger.warning(f"Error signaling end of audio: {e}")

# Wait for Done event with timeout
if self._receive_task and not self._done_received.is_set():
    try:
        await asyncio.wait_for(self._done_received.wait(), timeout=5.0)
    except asyncio.TimeoutError:
        logger.debug("Timeout waiting for done event")

if self._receive_task:
    await cancel_and_wait(self._receive_task)
    self._receive_task = None

if self._connection:
    try:
        await self._connection.close()
    except Exception as e:
        logger.warning(f"Error closing Mistral connection: {e}")
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and read the specific file
find . -type f -name "stt.py" | grep mistral

Repository: GetStream/Vision-Agents
Length of output: 121
🏁 Script executed:
# Read lines 223-245 to verify the code
sed -n '223,245p' ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -n

Repository: GetStream/Vision-Agents
Length of output: 1175
🏁 Script executed:
# Look at the imports and class definition to understand the connection type
head -50 ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -n

Repository: GetStream/Vision-Agents
Length of output: 1907
🏁 Script executed:
# Search for all exception handling in the stt.py file to see patterns
grep -n "except" ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -nRepository: GetStream/Vision-Agents
Length of output: 302
🏁 Script executed:
# Look at lines around 111-114 to see what pattern is used elsewhere
sed -n '110,120p' ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -n

Repository: GetStream/Vision-Agents
Length of output: 610
🏁 Script executed:
# Search for other usages of end_audio and close methods in the codebase
rg "end_audio|\.close\(\)" ./plugins/mistral/vision_agents/plugins/mistral/ -B 2 -A 2Repository: GetStream/Vision-Agents
Length of output: 1863
🏁 Script executed:
# Look at the full close method context to understand better
sed -n '218,250p' ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -n

Repository: GetStream/Vision-Agents
Length of output: 1524
🏁 Script executed:
# Search for any documentation or type hints related to RealtimeConnection
rg "RealtimeConnection" ./plugins/mistral/vision_agents/plugins/mistral/ -B 2 -A 5Repository: GetStream/Vision-Agents
Length of output: 1627
🌐 Web query:
Mistral AI RealtimeConnection end_audio close methods exceptions Python
💡 Result:
Official Mistral AI Python SDK (mistralai)
- There is no `RealtimeConnection` class and therefore no `end_audio()` method in the official SDK (it's primarily HTTP + streaming via SSE for some endpoints). (github.com)
- "Close" is done by exiting the `with Mistral(...) as client:` context manager, which closes the underlying HTTPX client(s). (github.com)
- The SDK's documented exception types are:
  - `MistralError` (base for HTTP error responses)
  - `HTTPValidationError` (HTTP 422 validation error)
  - `ResponseValidationError` (response doesn't match expected Pydantic model)
  - network errors from `httpx`: `RequestError`, `ConnectError`, `TimeoutException` (github.com)
If you meant vLLM’s RealtimeConnection (OpenAI-compatible realtime transcription over WebSocket)
- `RealtimeConnection` manages the WebSocket lifecycle and routes events like `session.update`, `input_audio_buffer.append`, and `input_audio_buffer.commit`, but it does not document an `end_audio()` method. (docs.vllm.ai)
- During `input_audio_buffer.append`, it can raise `VLLMValidationError` (e.g., max audio size exceeded, empty audio). Decode failures are caught and turned into an error message back to the client. (docs.vllm.ai)
Citations:
- 1: https://github.com/mistralai/client-python
- 2: https://github.com/mistralai/client-python
- 3: https://github.com/mistralai/client-python?utm_source=openai
- 4: https://docs.vllm.ai/en/latest/api/vllm/entrypoints/openai/realtime/connection/
- 5: https://docs.vllm.ai/en/latest/api/vllm/entrypoints/openai/realtime/connection/
🏁 Script executed:
# Search for exception types that are used in this file
grep -n "RealtimeTranscriptionError\|import.*Error\|from.*error" ./plugins/mistral/vision_agents/plugins/mistral/stt.pyRepository: GetStream/Vision-Agents
Length of output: 241
🏁 Script executed:
# Look at the context around where RealtimeTranscriptionError is handled
sed -n '100,120p' ./plugins/mistral/vision_agents/plugins/mistral/stt.py | cat -n

Repository: GetStream/Vision-Agents
Length of output: 1193
🏁 Script executed:
# Search for any other exception types used with connection-related operations
grep -n "ConnectionError\|OSError\|TimeoutError\|WebSocketException" ./plugins/mistral/vision_agents/plugins/mistral/stt.pyRepository: GetStream/Vision-Agents
Length of output: 111
🏁 Script executed:
# Look for how other async connection operations are handled in the codebase
rg "send_audio|\.connect\(\)" ./plugins/mistral/vision_agents/plugins/mistral/ -B 2 -A 5Repository: GetStream/Vision-Agents
Length of output: 745
Use specific exceptions instead of catch-all Exception in cleanup code.
Lines 227 and 244 catch Exception broadly in the end_audio() and close() calls. Per coding guidelines, use specific exception types so unexpected failures remain visible.
Suggested approach: catch the exception types that RealtimeConnection.end_audio() and .close() actually raise. Common candidates for WebSocket/async operations include ConnectionError, OSError, asyncio.TimeoutError, and any Mistral-specific exceptions (check the mistralai library source or test these methods to confirm).
🤖 Prompt for AI Agents
In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 223 - 245,
Replace the broad Exception handlers around self._connection.end_audio() and
self._connection.close() with specific exceptions (e.g., ConnectionError,
OSError, asyncio.TimeoutError and any mistralai-specific exceptions you find) so
unexpected errors surface; update the try/except blocks in the cleanup section
that reference self._connection.end_audio and self._connection.close to catch
only those concrete exception types (and log the error) after verifying the
exact exceptions raised by RealtimeConnection in the mistralai client library or
by running tests.
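To pin down which exception types are actually available before narrowing these handlers, a quick local check along these lines can help (the module paths mirror the ones probed in the scripts above and may need adjusting for the installed SDK version):

```python
import importlib

# Inspect the installed mistralai package for error classes the cleanup code could catch.
for module_name in ("mistralai.models", "mistralai.extra.realtime"):
    try:
        module = importlib.import_module(module_name)
        error_names = [name for name in dir(module) if name.endswith("Error")]
        print(f"{module_name}: {error_names}")
    except ImportError as exc:
        print(f"{module_name}: not importable ({exc})")
```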
Summary by CodeRabbit
New Features
Documentation
Tests