diff --git a/CHANGELOG.md b/CHANGELOG.md index bc618af45..023ce286f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ErrorKind::DeviceBusy` for retryable device access errors (e.g. EBUSY, EAGAIN). - `ErrorKind::PermissionDenied` for OS-level access denials. +- `ErrorKind::ThreadPriorityUnavailable` for when a thread priority request is not granted. - `StreamConfig` now implements `Copy`. - `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback. - `HostTrait::device_by_id()` is now dispatched to each backend's implementation, allowing diff --git a/src/error.rs b/src/error.rs index 848e7c504..343478239 100644 --- a/src/error.rs +++ b/src/error.rs @@ -56,6 +56,10 @@ pub enum ErrorKind { /// A buffer underrun or overrun occurred, causing a potential audio glitch. Xrun, + /// The requested thread priority is unavailable for the audio callback thread. + /// Audio will still play, but may be subject to increased latency or glitches under load. + ThreadPriorityUnavailable, + /// A catch-all for errors that do not fall under any other CPAL error kind. /// /// CPAL itself emits this variant only for genuinely unclassifiable conditions. Treat them as @@ -90,6 +94,10 @@ impl Display for ErrorKind { Self::PermissionDenied => f.write_str( "Permission denied. Grant the required access and retry.", ), + Self::ThreadPriorityUnavailable => f.write_str( + "Thread priority elevation is unavailable for the audio thread. \ + Audio may be subject to increased latency or glitches under load.", + ), Self::Other => f.write_str("An error occurred."), } } @@ -147,6 +155,29 @@ impl From for Error { } } +#[cfg(all( + feature = "audio_thread_priority", + any( + target_os = "windows", + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ) +))] +impl From for Error { + fn from(err: audio_thread_priority::AudioThreadPriorityError) -> Self { + use std::error::Error as StdError; + let msg = match err.source() { + Some(inner) => { + format!("Failed to promote audio thread to real-time priority: {err}: {inner}") + } + None => format!("Failed to promote audio thread to real-time priority: {err}"), + }; + Error::with_message(ErrorKind::ThreadPriorityUnavailable, msg) + } +} + /// Extension trait for attaching a context message to a [`Result`] whose error converts into /// [`cpal::Error`]. #[allow(dead_code)] diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index 1c7749cce..ab927c0db 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -904,7 +904,7 @@ fn input_stream_worker( timeout: Option, ) { #[cfg(feature = "audio_thread_priority")] - if let Err(err) = boost_current_thread_priority(&stream.handle, stream.sample_rate) { + if let Err(err) = boost_current_thread_priority(stream) { error_callback(err); } @@ -955,7 +955,7 @@ fn output_stream_worker( timeout: Option, ) { #[cfg(feature = "audio_thread_priority")] - if let Err(err) = boost_current_thread_priority(&stream.handle, stream.sample_rate) { + if let Err(err) = boost_current_thread_priority(stream) { error_callback(err); } @@ -1002,15 +1002,11 @@ fn output_stream_worker( #[cfg(feature = "audio_thread_priority")] fn boost_current_thread_priority( - handle: &alsa::pcm::PCM, - sample_rate: SampleRate, + stream: &StreamInner, ) -> Result { - use audio_thread_priority::promote_current_thread_to_real_time; - - // if the buffer size isn't known, let audio_thread_priority choose a sensible default value - let (buffer_size, _) = handle.get_params().unwrap_or((0, 0)); - let buffer_size = u32::try_from(buffer_size).unwrap_or(0); - promote_current_thread_to_real_time(buffer_size, sample_rate).map_err(Error::from) + let period_frames = u32::try_from(stream.period_size).unwrap_or(0); + audio_thread_priority::promote_current_thread_to_real_time(period_frames, stream.sample_rate) + .map_err(Error::from) } /// Attempt hardware resume from a suspend event (`ESTRPIPE`). @@ -1595,20 +1591,6 @@ fn canonical_pcm_id(pcm_id: &str) -> String { pcm_id.to_owned() } -#[cfg(feature = "audio_thread_priority")] -impl From for Error { - fn from(err: audio_thread_priority::AudioThreadPriorityError) -> Self { - use std::error::Error as StdError; - let msg = match err.source() { - Some(inner) => { - format!("Failed to promote audio thread to real-time priority: {err}: {inner}") - } - None => format!("Failed to promote audio thread to real-time priority: {err}"), - }; - Error::with_message(ErrorKind::Other, msg) - } -} - impl From for Error { fn from(err: alsa::Error) -> Self { match err.errno() { diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 7b72a481b..bf23686cc 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -1,5 +1,7 @@ use std::{ - mem, ptr, + mem, + ops::ControlFlow, + ptr, sync::mpsc::{channel, Receiver, SendError, Sender}, thread::{self, JoinHandle}, time::Duration, @@ -14,7 +16,7 @@ use windows::Win32::{ use crate::{ host::{equilibrium::fill_equilibrium, frames_to_duration}, traits::StreamTrait, - BufferSize, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, + Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, ResultExt, SampleFormat, SampleRate, StreamConfig, StreamInstant, }; @@ -355,16 +357,18 @@ fn run_input( data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo), error_callback: &mut dyn FnMut(Error), ) { - boost_current_thread_priority( - run_ctxt.stream.config.buffer_size, + if let Err(err) = boost_current_thread_priority( + run_ctxt.stream.period_frames, run_ctxt.stream.config.sample_rate, - ); + ) { + error_callback(err); + } loop { match process_commands_and_await_signal(&mut run_ctxt, error_callback) { - Some(ControlFlow::Break) => break, - Some(ControlFlow::Continue) => continue, - None => (), + ControlFlow::Break(()) => break, + ControlFlow::Continue(false) => continue, + ControlFlow::Continue(true) => {} } let capture_client = match run_ctxt.stream.client_flow { AudioClientFlow::Capture { ref capture_client } => capture_client.clone(), @@ -376,8 +380,8 @@ fn run_input( data_callback, error_callback, ) { - ControlFlow::Break => break, - ControlFlow::Continue => continue, + ControlFlow::Break(_) => break, + ControlFlow::Continue(_) => continue, } } } @@ -387,16 +391,18 @@ fn run_output( data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo), error_callback: &mut dyn FnMut(Error), ) { - boost_current_thread_priority( - run_ctxt.stream.config.buffer_size, + if let Err(err) = boost_current_thread_priority( + run_ctxt.stream.period_frames, run_ctxt.stream.config.sample_rate, - ); + ) { + error_callback(err); + } loop { match process_commands_and_await_signal(&mut run_ctxt, error_callback) { - Some(ControlFlow::Break) => break, - Some(ControlFlow::Continue) => continue, - None => (), + ControlFlow::Break(()) => break, + ControlFlow::Continue(false) => continue, + ControlFlow::Continue(true) => {} } let render_client = match run_ctxt.stream.client_flow { AudioClientFlow::Render { ref render_client } => render_client.clone(), @@ -408,54 +414,42 @@ fn run_output( data_callback, error_callback, ) { - ControlFlow::Break => break, - ControlFlow::Continue => continue, + ControlFlow::Break(_) => break, + ControlFlow::Continue(_) => continue, } } } #[cfg(feature = "audio_thread_priority")] -fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRate) { - use audio_thread_priority::promote_current_thread_to_real_time; - - let buffer_size = if let BufferSize::Fixed(buffer_size) = buffer_size { - buffer_size - } else { - // if the buffer size isn't fixed, let audio_thread_priority choose a sensible default value - 0 - }; - - if let Err(err) = promote_current_thread_to_real_time(buffer_size, sample_rate) { - eprintln!("Failed to promote audio thread to real-time priority: {err}"); - } +fn boost_current_thread_priority( + period_frames: FrameCount, + sample_rate: SampleRate, +) -> Result<(), Error> { + audio_thread_priority::promote_current_thread_to_real_time(period_frames, sample_rate) + .map(|_| ()) + .map_err(Error::from) } #[cfg(not(feature = "audio_thread_priority"))] -fn boost_current_thread_priority(_: BufferSize, _: SampleRate) { +fn boost_current_thread_priority(_: FrameCount, _: SampleRate) -> Result<(), Error> { unsafe { let thread_handle = Threading::GetCurrentThread(); - - let _ = - Threading::SetThreadPriority(thread_handle, Threading::THREAD_PRIORITY_TIME_CRITICAL); + Threading::SetThreadPriority(thread_handle, Threading::THREAD_PRIORITY_TIME_CRITICAL) + .context("Failed to promote audio thread to real-time priority") } } -enum ControlFlow { - Break, - Continue, -} - fn process_commands_and_await_signal( run_context: &mut RunContext, error_callback: &mut dyn FnMut(Error), -) -> Option { +) -> ControlFlow<(), bool> { // Process queued commands. match process_commands(run_context) { Ok(true) => (), - Ok(false) => return Some(ControlFlow::Break), + Ok(false) => return ControlFlow::Break(()), Err(err) => { error_callback(err); - return Some(ControlFlow::Break); + return ControlFlow::Break(()); } }; @@ -464,17 +458,15 @@ fn process_commands_and_await_signal( Ok(idx) => idx, Err(err) => { error_callback(err); - return Some(ControlFlow::Break); + return ControlFlow::Break(()); } }; // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in // order for us to pick up the pending commands. Otherwise, a stream needs data. - if handle_idx == 0 { - return Some(ControlFlow::Continue); - } - - None + // Continue(true) = audio event fired, proceed to process audio this iteration. + // Continue(false) = command event fired, loop around and wait again. + ControlFlow::Continue(handle_idx != 0) } // The loop for processing pending input data. @@ -483,18 +475,18 @@ fn process_input( capture_client: Audio::IAudioCaptureClient, data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo), error_callback: &mut dyn FnMut(Error), -) -> ControlFlow { +) -> ControlFlow<()> { unsafe { // Get the available data in the shared buffer. let mut buffer: *mut u8 = ptr::null_mut(); let mut flags = mem::MaybeUninit::uninit(); loop { let mut frames_available = match capture_client.GetNextPacketSize() { - Ok(0) => return ControlFlow::Continue, + Ok(0) => return ControlFlow::Continue(()), Ok(f) => f, Err(err) => { error_callback(Error::from(err)); - return ControlFlow::Break; + return ControlFlow::Break(()); } }; let mut qpc_position: u64 = 0; @@ -511,7 +503,7 @@ fn process_input( Err(e) if e.code() == Audio::AUDCLNT_S_BUFFER_EMPTY => continue, Err(e) => { error_callback(Error::from(e)); - return ControlFlow::Break; + return ControlFlow::Break(()); } Ok(_) => (), } @@ -528,7 +520,7 @@ fn process_input( Ok(ts) => ts, Err(err) => { error_callback(err); - return ControlFlow::Break; + return ControlFlow::Break(()); } }; let info = InputCallbackInfo { timestamp }; @@ -540,7 +532,7 @@ fn process_input( .context("failed to release capture buffer"); if let Err(err) = result { error_callback(err); - return ControlFlow::Break; + return ControlFlow::Break(()); } } } @@ -552,14 +544,14 @@ fn process_output( render_client: Audio::IAudioRenderClient, data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo), error_callback: &mut dyn FnMut(Error), -) -> ControlFlow { +) -> ControlFlow<()> { // The number of frames available for writing. let frames_available = match get_available_frames(stream) { - Ok(0) => return ControlFlow::Continue, // TODO: Can this happen? + Ok(0) => return ControlFlow::Continue(()), // TODO: Can this happen? Ok(n) => n, Err(err) => { error_callback(err); - return ControlFlow::Break; + return ControlFlow::Break(()); } }; @@ -568,7 +560,7 @@ fn process_output( Ok(b) => b, Err(e) => { error_callback(Error::from(e)); - return ControlFlow::Break; + return ControlFlow::Break(()); } }; @@ -586,7 +578,7 @@ fn process_output( Ok(ts) => ts, Err(err) => { error_callback(err); - return ControlFlow::Break; + return ControlFlow::Break(()); } }; let info = OutputCallbackInfo { timestamp }; @@ -594,11 +586,11 @@ fn process_output( if let Err(err) = render_client.ReleaseBuffer(frames_available, 0) { error_callback(err.into()); - return ControlFlow::Break; + return ControlFlow::Break(()); } } - ControlFlow::Continue + ControlFlow::Continue(()) } /// Use the stream's `IAudioClock` to produce the current stream instant.