
Support multiple speakers on the same call#348

Merged
dangusev merged 10 commits into main from feature/first-speaker-wins-processor
Feb 10, 2026

Conversation

@dangusev
Contributor

@dangusev dangusev commented Feb 9, 2026

This PR adds a mechanism to pick the first active participant if there are multiple on the call.
This allows the Agent to process incoming audio as usual, even when speakers talk simultaneously.

What's changed

  • Added an id field to Participant, which is intended to be unique. The existing user_id field is not guaranteed to be unique.
  • Agent now buffers audio data per participant (previously, it was a single shared queue)
  • Added AudioFilter interface and its FirstSpeakerWinsFilter implementation. It picks the first active speaker using Silero VAD, and processes only this audio track, ignoring others.
    The filtering is active only when there are 2+ participants on the call.
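The AudioFilter interface itself is not shown in this description; a minimal sketch of what such an interface might look like, based on the summary above (method names follow the PR text, everything else is an assumption):

```python
import abc
from typing import Any, Optional


class AudioFilter(abc.ABC):
    """Per-chunk gate: decides whether a participant's audio passes through."""

    @abc.abstractmethod
    async def process_audio(self, pcm: bytes, participant: Any) -> Optional[bytes]:
        """Return the chunk to forward it downstream, or None to drop it."""

    @abc.abstractmethod
    def clear(self) -> None:
        """Reset internal state, e.g. when the call ends."""
```

A concrete implementation such as FirstSpeakerWinsFilter would hold the active-speaker state behind this interface, so the Agent's audio loop only ever sees "chunk or None".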

Summary by CodeRabbit

  • New Features

    • Multi-speaker handling: per-participant audio routing with a First-Speaker-Wins filter and parallel warmup of audio components.
    • Participant identity: Participant objects now include a stable id; agent speech events and TTS flows carry explicit participant info (replacing user_id).
    • STT behavior: STT integrations now require participant metadata for accurate attribution.
  • Tests

    • Added comprehensive tests for the multi-speaker filter, speaker transitions, silence handling, and warmup behavior.

@coderabbitai

coderabbitai bot commented Feb 9, 2026

📝 Walkthrough

Walkthrough

Adds per-participant audio queues and routing, a Silero VAD–backed FirstSpeakerWinsFilter with warmup integration, expands Participant with an id, and updates STT and Agent Say events/APIs to require and propagate Participant objects.

Changes

Cohort / File(s) Summary
Core Multi-Participant Audio System
agents-core/vision_agents/core/agents/agents.py, agents-core/vision_agents/core/agents/agent_launcher.py
Per-participant incoming audio queues keyed by participant.id; consumer iterates participant queues, applies _multi_speaker_filter, forwards to audio processors/STT/LLM. agent_launcher schedules warmup for _multi_speaker_filter when present.
Audio Filtering Framework
agents-core/vision_agents/core/utils/audio_filter.py
Added AudioFilter ABC and FirstSpeakerWinsFilter (Silero VAD) with warmup lifecycle, lock acquisition/release, silence timeout, process_audio, and clear APIs.
Types & Transport
agents-core/vision_agents/core/edge/types.py, plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
Participant dataclass gains id: str. GetStream transport now asserts required fields and builds Participant.id as <user_id>__<track_lookup_prefix>.
STT Signature & Implementations
agents-core/vision_agents/core/stt/stt.py, plugins/fast_whisper/.../stt.py, plugins/fish/.../stt.py, plugins/mistral/.../stt.py, plugins/wizper/.../stt.py
STT.process_audio and related internals now require a non-optional Participant; plugin STT implementations removed fallback participant creation and updated error/context emission to include participant.
Utils & VAD Loader
agents-core/vision_agents/core/utils/utils.py, agents-core/vision_agents/core/vad/silero.py
Made model-path checks and directory creation non-blocking via asyncio.to_thread; Silero loader ensures model directory exists before loading.
Examples & Config
examples/01_simple_agent_example/simple_agent_example.py
Commented out explicit eager_turn_detection=True in example to rely on default behavior.
Events, Tests & Fixtures
agents-core/vision_agents/core/agents/events.py, conftest.py, tests/..., plugins/.../tests/*, tests/test_agents/test_audio_filter.py
Removed user_id from AgentSay event dataclasses and replaced with participant payloads; updated fixtures/tests to supply Participant.id; added mia_audio_16khz_chunked fixture and tests for FirstSpeakerWinsFilter.

Sequence Diagram

sequenceDiagram
    participant Transport as Transport (on_audio_received)
    participant Agent as Agent (_consume_incoming_audio)
    participant Filter as FirstSpeakerWinsFilter (Silero VAD)
    participant Processors as Audio Processors (STT / LLM)

    Transport->>Agent: push 20ms chunk (participant P)
    activate Agent
    Agent->>Agent: enqueue into queue[P.id]
    Agent->>Agent: snapshot participant queues and dequeue 20ms per participant

    Agent->>Filter: process_audio(chunk, participant P)
    activate Filter
    Filter->>Filter: run VAD -> speech? / silence?
    alt no active speaker & P speaks
        Filter->>Filter: acquire lock (active_speaker_id = P.id)
        Filter-->>Agent: return chunk
    else active speaker == P and speaking
        Filter-->>Agent: return chunk
    else active speaker == P and silent
        Filter->>Filter: increment silence timer
        alt silence > threshold
            Filter->>Filter: release lock
            Filter-->>Agent: return None
        else
            Filter-->>Agent: return chunk
        end
    else another speaker active
        Filter-->>Agent: return None
    end
    deactivate Filter

    Agent->>Processors: forward filtered chunk to STT or LLM
    deactivate Agent
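The branch structure in the diagram can be modelled in a few lines of plain Python. This is a deliberately simplified sketch (class and method names are assumptions; the real filter uses Silero VAD for the speech decision and runs asynchronously):

```python
import time
from typing import Callable, Optional


class FirstSpeakerWinsSketch:
    """Toy model of the first-speaker-wins decision flow in the diagram."""

    def __init__(self, is_speech: Callable[[bytes], bool],
                 silence_timeout_s: float = 1.0):
        self.is_speech = is_speech
        self.silence_timeout_s = silence_timeout_s
        self.active_speaker_id: Optional[str] = None
        self._silence_started: Optional[float] = None

    def process_audio(self, chunk: bytes, participant_id: str) -> Optional[bytes]:
        speaking = self.is_speech(chunk)
        if self.active_speaker_id is None:
            if speaking:
                self.active_speaker_id = participant_id  # acquire the "lock"
                self._silence_started = None
                return chunk
            return None
        if self.active_speaker_id != participant_id:
            return None  # another speaker holds the floor
        if speaking:
            self._silence_started = None
            return chunk
        # Active speaker went silent: start/extend the silence timer.
        now = time.monotonic()
        if self._silence_started is None:
            self._silence_started = now
        if now - self._silence_started > self.silence_timeout_s:
            self.active_speaker_id = None  # release the lock
            self._silence_started = None
            return None
        return chunk
```

The key property, visible in both the diagram and this sketch, is that the lock is released only by sustained silence from the active speaker, never by another participant talking louder.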

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

The device keeps its black mouth shut, then sings;
A single voice seizes the bright room like glass,
Holding the lock, the small machine counts silence—
It measures breath, then lets the other mouths pass.
A quiet tribunal, exact and merciless.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 69.70% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and clearly summarizes the primary objective of the changeset: implementing support for multiple concurrent speakers on a single call through speaker detection and filtering.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


No actionable comments were generated in the recent review. 🎉




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
conftest.py (1)

193-196: ⚠️ Potential issue | 🔴 Critical

Missing id parameter in participant fixture — will cause TypeError.

As noted in the types.py review, Participant now requires an id field, but this fixture doesn't supply one. Every test depending on this fixture will fail.

Fix
-    return Participant({}, user_id="test-user")
+    return Participant({}, user_id="test-user", id="test-user")
agents-core/vision_agents/core/agents/agents.py (1)

903-910: ⚠️ Potential issue | 🔴 Critical

Participant construction is missing the required id field.

The Participant dataclass requires three arguments: original, user_id, and id (all required, no defaults). This construction only passes two, which will raise a TypeError at runtime.

Proposed fix
                participant = (
                    Participant(
                        original=event.metadata or {},
                        user_id=event.user_id,
+                        id=event.user_id or "",
                    )
                    if event.user_id
                    else None
                )
🤖 Fix all issues with AI agents
In `@agents-core/vision_agents/core/agents/agents.py`:
- Around line 342-350: The first PCM frame for a new participant is dropped
because when self._participant_queues lacks participant.id you create and store
a new AudioQueue (AudioQueue(buffer_limit_ms=self._audio_buffer_limit_ms)) but
never call await queue.put(pcm) for the current event.pcm_data; update the else
branch handling in the method that processes events to: create the AudioQueue,
store it in self._participant_queues[participant.id] = (participant, queue), and
then immediately await queue.put(pcm) so the initial frame is enqueued for the
new participant.
- Around line 1019-1024: Replace the stray print and fix the units: instead of
print(f"vad latency: {end - start}ms") use the module/class logger (e.g.,
logger.debug) to emit the latency from the _multi_speaker_filter.process_audio
timing and multiply the difference between end and start by 1000 to report
milliseconds (format to a sensible precision); update the call site around start
= time.perf_counter(), pcm = await
self._multi_speaker_filter.process_audio(...), end = time.perf_counter() to log
logger.debug("vad latency: %sms", (end - start) * 1000) or remove the log
entirely if unnecessary.

In `@agents-core/vision_agents/core/edge/types.py`:
- Around line 14-18: The Participant dataclass now requires id which breaks
tests that instantiate Participant({}, user_id="test-user"); make id optional
with a sane default so existing fixtures keep working and fix the comment typo:
change Participant's signature to use id: str = field(default_factory=lambda:
str(uuid4())) (import uuid4 from uuid and field from dataclasses) or
alternatively id: Optional[str] = None and generate/set a UUID in __post_init__;
also correct the comment string "connectivty" → "connectivity" to fix the typo.
Ensure imports for typing/uuid/dataclasses are added if needed.

In `@agents-core/vision_agents/core/vad/silero.py`:
- Around line 18-21: The _ensure_dir function uses a race-prone
check-then-create pattern; replace the os.path.exists() + os.makedirs() sequence
in _ensure_dir with a single atomic call using os.makedirs(dir_path,
exist_ok=True) so directory creation is race-free and safe if the directory
already exists.

In `@plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py`:
- Around line 80-81: The unique_id construction uses
participant.track_lookup_prefix directly which can be None or empty, producing
ids like "user__None" or colliding values; update the logic in
stream_edge_transport.py where unique_id is built (the unique_id variable and
the Participant(...) return) to defensively normalize the prefix: get prefix =
(participant.track_lookup_prefix or "").strip(); if prefix is non-empty use
f"{participant.user_id}__{prefix}" else use participant.user_id (no suffix), and
ensure the Participant(original=participant, user_id=participant.user_id,
id=unique_id) call uses that sanitized id.

In `@plugins/smart_turn/tests/test_smart_turn.py`:
- Line 34: The participant fixture currently constructs Participant without the
required id field, causing runtime failures; update the participant pytest
fixture to pass the id argument (e.g., id="test-user") when returning
Participant so it supplies all three required dataclass fields (original,
user_id, id), ensuring Participant(...) is called with id alongside user_id and
original.
🧹 Nitpick comments (6)
agents-core/vision_agents/core/utils/utils.py (1)

85-86: Synchronous os.path.exists / os.remove still block the event loop in the error handler.

Now that line 58 wraps its os.path.exists in asyncio.to_thread, these two calls in the except block are an inconsistency — they perform blocking filesystem I/O on the async thread. The error path is rare, so this is not urgent, but for consistency you could offload them as well.

♻️ Suggested diff
         except httpx.HTTPError as e:
             # Clean up partial download on error
-            if os.path.exists(path):
-                os.remove(path)
+            if await asyncio.to_thread(os.path.exists, path):
+                await asyncio.to_thread(os.remove, path)
             raise RuntimeError(f"Failed to download {model_name}: {e}")
agents-core/vision_agents/core/agents/agent_launcher.py (1)

387-389: Accessing private _multi_speaker_filter from outside Agent.

All other warmup targets (agent.llm, agent.tts, agent.stt, agent.turn_detection) are public attributes. Accessing a private (_-prefixed) attribute here breaks the pattern and couples the launcher to Agent internals. Consider exposing it as a public attribute or property on Agent for consistency.

#!/bin/bash
# Check how _multi_speaker_filter is defined/used in agents.py
rg -n '_multi_speaker_filter' --type=py -C2
plugins/openai/tests/test_openai_realtime.py (1)

70-71: Redundant local imports — numpy and PcmData/AudioFormat already imported at module level.

Lines 3, 6 already import numpy as np and AudioFormat, PcmData at the top of the file. These local re-imports on lines 70–71 are dead weight now.

🧹 Proposed cleanup
         # Convert 16kHz audio to 48kHz for OpenAI realtime
         # OpenAI expects 48kHz PCM audio
-        import numpy as np
-        from getstream.video.rtc.track_util import AudioFormat, PcmData
         from scipy import signal
tests/test_agents/test_audio_filter.py (1)

7-8: Parameter id shadows the Python builtin.

A minor nit: id as a parameter name shadows the builtin id(). Consider participant_id instead.

🧹 Suggested rename
-def _participant(user_id: str, id: str = "") -> Participant:
-    return Participant(original=None, user_id=user_id, id=id or user_id)
+def _participant(user_id: str, participant_id: str = "") -> Participant:
+    return Participant(original=None, user_id=user_id, id=participant_id or user_id)
agents-core/vision_agents/core/utils/audio_filter.py (2)

50-65: Hardcoded /tmp default for model_dir.

The default model_dir="/tmp/first_speaker_wins_model" may not be writable in all deployment environments (read-only containers, restricted /tmp). In agents.py, the FirstSpeakerWinsFilter() is constructed with no arguments, so this default is what gets used in production.

Consider making this configurable or using a more portable default (e.g., via tempfile.mkdtemp or a user-home-relative path).
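One portable alternative is to derive the default at construction time. A sketch under stated assumptions — the environment variable name is hypothetical, not part of the codebase:

```python
import os
import tempfile


def default_model_dir(app_name: str = "first_speaker_wins_model") -> str:
    # Prefer an explicit override, then the platform temp dir; both work in
    # containers where a literal "/tmp" may be read-only or absent.
    override = os.environ.get("VISION_AGENTS_MODEL_DIR")
    if override:
        return override
    return os.path.join(tempfile.gettempdir(), app_name)
```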


107-111: reset_interval_seconds=5.0 is hardcoded in on_warmed_up.

This VAD session reset interval isn't exposed through the constructor. If callers ever need to tune it, they'll have to subclass. Fine for now, but worth noting.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
plugins/mistral/vision_agents/plugins/mistral/stt.py (1)

189-203: ⚠️ Potential issue | 🟡 Minor

Stale docstring: participant is no longer optional.

Line 199 still says "Optional participant metadata" but the parameter is now required.

📝 Proposed docstring fix
         """
         Process audio data through Mistral for transcription.

         Args:
             pcm_data: The PCM audio data to process.
-            participant: Optional participant metadata.
+            participant: Participant metadata for the audio source.
         """
plugins/fish/vision_agents/plugins/fish/stt.py (1)

52-70: ⚠️ Potential issue | 🟡 Minor

Stale docstring: references nonexistent user_metadata param and incorrect return type.

The docstring at lines 63-70 describes a user_metadata parameter that doesn't exist and a return type of "List of tuples" — but the method emits events and returns None. Since the signature was updated in this PR, this is a good time to clean up the docstring. As per coding guidelines, "Docstrings should follow the Google style guide for docstrings".

📝 Proposed docstring fix
         """
         Process audio data through Fish Audio for transcription.

-        Fish Audio operates in synchronous mode - it processes audio immediately and
-        returns results to the base class for event emission.
-
         Args:
             pcm_data: The PCM audio data to process.
-            user_metadata: Additional metadata about the user or session.
-
-        Returns:
-            List of tuples (is_final, text, metadata) representing transcription results,
-            or None if no results are available. Fish Audio returns final results only.
+            participant: Participant metadata for the audio source.
         """
plugins/fast_whisper/vision_agents/plugins/fast_whisper/stt.py (2)

94-107: ⚠️ Potential issue | 🟡 Minor

Stale docstring: participant described as "Optional" (line 106) but is now required.

📝 Proposed fix
         Args:
             pcm_data: The PCM audio data to process
-            participant: Optional participant metadata
+            participant: Participant metadata for the audio source.

148-154: ⚠️ Potential issue | 🟡 Minor

Same stale docstring in _process_buffer (line 153).

📝 Proposed fix
         Args:
-            participant: Optional participant metadata
+            participant: Participant metadata for the audio source.
plugins/wizper/vision_agents/plugins/wizper/stt.py (1)

60-71: ⚠️ Potential issue | 🟡 Minor

Stale docstring: participant described as "Optional" (line 70) but is now required.

📝 Proposed fix
         Args:
             pcm_data: The PCM audio data to process
-            participant: Optional participant metadata
+            participant: Participant metadata for the audio source.
agents-core/vision_agents/core/agents/agents.py (1)

1046-1050: ⚠️ Potential issue | 🟡 Minor

except Exception as e: violates coding guidelines.

The guideline says to never write except Exception as e — use specific exception handling instead. This broad catch silently swallows every possible error in the audio loop, which could mask critical bugs.

Proposed fix
-        except Exception as e:
-            self.logger.error(f"❌ Error in audio consumer: {e}", exc_info=True)
+        except (OSError, RuntimeError, ValueError) as e:
+            self.logger.error(f"❌ Error in audio consumer: {e}", exc_info=True)

Adjust the exception types to match the actual failures you expect from queue operations and audio processing. As per coding guidelines, "Never write except Exception as e - use specific exception handling."

🤖 Fix all issues with AI agents
In `@agents-core/vision_agents/core/agents/agents.py`:
- Around line 1003-1011: The loop is awaiting queue.get_duration(duration_ms=20)
sequentially with timeout=0.1 which multiplies stalls per participant; change to
poll all participant queues concurrently (or pre-check availability) so the
wall-clock per iteration stays ~20ms: for example, build a list of awaitables
from each participant queue.get_duration(duration_ms=20) wrapped with
asyncio.wait_for(..., timeout=<short>) and run asyncio.gather(...,
return_exceptions=True) to get results in parallel, then iterate the returned
results alongside the participants to handle successful pcm values and ignore
timeouts/exceptions; alternatively, if the queue type supports
queue.has_duration(20), call that before awaiting to avoid creating many timed
waits.
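The concurrent-polling suggestion above can be sketched like this. The queue API is a stand-in using plain asyncio.Queue (the real AudioQueue exposes get_duration rather than get):

```python
import asyncio
from typing import Any, Dict, Optional


async def poll_queues_concurrently(
    queues: Dict[str, asyncio.Queue], timeout: float = 0.1
) -> Dict[str, Optional[Any]]:
    """Await one item from every queue in parallel; a timeout yields None.

    Wall-clock cost is bounded by the single slowest wait (at most `timeout`),
    instead of summing one timeout per participant as sequential awaits do.
    """

    async def one(q: asyncio.Queue) -> Optional[Any]:
        try:
            return await asyncio.wait_for(q.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return None

    ids = list(queues)
    results = await asyncio.gather(*(one(queues[i]) for i in ids))
    return dict(zip(ids, results))
```

With ten participants and a 0.1s timeout, the sequential version can stall up to ~1s per loop iteration; the gathered version stays bounded at ~0.1s.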
🧹 Nitpick comments (4)
conftest.py (2)

196-196: Minor inconsistency between user_id and id formatting.

user_id="test-user" uses a hyphen while id="test_user" uses an underscore. This is cosmetic but could cause confusion in tests that assert on these values.


207-221: New chunked fixture looks good — consider scope="session" for speed.

The fixture correctly reuses _mp3_to_pcm (unlike mia_audio_48khz_chunked which duplicates decoding inline). Since the output is deterministic and read-only, promoting to scope="session" would avoid redundant decoding across tests, matching the pattern of mia_audio_16khz.

♻️ Suggested scope change
-@pytest.fixture
+@pytest.fixture(scope="session")
 def mia_audio_16khz_chunked():
plugins/mistral/vision_agents/plugins/mistral/stt.py (1)

114-119: Improved error handling: traceback logging and participant context — good changes.

Using logger.exception captures the full traceback, and passing participant=self._current_participant enriches the error event. Both are welcome improvements.

Note: Line 114's except Exception as e is a broad catch that conflicts with the project's coding guidelines. Since you're already modifying this block, consider narrowing it to the specific exceptions the Mistral SDK can raise (e.g., connection/protocol errors). As per coding guidelines, "Never write except Exception as e - use specific exception handling".

agents-core/vision_agents/core/agents/agents.py (1)

1013-1038: Redundant participant is not None check on line 1035.

participant is unpacked from the queues.values() tuple on line 1004 and is always a Participant instance — it can never be None here. A tiny dead check, harmless but slightly misleading to future readers.

Proposed cleanup
-                    if self.turn_detection is not None and participant is not None:
+                    if self.turn_detection is not None:

@dangusev dangusev merged commit 0bcf9de into main Feb 10, 2026
9 checks passed
@dangusev dangusev deleted the feature/first-speaker-wins-processor branch February 10, 2026 13:28