Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `ErrorKind::DeviceBusy` for retryable device access errors (e.g. EBUSY, EAGAIN).
- `ErrorKind::PermissionDenied` for OS-level access denials.
- `ErrorKind::ThreadPriorityUnavailable` for when a thread priority request is not granted.
- `StreamConfig` now implements `Copy`.
- `StreamTrait::buffer_size()` to query the stream's current buffer size in frames per callback.
- `HostTrait::device_by_id()` is now dispatched to each backend's implementation, allowing
Expand Down
31 changes: 31 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub enum ErrorKind {
/// A buffer underrun or overrun occurred, causing a potential audio glitch.
Xrun,

/// The requested thread priority is unavailable for the audio callback thread.
/// Audio will still play, but may be subject to increased latency or glitches under load.
ThreadPriorityUnavailable,
Comment thread
roderickvd marked this conversation as resolved.

/// A catch-all for errors that do not fall under any other CPAL error kind.
///
/// CPAL itself emits this variant only for genuinely unclassifiable conditions. Treat them as
Expand Down Expand Up @@ -90,6 +94,10 @@ impl Display for ErrorKind {
Self::PermissionDenied => f.write_str(
"Permission denied. Grant the required access and retry.",
),
Self::ThreadPriorityUnavailable => f.write_str(
"Thread priority elevation is unavailable for the audio thread. \
Audio may be subject to increased latency or glitches under load.",
),
Self::Other => f.write_str("An error occurred."),
}
}
Expand Down Expand Up @@ -147,6 +155,29 @@ impl From<ErrorKind> for Error {
}
}

#[cfg(all(
feature = "audio_thread_priority",
any(
target_os = "windows",
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
)
))]
impl From<audio_thread_priority::AudioThreadPriorityError> for Error {
fn from(err: audio_thread_priority::AudioThreadPriorityError) -> Self {
use std::error::Error as StdError;
let msg = match err.source() {
Some(inner) => {
format!("Failed to promote audio thread to real-time priority: {err}: {inner}")
}
None => format!("Failed to promote audio thread to real-time priority: {err}"),
};
Error::with_message(ErrorKind::ThreadPriorityUnavailable, msg)
}
}

