Add AVSynchronizer to sync audio & video playback for Avatars #466
📝 Walkthrough
Adds AVSynchronizer to coordinate audio buffering and timed release of queued video frames; refactors Anam and LemonSlice avatar publishers to use it; adds RTC interruption flushing and epoch-staleness guards; updates LemonSlice example to require LEMONSLICE_AGENT_ID and send an initial response; includes tests for synchronization behavior.
Sequence Diagram(s)
sequenceDiagram
participant Writer as External Writer
participant Sync as AVSynchronizer
participant Audio as _BufferTrackingAudioTrack
participant Video as _SyncedVideoTrack
participant Recv as Frame Receiver
Writer->>Sync: write_audio(pcm)
Sync->>Audio: append PCM (update buffered duration)
Writer->>Sync: write_video(frame)
Sync->>Video: enqueue frame with release_at = now + Audio.buffered
Recv->>Video: recv()
alt now < next.release_at
Video->>Recv: return last released frame (repeat, update PTS)
else
Video->>Video: dequeue frame, advance PTS/timebase
Video->>Recv: return released frame
end
Writer->>Sync: flush()
Sync->>Audio: clear buffer
Sync->>Video: discard pending frames
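The flow in the diagram can be sketched in a few lines of Python. This is a toy model under stated assumptions, not the PR's implementation: `SketchSynchronizer`, `play_audio`, the injectable `clock`, and the use of plain strings as frames are all hypothetical names chosen for illustration.

```python
import collections
import time
from typing import Callable, Deque, Optional, Tuple


class SketchSynchronizer:
    """Toy model: video frames are held until the buffered audio has had time to play."""

    def __init__(
        self,
        sample_rate: int = 24000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._clock = clock
        self._sample_rate = sample_rate
        self._buffered_samples = 0  # audio written but not yet played
        self._pending: Deque[Tuple[float, str]] = collections.deque()
        self._last_frame: Optional[str] = None

    @property
    def buffered(self) -> float:
        """Seconds of audio currently waiting in the buffer."""
        return self._buffered_samples / self._sample_rate

    def write_audio(self, num_samples: int) -> None:
        self._buffered_samples += num_samples

    def play_audio(self, num_samples: int) -> None:
        """Simulate the audio consumer draining the buffer."""
        self._buffered_samples = max(0, self._buffered_samples - num_samples)

    def write_video(self, frame: str) -> None:
        # release_at = now + buffered audio, as in the diagram.
        self._pending.append((self._clock() + self.buffered, frame))

    def recv(self) -> Optional[str]:
        # Release the next frame once its time has come; otherwise repeat the last one.
        if self._pending and self._clock() >= self._pending[0][0]:
            _, self._last_frame = self._pending.popleft()
        return self._last_frame

    def flush(self) -> None:
        self._buffered_samples = 0
        self._pending.clear()
```

Injecting the clock keeps the sketch deterministic in tests; a real track would also update PTS and time base on every returned frame, which is omitted here.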
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py (1)
213-220: ⚠️ Potential issue | 🟠 Major
Flush-before-interrupt still races with the receiver loops.
`_video_receiver()` and `_audio_receiver()` keep feeding `self._sync` while this block awaits `flush()` and `interrupt()`. Any in-flight frames from the interrupted turn can therefore repopulate the synchronizer right after the flush, so barge-in still leaks stale avatar audio/video. Please gate the receiver side with a turn/generation token, or re-flush after the interrupt completes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py` around lines 213 - 220, The flush-before-interrupt races because _video_receiver() and _audio_receiver() can push frames into self._sync while on_turn_started awaits flush() and _session.interrupt(); fix by gating receivers with a turn token or re-flushing after interrupt: update on_turn_started (the handler) to acquire self._send_lock, increment/set a self._current_turn_token marker before calling await self._sync.flush() and await self._session.interrupt(), then either (preferred) keep the lock while setting the token and perform a second await self._sync.flush() after interrupt to ensure no new frames remain, AND modify _video_receiver()/_audio_receiver() to check the token (e.g., only push frames when frame.token == self._current_turn_token) or skip pushing when token mismatches; reference on_turn_started, _video_receiver, _audio_receiver, self._sync, self._send_lock, and _session.interrupt().
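The token-gating idea from this comment can be illustrated with a small, self-contained sketch. Everything here is hypothetical: `TurnGate`, `start_new_turn`, and `push` are illustrative names, frames are plain strings, and the sketch assumes each frame can be tagged with the generation that was current when its turn began, which the real receivers may or may not be able to do.

```python
import asyncio
from typing import List


class TurnGate:
    """Drops frames that belong to a turn that has since been interrupted."""

    def __init__(self) -> None:
        self._generation = 0
        self.accepted: List[str] = []  # stands in for the synchronizer's queue

    @property
    def generation(self) -> int:
        return self._generation

    def start_new_turn(self) -> int:
        # Bump the token first, then flush, so late frames cannot slip back in.
        self._generation += 1
        self.accepted.clear()
        return self._generation

    def push(self, frame: str, generation: int) -> bool:
        if generation != self._generation:
            return False  # stale frame from an interrupted turn
        self.accepted.append(frame)
        return True


async def demo() -> List[str]:
    gate = TurnGate()
    old = gate.generation
    gate.push("old-1", old)
    gate.start_new_turn()
    await asyncio.sleep(0)  # the interrupt call would be awaited here
    gate.push("old-2", old)  # in-flight frame from the old turn, dropped
    gate.push("new-1", gate.generation)
    return gate.accepted
```

Because the token is bumped before any await, a receiver that wakes up during the interrupt sees a mismatched generation and skips the frame, so no second flush is needed in this model.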
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agents-core/vision_agents/core/utils/av_synchronizer.py`:
- Around line 112-130: Add a synchronizer-level shutdown method (e.g., async def
stop(self) / close(self)) on AVSynchronizer that cleanly stops the owned tracks
by calling the underlying track teardown APIs (e.g., await
self._video_track.stop()/flush() and await self._audio_track.stop()/close() or
the appropriate async stop/close on QueuedVideoTrack and AudioStreamTrack) and
ensure any publisher close() paths call this synchronizer.stop()/close() so the
published video/audio tracks are terminated and consumers don't see a frozen
last frame; update references in the class to include this new method and call
it from the publishers' close hooks.
- Around line 61-77: recv() currently bypasses the parent pacing and always
returns immediately, so it ignores self.fps; fix it by preserving the parent's
cadence: when there is no pending frame ready to release (i.e., after checking
self._pending and possibly updating self.last_frame), if you would otherwise
return last_frame immediately you must instead await the parent's paced recv()
to honor self.fps (e.g., call await super().recv() or otherwise reuse the
QueuedVideoTrack pacing logic) and then set pts/time_base from next_timestamp;
keep using next_timestamp(), self._pending, self.last_frame, and self.fps in the
logic so the method only falls back to repeating last_frame after waiting up to
1/self.fps.
In `@plugins/lemonslice/example/lemonslice_avatar_example.py`:
- Around line 39-41: Fail fast when LEMONSLICE_AGENT_ID is missing by validating
os.getenv("LEMONSLICE_AGENT_ID") before constructing
lemonslice.LemonSliceAvatarPublisher: retrieve the value into a variable (e.g.,
agent_id), if it's falsy raise a clear RuntimeError or ValueError with a
descriptive message, and then pass that variable into
LemonSliceAvatarPublisher(agent_id=agent_id). Also add "LEMONSLICE_AGENT_ID" to
the required env var list at the top of the file so the header documents this
dependency.
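A fail-fast check of the kind described for `LEMONSLICE_AGENT_ID` might look like the sketch below. The helper name `require_env` and its `env` parameter are hypothetical and only there to make the check testable; the example file may prefer to inline the check.

```python
import os
from typing import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of a required environment variable or raise a clear error."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} must be set to run this example")
    return value
```

The returned value would then be stored in a variable such as `agent_id` and passed on as `LemonSliceAvatarPublisher(agent_id=agent_id)`, as the comment above suggests.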
📒 Files selected for processing (6)
- agents-core/vision_agents/core/utils/av_synchronizer.py
- plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py
- plugins/lemonslice/example/lemonslice_avatar_example.py
- plugins/lemonslice/vision_agents/plugins/lemonslice/lemonslice_avatar_publisher.py
- tests/test_utils/__init__.py
- tests/test_utils/test_av_synchronizer.py
bdc0eb7 to 0b91522
Actionable comments posted: 1
🧹 Nitpick comments (4)
plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py (2)
193-194: Chunk size reduction to 100ms noted.
The change from 24000 to 2400 samples (100ms at 24kHz mono) was flagged in a past review as a separate fix. Ensure this is mentioned in the PR description for the squash commit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py` around lines 193 - 194, The loop using pcm.resample(...).chunks(2400) reduces the audio chunk size to 2400 samples (100ms at 24kHz mono); update the PR/squash commit description to explicitly mention this chunk size change from the previous 24000 sample value and note that this was previously flagged for separate fixing so reviewers can track the intentional change to 100ms in anam_avatar_publisher.py (reference the pcm.resample(...).chunks(2400) call).
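The chunk-size arithmetic behind this comment is easy to check. The sketch below is illustrative only: `chunk_duration_ms` and `split_into_chunks` are hypothetical helpers, not the `pcm.resample(...).chunks(...)` API used in the plugin.

```python
from typing import List, Sequence


def chunk_duration_ms(samples_per_chunk: int, sample_rate_hz: int) -> float:
    """Duration of one mono chunk in milliseconds."""
    return 1000.0 * samples_per_chunk / sample_rate_hz


def split_into_chunks(samples: Sequence[int], chunk_size: int) -> List[Sequence[int]]:
    """Split a sample sequence into consecutive chunks; the last one may be shorter."""
    return [samples[i : i + chunk_size] for i in range(0, len(samples), chunk_size)]
```

At 24 kHz mono, 2400 samples last 100 ms and 24000 samples last a full second, so one second of audio now reaches the avatar as ten chunks rather than one.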
167-170: Duplicate `_is_stale_epoch` implementation.
This method is identical to the one in `LemonSliceAvatarPublisher`. Consider extracting it to a shared base class or utility in a follow-up to avoid drift.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py` around lines 167 - 170, The _is_stale_epoch method in AnamAvatarPublisher duplicates the implementation in LemonSliceAvatarPublisher; extract this logic into a shared utility or base class to avoid duplication and future drift: create a new helper (e.g., is_stale_epoch(llm, epoch) or add the method to a common base class that both AnamAvatarPublisher and LemonSliceAvatarPublisher inherit from) and replace the current _is_stale_epoch implementation in both classes to call the shared helper; ensure the helper uses the same behavior (return isinstance(llm, Realtime) and epoch != llm.epoch) and update imports/usages accordingly.
agents-core/vision_agents/core/utils/av_synchronizer.py (2)
82-85: `flush()` clears video but also flushes audio; document this coupling.
The method discards pending video frames and flushes the companion audio track. This cross-track side effect might surprise callers expecting video-only behavior. The docstring is accurate, but the method name `flush` on a video track could be misleading.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/utils/av_synchronizer.py` around lines 82 - 85, The flush method on the video synchronizer (async def flush) currently clears video frames via self._pending.clear() and also flushes the companion audio track via await self._audio_track.flush(), which is a surprising cross-track side effect; either document this coupling in the method name/docstring or split behavior: rename flush to flush_video (or add a parameter like flush_audio: bool=False) and implement separate flush_audio logic that calls self._audio_track.flush(), or keep flush as-is but update the docstring to explicitly state that flush clears _pending and also flushes _audio_track so callers know about the audio-side effect.
27-32: SDK internals dependency noted.
The `buffered` property reads `_buffer` and `_bytes_per_sample` directly from the SDK. A past review already flagged this; good that there's a plan to move this property upstream. Consider adding a brief comment here so future SDK changes get caught.
📝 Suggested comment
     @property
     def buffered(self) -> float:
         """Return the amount of seconds of audio pending in the buffer."""
+        # NOTE: Accesses SDK internals (_buffer, _bytes_per_sample).
+        # Update if AudioStreamTrack changes.
         return len(self._buffer) / (
             self.sample_rate * self.channels * self._bytes_per_sample
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@agents-core/vision_agents/core/utils/av_synchronizer.py` around lines 27 - 32, The buffered property directly reads SDK internals (_buffer and _bytes_per_sample) which couples this code to the SDK; add a concise TODO comment above the buffered property (referencing buffered, _buffer, _bytes_per_sample, sample_rate, channels) stating that this access is temporary, should be moved upstream into the SDK, and that any SDK change must update this calculation accordingly (include guidance to replace direct internals with a public API on the SDK when available).
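For reference, the calculation in `buffered` can be reproduced without touching SDK internals. The standalone function below is a sketch with a hypothetical name and signature; it only mirrors the formula quoted in the suggestion above.

```python
def buffered_seconds(
    buffer: bytes, sample_rate: int, channels: int, bytes_per_sample: int
) -> float:
    """Seconds of PCM audio held in a raw byte buffer."""
    bytes_per_second = sample_rate * channels * bytes_per_sample
    return len(buffer) / bytes_per_second
```

For 16-bit mono audio at 24 kHz, one second occupies 48000 bytes, so a 4800-byte buffer holds roughly 100 ms.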
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@agents-core/vision_agents/core/utils/av_synchronizer.py`:
- Around line 61-80: The recv() override currently advances timestamps and
returns last_frame immediately, so pacing does not match QueuedVideoTrack;
either implement the same rate-limiting used by QueuedVideoTrack.recv() or
update the docstring to remove the misleading claim. To fix, wrap the waiting
logic when no pending frame is ready: if _pending is empty (or when re-using
last_frame), use asyncio.wait_for or asyncio.sleep to enforce a max wait of
1/self.fps before returning (mirror QueuedVideoTrack.recv()), calling
next_timestamp() after the wait; alternatively, if you choose not to enforce
pacing, update the method docstring in _SyncedVideoTrack.recv() to state that
callers drive the frame rate and that fps does not throttle recv() here. Ensure
references to recv, next_timestamp, _pending, last_frame and fps are updated
accordingly.
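One way to read the pacing option is sketched below: when no pending frame is ready, wait one frame interval before repeating the last frame. This is an assumption-laden illustration, not `QueuedVideoTrack.recv()` itself; `PacedFrameSource` and `put` are hypothetical names, frames are plain strings, and timestamp handling via `next_timestamp()` is left out.

```python
import asyncio
import collections
import time
from typing import Deque, Optional, Tuple


class PacedFrameSource:
    """Repeats the last frame, but never faster than one frame interval."""

    def __init__(self, fps: float = 30.0) -> None:
        self.fps = fps
        self._pending: Deque[Tuple[float, str]] = collections.deque()
        self.last_frame: Optional[str] = None

    def put(self, frame: str, release_at: float) -> None:
        self._pending.append((release_at, frame))

    def _pop_ready(self) -> bool:
        # Release the head of the queue if its release time has passed.
        if self._pending and time.monotonic() >= self._pending[0][0]:
            _, self.last_frame = self._pending.popleft()
            return True
        return False

    async def recv(self) -> Optional[str]:
        if not self._pop_ready():
            # Nothing ready: wait up to 1/fps, then check once more.
            await asyncio.sleep(1.0 / self.fps)
            self._pop_ready()
        return self.last_frame
```

Note that this only throttles the repeat path; frames that are already due are returned immediately, so a burst of ready frames would still be delivered back to back.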
📒 Files selected for processing (7)
- agents-core/vision_agents/core/utils/av_synchronizer.py
- plugins/anam/vision_agents/plugins/anam/anam_avatar_publisher.py
- plugins/lemonslice/example/lemonslice_avatar_example.py
- plugins/lemonslice/vision_agents/plugins/lemonslice/lemonslice_avatar_publisher.py
- plugins/lemonslice/vision_agents/plugins/lemonslice/lemonslice_rtc_manager.py
- tests/test_utils/__init__.py
- tests/test_utils/test_av_synchronizer.py
🚧 Files skipped from review as they are similar to previous changes (2)
- plugins/lemonslice/example/lemonslice_avatar_example.py
- tests/test_utils/test_av_synchronizer.py
What's changed
- Added the `AVSynchronizer` class to sync audio and video playback for avatars. It comes with two helper classes, `_SyncedVideoTrack` and `_BufferTrackingAudioTrack`, which work together to control when video frames are played. `AVSynchronizer` creates both the video and audio tracks internally and exposes them to Avatars.
- Updated the Anam and LemonSlice avatar plugins to use `AVSynchronizer`.
- Reduced the audio chunk size to 100ms when feeding audio to the Anam avatar.