Fix audio pipeline #1662
📝 Walkthrough
The PR refactors audio streaming from per-sample to chunked (Vec<f32>) output, introduces new Rubato-based resampling modules in audio-utils with static and dynamic rate handling, removes the legacy resampler from the audio crate, and updates downstream components to use the new streaming model and resampling extensions.
Sequence Diagram(s)
sequenceDiagram
participant Old as Old Per-Sample Model
participant New as New Chunked Model
participant Src as Audio Source
participant Spk as Speaker
rect rgb(200, 220, 255)
Note over Old: Previous: Per-sample streaming
Src->>Old: Sample 1 (f32)
Old->>Spk: Poll → f32
Src->>Old: Sample 2 (f32)
Old->>Spk: Poll → f32
Src->>Old: Sample 3 (f32)
Old->>Spk: Poll → f32
end
rect rgb(220, 255, 200)
Note over New: Current: Chunked streaming
Src->>New: Chunk [Sample 1..N] (Vec<f32>)
New->>New: Buffer + Resample (if dynamic rate)
New->>Spk: Poll → Vec<f32> (256 samples)
Src->>New: Chunk [Sample N+1..M]
New->>New: Buffer + Resample
New->>Spk: Poll → Vec<f32> (256 samples)
end
sequenceDiagram
participant Src as AsyncSource
participant Ext as ResampleExtDynamicNew
participant Res as ResamplerDynamicNew
participant Driver as RubatoChunkResampler
participant Rubato as FastFixedIn
Src->>Ext: stream().resampled_chunks(target, size)
Ext->>Res: new(source, target_rate, chunk_size)
Res->>Driver: create with FastFixedIn resampler
Res->>Res: Poll source for rate + samples
alt Rate matches
Res->>Driver: push_sample(s) per input
Res->>Driver: process_all_ready_blocks()
Driver->>Rubato: process block
else Rate changed
Res->>Driver: drain_for_rate_change()
Res->>Res: rebuild_resampler(new_rate)
Res->>Driver: rebind_resampler(new Rubato)
end
Driver->>Driver: has_full_chunk() → take_full_chunk()
Res->>Res: Yield Vec<f32> (chunk_size samples)
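As a rough illustration of the chunked model in the diagrams above, the sketch below re-chunks arbitrarily sized input into fixed-size outputs. ChunkBuffer and its methods are hypothetical names chosen for this example, only loosely modelled on the has_full_chunk / take_full_chunk steps in the diagram. It is not the PR's RubatoChunkResampler and performs no resampling.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: accumulates samples and hands them out in
/// fixed-size chunks, similar in spirit to the driver's output queue.
struct ChunkBuffer {
    queue: VecDeque<f32>,
    chunk_size: usize,
}

impl ChunkBuffer {
    fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self { queue: VecDeque::new(), chunk_size }
    }

    /// Appends an incoming chunk of any length.
    fn push(&mut self, samples: &[f32]) {
        self.queue.extend(samples.iter().copied());
    }

    /// Returns exactly `chunk_size` samples, or None if not enough are buffered.
    fn take_full_chunk(&mut self) -> Option<Vec<f32>> {
        if self.queue.len() < self.chunk_size {
            return None;
        }
        Some(self.queue.drain(..self.chunk_size).collect())
    }

    /// Returns whatever is left (e.g. at end of stream), or None if empty.
    fn take_all_output(&mut self) -> Option<Vec<f32>> {
        if self.queue.is_empty() {
            None
        } else {
            Some(self.queue.drain(..).collect())
        }
    }
}
```

A consumer would call push for every incoming chunk, loop on take_full_chunk while it returns Some, and call take_all_output once the source ends.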
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning, 2 inconclusive)
Actionable comments posted: 1
🧹 Nitpick comments (7)
crates/audio-utils/src/resampler/mod.rs (1)
161-187: Keep test artifacts out of the repo root.
Each test writes a WAV (dynamic_*_resampler.wav) into the working directory, which leaves behind files after cargo test and can break on read-only CI runners. Please switch to a temp location (e.g., tempfile::NamedTempFile) or gate the dump behind a debug flag so the suite runs cleanly.
crates/audio-utils/src/resampler/static_new.rs (2)
54-56: Consider deriving input_block_size from the resampling ratio.
Setting input_block_size = output_chunk_size is a simplification. For resampling ratios far from 1.0, the optimal input block size might differ from the desired output chunk size. For upsampling, you may need fewer input samples; for downsampling, more.
For example, you could calculate:
let input_block_size = (output_chunk_size as f64 * ratio).ceil() as usize;
This would maintain a more consistent relationship between input consumption and output production.
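To make the arithmetic concrete, here is a minimal sketch that derives the input block size from explicit rates. The function name and parameters are hypothetical. It assumes the ratio in the suggestion above means source rate over target rate, which is what the "fewer for upsampling, more for downsampling" remark implies. If the surrounding code stores the ratio as output rate over input rate (the convention Rubato's constructors use), the multiplication would become a division.

```rust
/// Hypothetical helper: number of input samples needed to produce roughly
/// `output_chunk_size` output samples when converting
/// `source_rate` -> `target_rate`.
fn input_block_size_for(output_chunk_size: usize, source_rate: u32, target_rate: u32) -> usize {
    assert!(source_rate > 0 && target_rate > 0, "rates must be non-zero");
    // Integer ceiling division avoids floating-point rounding surprises.
    let numerator = output_chunk_size as u64 * source_rate as u64;
    let denominator = target_rate as u64;
    let blocks = (numerator + denominator - 1) / denominator;
    // Never return zero: a resampler needs at least one input sample per block.
    blocks.max(1) as usize
}
```

For a 256-sample output chunk, downsampling 48 kHz to 16 kHz needs 768 input samples, while upsampling 16 kHz to 48 kHz needs 86 (85.33 rounded up).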
58-64: Hardcoded resampler quality parameters.
The resampler is created with fixed quality settings: 2.0 (max relative ratio change) and Quintic polynomial degree. While Quintic provides excellent quality, you might want to consider making these configurable if different use cases require different quality/performance trade-offs.
Consider adding quality parameters to the constructor:
pub fn new(
    source: S,
    target_rate: u32,
    output_chunk_size: usize,
    polynomial_degree: PolynomialDegree,
) -> Result<Self, crate::Error>
crates/ws-utils/src/lib.rs (1)
102-146: LGTM! Correct buffered Stream implementation.
The buffering pattern correctly:
- Yields samples from the internal buffer
- Refills from WebSocket messages when depleted
- Handles empty data by continuing
- Terminates on End or error
Optional micro-optimization: Lines 113-114 clear and reset the buffer on every depletion. You could avoid the Vec::clear() call by reusing the buffer capacity:
if self.buffer_idx < self.buffer.len() {
    let sample = self.buffer[self.buffer_idx];
    self.buffer_idx += 1;
    return Poll::Ready(Some(sample));
}
self.buffer_idx = 0;
self.buffer.clear(); // or just track length instead
This is a micro-optimization and may not be worth the added complexity.
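The buffering pattern described above can be sketched synchronously as follows. BufferedSamples is a hypothetical type for illustration: it uses a plain Iterator in place of the async Stream and WebSocket messages in the real code, so it shows only the buffer/refill logic, not the polling. Note that Vec::clear() keeps the existing allocation, so refilling after a clear already reuses capacity.

```rust
/// Hypothetical synchronous analogue of the buffered pattern: yields one
/// sample at a time from an internal buffer and refills it from a source
/// of chunks when the buffer is depleted.
struct BufferedSamples<I: Iterator<Item = Vec<f32>>> {
    chunks: I,
    buffer: Vec<f32>,
    buffer_idx: usize,
}

impl<I: Iterator<Item = Vec<f32>>> BufferedSamples<I> {
    fn new(chunks: I) -> Self {
        Self { chunks, buffer: Vec::new(), buffer_idx: 0 }
    }
}

impl<I: Iterator<Item = Vec<f32>>> Iterator for BufferedSamples<I> {
    type Item = f32;

    fn next(&mut self) -> Option<f32> {
        loop {
            // Serve from the buffer while samples remain.
            if self.buffer_idx < self.buffer.len() {
                let sample = self.buffer[self.buffer_idx];
                self.buffer_idx += 1;
                return Some(sample);
            }
            // Buffer depleted: fetch the next chunk, or end the stream.
            let chunk = self.chunks.next()?;
            // clear() retains capacity, so the allocation is reused.
            self.buffer.clear();
            self.buffer.extend_from_slice(&chunk);
            self.buffer_idx = 0;
            // An empty chunk simply loops around and fetches the next one.
        }
    }
}
```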
crates/audio-utils/src/resampler/dynamic_new.rs (2)
38-106: Hardcoded quality parameters and block size assumption.
Similar to static_new.rs, this module:
- Sets input_block_size = output_chunk_size (line 44), which may not be optimal for all resampling ratios
- Uses hardcoded resampler parameters: 2.0 and Quintic (lines 99-101)
Consider making these configurable if different use cases require different quality/performance trade-offs. The same suggestions from static_new.rs apply here.
115-179: Complex but correct Stream implementation.
The poll_next method handles multiple concerns:
- Rate change detection and draining
- Chunk yielding in various states
- Error propagation
- End-of-stream handling
While the control flow is complex with many branches, the logic appears correct.
The rate change handling block (lines 127-151) is particularly complex. Consider extracting to a helper method:
fn handle_rate_change(&mut self, current_rate: u32) -> Result<Option<Vec<f32>>, crate::Error> {
    match self.drain_for_rate_change() {
        Ok(true) => {
            self.rebuild_resampler(current_rate)?;
            Ok(None) // Signal to continue polling
        }
        Ok(false) => {
            // Return any available output before completing drain
            Ok(self.driver.take_full_chunk()
                .or_else(|| self.driver.take_all_output()))
        }
        Err(err) => Err(err),
    }
}
This would make the main poll_next loop easier to follow.
crates/audio/src/speaker/windows.rs (1)
202-233: Avoid copying chunks twice.
We copy every sample out of sample_queue into read_buffer, then immediately clone that slice into a fresh Vec. That's two passes over the same data plus an extra buffer we never return to callers. We can drain straight into the owned chunk we yield and keep a lightweight capacity field instead.
Apply this diff to collect chunks in one pass:
@@
-    SpeakerStream {
-        sample_queue,
-        waker_state,
-        capture_thread: Some(capture_thread),
-        read_buffer: vec![0.0f32; CHUNK_SIZE],
-    }
+    SpeakerStream {
+        sample_queue,
+        waker_state,
+        capture_thread: Some(capture_thread),
+        chunk_capacity: CHUNK_SIZE,
+    }
@@
-pub struct SpeakerStream {
-    sample_queue: Arc<Mutex<VecDeque<f32>>>,
-    waker_state: Arc<Mutex<WakerState>>,
-    capture_thread: Option<thread::JoinHandle<()>>,
-    read_buffer: Vec<f32>,
-}
+pub struct SpeakerStream {
+    sample_queue: Arc<Mutex<VecDeque<f32>>>,
+    waker_state: Arc<Mutex<WakerState>>,
+    capture_thread: Option<thread::JoinHandle<()>>,
+    chunk_capacity: usize,
+}
@@
-    let mut queue = self.sample_queue.lock().unwrap();
-    if !queue.is_empty() {
-        let chunk_len = queue.len().min(self.read_buffer.len());
-        for i in 0..chunk_len {
-            self.read_buffer[i] = queue.pop_front().unwrap();
-        }
-        return Poll::Ready(Some(self.read_buffer[..chunk_len].to_vec()));
-    }
+    let mut queue = self.sample_queue.lock().unwrap();
+    if !queue.is_empty() {
+        let chunk_len = queue.len().min(self.chunk_capacity);
+        let chunk: Vec<f32> = queue.drain(..chunk_len).collect();
+        return Poll::Ready(Some(chunk));
+    }
@@
-    let mut queue = self.sample_queue.lock().unwrap();
-    if !queue.is_empty() {
-        let chunk_len = queue.len().min(self.read_buffer.len());
-        for i in 0..chunk_len {
-            self.read_buffer[i] = queue.pop_front().unwrap();
-        }
-        Poll::Ready(Some(self.read_buffer[..chunk_len].to_vec()))
-    } else {
-        Poll::Pending
-    }
+    let mut queue = self.sample_queue.lock().unwrap();
+    if !queue.is_empty() {
+        let chunk_len = queue.len().min(self.chunk_capacity);
+        let chunk: Vec<f32> = queue.drain(..chunk_len).collect();
+        Poll::Ready(Some(chunk))
+    } else {
+        Poll::Pending
+    }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
- Cargo.toml (1 hunks)
- crates/audio-utils/Cargo.toml (1 hunks)
- crates/audio-utils/src/error.rs (0 hunks)
- crates/audio-utils/src/lib.rs (1 hunks)
- crates/audio-utils/src/resampler/driver.rs (1 hunks)
- crates/audio-utils/src/resampler/dynamic_new.rs (1 hunks)
- crates/audio-utils/src/resampler/dynamic_old.rs (1 hunks)
- crates/audio-utils/src/resampler/mod.rs (1 hunks)
- crates/audio-utils/src/resampler/static_new.rs (1 hunks)
- crates/audio/src/lib.rs (0 hunks)
- crates/audio/src/resampler.rs (0 hunks)
- crates/audio/src/speaker/macos.rs (8 hunks)
- crates/audio/src/speaker/mod.rs (3 hunks)
- crates/audio/src/speaker/windows.rs (5 hunks)
- crates/ws-utils/src/lib.rs (3 hunks)
- plugins/listener/src/actors/source.rs (3 hunks)
- plugins/local-stt/src/server/external.rs (1 hunks)
- plugins/local-stt/src/server/supervisor.rs (2 hunks)
💤 Files with no reviewable changes (3)
- crates/audio-utils/src/error.rs
- crates/audio/src/lib.rs
- crates/audio/src/resampler.rs
🧰 Additional context used
🧬 Code graph analysis (10)
plugins/listener/src/actors/source.rs (1)
- crates/audio/src/lib.rs (2): is_using_headphone (210-223), from_speaker (128-135)
crates/audio-utils/src/resampler/dynamic_old.rs (1)
- crates/audio-utils/src/resampler/mod.rs (6): ResamplerDynamicOld (142-143), new (39-47), source (25-25), poll_next (71-101), as_stream (51-53), sample_rate (55-61)
crates/audio/src/speaker/macos.rs (2)
- crates/audio/src/speaker/mod.rs (1): poll_next (83-116)
- crates/audio/src/speaker/windows.rs (2): poll_next (190-234), drop (174-185)
plugins/local-stt/src/server/external.rs (1)
- owhisper/owhisper-interface/src/stream.rs (1): text (141-148)
crates/audio-utils/src/resampler/dynamic_new.rs (3)
- crates/audio-utils/src/resampler/static_new.rs (3): resampled_chunks (11-17), new (35-47), poll_next (92-128)
- crates/audio-utils/src/resampler/driver.rs (1): new (20-35)
- crates/audio-utils/src/resampler/dynamic_old.rs (2): new (22-34), poll_next (53-88)
crates/audio-utils/src/resampler/static_new.rs (2)
- crates/audio-utils/src/resampler/dynamic_new.rs (3): resampled_chunks (10-16), new (38-57), poll_next (115-178)
- crates/audio-utils/src/resampler/driver.rs (1): new (20-35)
crates/audio/src/speaker/windows.rs (2)
- crates/audio/src/speaker/macos.rs (2): poll_next (283-301), std (224-224)
- crates/audio/src/speaker/mod.rs (1): poll_next (83-116)
crates/ws-utils/src/lib.rs (1)
- crates/whisper-local/src/stream.rs (4): poll_next (95-128), poll_next (138-173), samples (21-21), samples (32-34)
crates/audio-utils/src/resampler/driver.rs (1)
- crates/audio-utils/src/resampler/dynamic_new.rs (1): new (38-57)
crates/audio-utils/src/resampler/mod.rs (4)
- crates/audio-utils/src/resampler/dynamic_new.rs (2): new (38-57), poll_next (115-178)
- crates/audio-utils/src/resampler/static_new.rs (3): new (35-47), poll_next (92-128), finalize (70-83)
- crates/audio-utils/src/resampler/driver.rs (1): new (20-35)
- crates/audio-utils/src/resampler/dynamic_old.rs (4): new (22-34), sample_rate (96-98), as_stream (92-94), poll_next (53-88)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: ci (macos, macos-14)
- GitHub Check: fmt
🔇 Additional comments (20)
plugins/local-stt/src/server/supervisor.rs (2)
22-24: Verify increased restart tolerance is intentional and monitored.
The supervisor now allows 15 restarts within 30 seconds (previously 5 in 5 seconds), with a 60-second reset window. While this provides more resilience during the audio pipeline refactoring, ensure that:
- The increased tolerance is intentional and not masking underlying stability issues
- Restart rates are monitored to catch any unexpected actor failures
- These values are appropriate for production or will be tuned based on observed behavior
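For readers unfamiliar with restart budgets, the following is a minimal sketch of a sliding-window limit such as "15 restarts within 30 seconds". RestartBudget is a hypothetical type written for this illustration. It is not the supervisor's actual implementation and does not model the separate 60-second reset window.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical sliding-window restart limiter.
struct RestartBudget {
    max_restarts: usize,
    window: Duration,
    history: VecDeque<Instant>,
}

impl RestartBudget {
    fn new(max_restarts: usize, window: Duration) -> Self {
        Self { max_restarts, window, history: VecDeque::new() }
    }

    /// Records a restart at `now` and returns true, or returns false
    /// (without recording) if the budget for the window is exhausted.
    fn try_restart(&mut self, now: Instant) -> bool {
        // Forget restarts that have aged out of the window.
        while let Some(&oldest) = self.history.front() {
            if now.duration_since(oldest) > self.window {
                self.history.pop_front();
            } else {
                break;
            }
        }
        if self.history.len() >= self.max_restarts {
            return false;
        }
        self.history.push_back(now);
        true
    }
}
```

Logging each rejected call to try_restart would be one way to get the restart-rate monitoring asked for above.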
126-131: LGTM: Increased shutdown wait time.
The extended polling window (up to 5 seconds vs. 2 seconds previously) gives actors more time to drain buffers and complete processing, which aligns with the chunked audio processing model introduced in this PR.
plugins/local-stt/src/server/external.rs (1)
130-146: Change is appropriate: the filter correctly targets log noise without suppressing diagnostics.
The filter !text.contains("text:") is safely scoped. Error and termination events bypass the filter chain entirely and are always logged via tracing::error!() (lines 154–157). The filter only affects normal stdout/stderr in the event loop, targeting interim transcription updates that produce log noise. Since whisper.cpp error messages (UTF-8 validation, model loading, OOM, audio decode failures) don't typically contain the literal substring "text:", important diagnostics remain visible.
crates/audio-utils/Cargo.toml (1)
12-20: Dependency additions align with the new resampler work.
dasp powers the legacy dynamic resampler and the hypr-data/tokio dev-deps cover the new async tests. No issues from my side.
Cargo.toml (1)
212-212: Thanks for moving cidre onto the published release.
Pinning to 0.11.4 gets us off the git SHA and keeps workspace resolution deterministic. Looks good.
crates/audio-utils/src/lib.rs (1)
8-13: Publicly exposing resampler looks correct.
The new module wiring keeps the API consistent with downstream imports; no concerns here.
crates/audio-utils/src/resampler/static_new.rs (3)
10-20: LGTM! Clean extension trait pattern.
The blanket implementation makes the resampled_chunks method discoverable on all AsyncSource types, providing an ergonomic API for consumers.
70-83: LGTM! Proper EOS finalization.
The finalize method correctly handles end-of-stream by processing remaining blocks and zero-padding partial data. The idempotent guard prevents double-processing.
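The finalization behaviour described here can be sketched as below. BlockFinalizer is a hypothetical stand-in written for illustration: it passes blocks through unchanged where the real code would hand them to the resampler, and shows only the zero-padding and the idempotent guard.

```rust
/// Hypothetical stand-in for a fixed-block processor with EOS finalization.
struct BlockFinalizer {
    pending: Vec<f32>,
    block_size: usize,
    finalized: bool,
    output: Vec<Vec<f32>>,
}

impl BlockFinalizer {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Self { pending: Vec::new(), block_size, finalized: false, output: Vec::new() }
    }

    fn push(&mut self, samples: &[f32]) {
        self.pending.extend_from_slice(samples);
    }

    /// Flushes everything that is buffered. Safe to call more than once.
    fn finalize(&mut self) {
        // Idempotent guard: a second call must not emit anything.
        if self.finalized {
            return;
        }
        self.finalized = true;

        // Emit all complete blocks first.
        while self.pending.len() >= self.block_size {
            let block: Vec<f32> = self.pending.drain(..self.block_size).collect();
            self.output.push(block);
        }
        // Zero-pad the trailing partial block up to the block size.
        if !self.pending.is_empty() {
            let mut block = std::mem::take(&mut self.pending);
            block.resize(self.block_size, 0.0);
            self.output.push(block);
        }
    }
}
```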
86-129: LGTM! Clean Stream implementation.
The polling loop correctly orchestrates chunk production, block processing, and source consumption. Error propagation and backpressure handling are properly implemented.
crates/audio-utils/src/resampler/dynamic_old.rs (3)
36-47: Rate change handling introduces small discontinuity.
Resetting phase = 0.0 on rate change causes a small discontinuity in the output stream. The approach of reinitializing the interpolator with last_sample minimizes this, but there will still be a brief artifact. This is acceptable for simple linear interpolation but worth noting.
50-89: LGTM! Correct linear interpolation resampling.
The phase-based resampling logic correctly:
- Seeds the interpolator on first sample
- Consumes input samples when phase crosses integer boundaries
- Interpolates at fractional phase positions
- Handles async polling properly
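The phase-based scheme listed above can be sketched offline as follows. resample_linear is a hypothetical function for illustration: it works on a slice rather than an async source and is not the code in dynamic_old.rs, but it follows the same idea of advancing a fractional phase and consuming an input sample each time the phase crosses an integer boundary.

```rust
/// Hypothetical offline sketch of phase-based linear-interpolation resampling.
fn resample_linear(input: &[f32], source_rate: u32, target_rate: u32) -> Vec<f32> {
    assert!(source_rate > 0 && target_rate > 0, "rates must be non-zero");
    if input.len() < 2 {
        // Nothing to interpolate between.
        return input.to_vec();
    }
    // How far the read position advances in the input per output sample.
    let step = source_rate as f64 / target_rate as f64;
    let mut out = Vec::new();
    let mut idx = 0usize; // index of the left-hand input sample
    let mut phase = 0.0f64; // fractional position between input[idx] and input[idx + 1]

    while idx + 1 < input.len() {
        let left = input[idx] as f64;
        let right = input[idx + 1] as f64;
        // Interpolate at the current fractional position.
        out.push((left + (right - left) * phase) as f32);
        phase += step;
        // Consume input samples whenever the phase crosses an integer boundary.
        while phase >= 1.0 {
            phase -= 1.0;
            idx += 1;
        }
    }
    out
}
```

Doubling the rate of [0.0, 1.0, 2.0] yields the midpoints as well as the original leading samples; halving the rate of a five-sample ramp keeps every second sample.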
91-99: LGTM! Proper AsyncSource trait implementation.
crates/ws-utils/src/lib.rs (2)
176-207: LGTM! Consistent buffering pattern.
The ChannelAudioSource uses the same buffering approach as WebSocketAudioSource, ensuring consistent behavior across the codebase.
148-156: LGTM! Proper AsyncSource trait implementations.
Both WebSocketAudioSource and ChannelAudioSource correctly implement AsyncSource by returning themselves as streams, leveraging their Stream implementations.
crates/audio-utils/src/resampler/driver.rs (4)
17-35: LGTM! Proper buffer initialization.
The constructor correctly allocates buffers using Rubato's allocation methods and calculates appropriate capacities for the input/output queues.
37-65: LGTM! Clean output queue management.
The output methods provide clear, composable operations for consuming resampled data in both complete chunks and partial/final output.
77-135: LGTM! Well-structured processing methods.
The three processing methods provide flexible control over how input is consumed:
- process_all_ready_blocks: Batch processing for efficiency
- process_one_block: Single-block processing for fine control
- process_partial_block: Handles EOS with zero-padding
145-172: Input queue cleared on resampler rebind - verify this is intentional.
Line 171 clears the input queue when rebinding the resampler. This will discard any buffered input samples, potentially causing a brief audio gap during dynamic rate changes.
If this is intentional to prevent mixing samples from different rate contexts, consider documenting this trade-off. If not, you might want to process or drain remaining input before rebinding.
Based on the usage in dynamic_new.rs, the caller drains before rebinding:
fn drain_for_rate_change(&mut self) -> Result<bool, crate::Error> {
    self.driver.process_all_ready_blocks()?;
    if self.driver.has_input() {
        self.driver.process_partial_block(true)?;
    }
    Ok(self.driver.output_is_empty())
}
So this appears intentional, but documenting it would help future maintainers.
crates/audio-utils/src/resampler/dynamic_new.rs (2)
9-32: LGTM! Well-designed dynamic resampler structure.
The extension trait provides a discoverable API, and the struct contains all necessary state for tracking rate changes and stream completion.
68-92: LGTM! Well-structured draining logic.
The drain methods correctly handle different scenarios:
- try_yield_chunk: Adapts behavior based on draining state
- drain_for_rate_change: Processes remaining input before rate change
- drain_at_eos: Ensures all input is processed at stream end
No description provided.