/// Extension trait for attaching a context message to a [`Result`] whose error converts into
/// [`cpal::Error`].
#[allow(dead_code)]
Expand Down
30 changes: 6 additions & 24 deletions src/host/alsa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ fn input_stream_worker(
timeout: Option<Duration>,
) {
#[cfg(feature = "audio_thread_priority")]
if let Err(err) = boost_current_thread_priority(&stream.handle, stream.sample_rate) {
if let Err(err) = boost_current_thread_priority(stream) {
error_callback(err);
}

Expand Down Expand Up @@ -955,7 +955,7 @@ fn output_stream_worker(
timeout: Option<Duration>,
) {
#[cfg(feature = "audio_thread_priority")]
if let Err(err) = boost_current_thread_priority(&stream.handle, stream.sample_rate) {
if let Err(err) = boost_current_thread_priority(stream) {
error_callback(err);
}

Expand Down Expand Up @@ -1002,15 +1002,11 @@ fn output_stream_worker(

#[cfg(feature = "audio_thread_priority")]
fn boost_current_thread_priority(
handle: &alsa::pcm::PCM,
sample_rate: SampleRate,
stream: &StreamInner,
) -> Result<audio_thread_priority::RtPriorityHandle, Error> {
use audio_thread_priority::promote_current_thread_to_real_time;

// if the buffer size isn't known, let audio_thread_priority choose a sensible default value
let (buffer_size, _) = handle.get_params().unwrap_or((0, 0));
let buffer_size = u32::try_from(buffer_size).unwrap_or(0);
promote_current_thread_to_real_time(buffer_size, sample_rate).map_err(Error::from)
let period_frames = u32::try_from(stream.period_size).unwrap_or(0);
audio_thread_priority::promote_current_thread_to_real_time(period_frames, stream.sample_rate)
.map_err(Error::from)
}

/// Attempt hardware resume from a suspend event (`ESTRPIPE`).
Expand Down Expand Up @@ -1595,20 +1591,6 @@ fn canonical_pcm_id(pcm_id: &str) -> String {
pcm_id.to_owned()
}

#[cfg(feature = "audio_thread_priority")]
impl From<audio_thread_priority::AudioThreadPriorityError> for Error {
fn from(err: audio_thread_priority::AudioThreadPriorityError) -> Self {
use std::error::Error as StdError;
let msg = match err.source() {
Some(inner) => {
format!("Failed to promote audio thread to real-time priority: {err}: {inner}")
}
None => format!("Failed to promote audio thread to real-time priority: {err}"),
};
Error::with_message(ErrorKind::Other, msg)
}
}

impl From<alsa::Error> for Error {
fn from(err: alsa::Error) -> Self {
match err.errno() {
Expand Down
116 changes: 54 additions & 62 deletions src/host/wasapi/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{
mem, ptr,
mem,
ops::ControlFlow,
ptr,
sync::mpsc::{channel, Receiver, SendError, Sender},
thread::{self, JoinHandle},
time::Duration,
Expand All @@ -14,7 +16,7 @@ use windows::Win32::{
use crate::{
host::{equilibrium::fill_equilibrium, frames_to_duration},
traits::StreamTrait,
BufferSize, Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp,
Data, Error, ErrorKind, FrameCount, InputCallbackInfo, InputStreamTimestamp,
OutputCallbackInfo, OutputStreamTimestamp, ResultExt, SampleFormat, SampleRate, StreamConfig,
StreamInstant,
};
Expand Down Expand Up @@ -355,16 +357,18 @@ fn run_input(
data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
error_callback: &mut dyn FnMut(Error),
) {
boost_current_thread_priority(
run_ctxt.stream.config.buffer_size,
if let Err(err) = boost_current_thread_priority(
run_ctxt.stream.period_frames,
run_ctxt.stream.config.sample_rate,
);
) {
error_callback(err);
}

loop {
match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
Some(ControlFlow::Break) => break,
Some(ControlFlow::Continue) => continue,
None => (),
ControlFlow::Break(()) => break,
ControlFlow::Continue(false) => continue,
ControlFlow::Continue(true) => {}
}
Comment thread
roderickvd marked this conversation as resolved.
let capture_client = match run_ctxt.stream.client_flow {
AudioClientFlow::Capture { ref capture_client } => capture_client.clone(),
Expand All @@ -376,8 +380,8 @@ fn run_input(
data_callback,
error_callback,
) {
ControlFlow::Break => break,
ControlFlow::Continue => continue,
ControlFlow::Break(_) => break,
ControlFlow::Continue(_) => continue,
}
}
}
Expand All @@ -387,16 +391,18 @@ fn run_output(
data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
error_callback: &mut dyn FnMut(Error),
) {
boost_current_thread_priority(
run_ctxt.stream.config.buffer_size,
if let Err(err) = boost_current_thread_priority(
run_ctxt.stream.period_frames,
run_ctxt.stream.config.sample_rate,
);
) {
error_callback(err);
}

loop {
match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
Some(ControlFlow::Break) => break,
Some(ControlFlow::Continue) => continue,
None => (),
ControlFlow::Break(()) => break,
ControlFlow::Continue(false) => continue,
ControlFlow::Continue(true) => {}
}
let render_client = match run_ctxt.stream.client_flow {
AudioClientFlow::Render { ref render_client } => render_client.clone(),
Expand All @@ -408,54 +414,42 @@ fn run_output(
data_callback,
error_callback,
) {
ControlFlow::Break => break,
ControlFlow::Continue => continue,
ControlFlow::Break(_) => break,
ControlFlow::Continue(_) => continue,
}
}
}

#[cfg(feature = "audio_thread_priority")]
fn boost_current_thread_priority(buffer_size: BufferSize, sample_rate: SampleRate) {
use audio_thread_priority::promote_current_thread_to_real_time;

let buffer_size = if let BufferSize::Fixed(buffer_size) = buffer_size {
buffer_size
} else {
// if the buffer size isn't fixed, let audio_thread_priority choose a sensible default value
0
};

if let Err(err) = promote_current_thread_to_real_time(buffer_size, sample_rate) {
eprintln!("Failed to promote audio thread to real-time priority: {err}");
}
fn boost_current_thread_priority(
period_frames: FrameCount,
sample_rate: SampleRate,
) -> Result<(), Error> {
audio_thread_priority::promote_current_thread_to_real_time(period_frames, sample_rate)
.map(|_| ())
.map_err(Error::from)
}

#[cfg(not(feature = "audio_thread_priority"))]
fn boost_current_thread_priority(_: BufferSize, _: SampleRate) {
fn boost_current_thread_priority(_: FrameCount, _: SampleRate) -> Result<(), Error> {
unsafe {
let thread_handle = Threading::GetCurrentThread();

let _ =
Threading::SetThreadPriority(thread_handle, Threading::THREAD_PRIORITY_TIME_CRITICAL);
Threading::SetThreadPriority(thread_handle, Threading::THREAD_PRIORITY_TIME_CRITICAL)
.context("Failed to promote audio thread to real-time priority")
}
}

enum ControlFlow {
Break,
Continue,
}

fn process_commands_and_await_signal(
run_context: &mut RunContext,
error_callback: &mut dyn FnMut(Error),
) -> Option<ControlFlow> {
) -> ControlFlow<(), bool> {
// Process queued commands.
match process_commands(run_context) {
Ok(true) => (),
Ok(false) => return Some(ControlFlow::Break),
Ok(false) => return ControlFlow::Break(()),
Err(err) => {
error_callback(err);
return Some(ControlFlow::Break);
return ControlFlow::Break(());
}
};

Expand All @@ -464,17 +458,15 @@ fn process_commands_and_await_signal(
Ok(idx) => idx,
Err(err) => {
error_callback(err);
return Some(ControlFlow::Break);
return ControlFlow::Break(());
}
};

// If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
// order for us to pick up the pending commands. Otherwise, a stream needs data.
if handle_idx == 0 {
return Some(ControlFlow::Continue);
}

None
// Continue(true) = audio event fired, proceed to process audio this iteration.
// Continue(false) = command event fired, loop around and wait again.
ControlFlow::Continue(handle_idx != 0)
}

// The loop for processing pending input data.
Expand All @@ -483,18 +475,18 @@ fn process_input(
capture_client: Audio::IAudioCaptureClient,
data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
error_callback: &mut dyn FnMut(Error),
) -> ControlFlow {
) -> ControlFlow<()> {
unsafe {
// Get the available data in the shared buffer.
let mut buffer: *mut u8 = ptr::null_mut();
let mut flags = mem::MaybeUninit::uninit();
loop {
let mut frames_available = match capture_client.GetNextPacketSize() {
Ok(0) => return ControlFlow::Continue,
Ok(0) => return ControlFlow::Continue(()),
Ok(f) => f,
Err(err) => {
error_callback(Error::from(err));
return ControlFlow::Break;
return ControlFlow::Break(());
}
};
let mut qpc_position: u64 = 0;
Expand All @@ -511,7 +503,7 @@ fn process_input(
Err(e) if e.code() == Audio::AUDCLNT_S_BUFFER_EMPTY => continue,
Err(e) => {
error_callback(Error::from(e));
return ControlFlow::Break;
return ControlFlow::Break(());
}
Ok(_) => (),
}
Expand All @@ -528,7 +520,7 @@ fn process_input(
Ok(ts) => ts,
Err(err) => {
error_callback(err);
return ControlFlow::Break;
return ControlFlow::Break(());
}
};
let info = InputCallbackInfo { timestamp };
Expand All @@ -540,7 +532,7 @@ fn process_input(
.context("failed to release capture buffer");
if let Err(err) = result {
error_callback(err);
return ControlFlow::Break;
return ControlFlow::Break(());
}
}
}
Expand All @@ -552,14 +544,14 @@ fn process_output(
render_client: Audio::IAudioRenderClient,
data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
error_callback: &mut dyn FnMut(Error),
) -> ControlFlow {
) -> ControlFlow<()> {
// The number of frames available for writing.
let frames_available = match get_available_frames(stream) {
Ok(0) => return ControlFlow::Continue, // TODO: Can this happen?
Ok(0) => return ControlFlow::Continue(()), // TODO: Can this happen?
Ok(n) => n,
Err(err) => {
error_callback(err);
return ControlFlow::Break;
return ControlFlow::Break(());
}
};

Expand All @@ -568,7 +560,7 @@ fn process_output(
Ok(b) => b,
Err(e) => {
error_callback(Error::from(e));
return ControlFlow::Break;
return ControlFlow::Break(());
}
};

Expand All @@ -586,19 +578,19 @@ fn process_output(
Ok(ts) => ts,
Err(err) => {
error_callback(err);
return ControlFlow::Break;
return ControlFlow::Break(());
}
};
let info = OutputCallbackInfo { timestamp };
data_callback(&mut data, &info);

if let Err(err) = render_client.ReleaseBuffer(frames_available, 0) {
error_callback(err.into());
return ControlFlow::Break;
return ControlFlow::Break(());
}
}

ControlFlow::Continue
ControlFlow::Continue(())
}

/// Use the stream's `IAudioClock` to produce the current stream instant.
Expand Down
Loading