Skip to content

Commit

Permalink
Fully initialize the aggregate device before using it.
Browse files Browse the repository at this point in the history
In the current implementation, we assume the aggregate device is created
immediately after calling the system API to create it, and we also
assume the sub-devices of the aggregate device are added immediately
upon we call the system API to set the sub-devices. Unfortunately, these
assumptions are not correct. Occasionally these settings are not
executed immediately, especially when they are not called on the main
thread (e.g., we may create an aggregate device off the main thread when
switching devices). Using the listeners monitoring the devices-changed
events can make sure those settings are done.
  • Loading branch information
ChunMinChang committed Jun 21, 2019
1 parent 1cff8dc commit 96ea69c
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 88 deletions.
170 changes: 161 additions & 9 deletions src/backend/mod.rs
Expand Up @@ -40,8 +40,8 @@ use std::os::raw::{c_char, c_void};
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const NO_ERR: OSStatus = 0;

Expand Down Expand Up @@ -1094,6 +1094,74 @@ fn audiounit_create_blank_aggregate_device(
Ok(())
}

fn audiounit_create_blank_aggregate_device_sync(
plugin_id: &mut AudioObjectID,
aggregate_device_id: &mut AudioDeviceID,
) -> Result<()> {
let waiting_time = Duration::new(5, 0);

let condvar_pair = Arc::new((Mutex::new(Vec::<AudioObjectID>::new()), Condvar::new()));
let mut cloned_condvar_pair = condvar_pair.clone();
let data_ptr = &mut cloned_condvar_pair;

assert_eq!(
audio_object_add_property_listener(
kAudioObjectSystemObject,
&DEVICES_PROPERTY_ADDRESS,
devices_changed_callback,
data_ptr,
),
NO_ERR
);

let _teardown = finally(|| {
assert_eq!(
audio_object_remove_property_listener(
kAudioObjectSystemObject,
&DEVICES_PROPERTY_ADDRESS,
devices_changed_callback,
data_ptr,
),
NO_ERR
);
});

audiounit_create_blank_aggregate_device(plugin_id, aggregate_device_id)?;

// Wait until the aggregate is created.
let &(ref lock, ref cvar) = &*condvar_pair;
let devices = lock.lock().unwrap();
if !devices.contains(aggregate_device_id) {
let (devs, timeout_res) = cvar.wait_timeout(devices, waiting_time).unwrap();
if timeout_res.timed_out() {
cubeb_log!(
"Time out for waiting the creation of aggregate device {}!",
aggregate_device_id
);
}
if !devs.contains(aggregate_device_id) {
return Err(Error::device_unavailable());
}
}

extern "C" fn devices_changed_callback(
id: AudioObjectID,
_number_of_addresses: u32,
_addresses: *const AudioObjectPropertyAddress,
data: *mut c_void,
) -> OSStatus {
assert_eq!(id, kAudioObjectSystemObject);
let pair = unsafe { &mut *(data as *mut Arc<(Mutex<Vec<AudioObjectID>>, Condvar)>) };
let &(ref lock, ref cvar) = &**pair;
let mut devices = lock.lock().unwrap();
*devices = audiounit_get_devices();
cvar.notify_one();
NO_ERR
}

Ok(())
}

fn get_device_name(id: AudioDeviceID) -> CFStringRef {
let mut size = mem::size_of::<CFStringRef>();
let mut uiname: CFStringRef = ptr::null();
Expand Down Expand Up @@ -1180,6 +1248,85 @@ fn audiounit_set_aggregate_sub_device_list(
Ok(())
}

fn audiounit_set_aggregate_sub_device_list_sync(
aggregate_device_id: AudioDeviceID,
input_device_id: AudioDeviceID,
output_device_id: AudioDeviceID,
) -> Result<()> {
let address = AudioObjectPropertyAddress {
mSelector: kAudioAggregateDevicePropertyFullSubDeviceList,
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
};

let waiting_time = Duration::new(5, 0);

let condvar_pair = Arc::new((Mutex::new(AudioObjectID::default()), Condvar::new()));
let mut cloned_condvar_pair = condvar_pair.clone();
let data_ptr = &mut cloned_condvar_pair;

assert_eq!(
audio_object_add_property_listener(
aggregate_device_id,
&address,
devices_changed_callback,
data_ptr,
),
NO_ERR
);

let _teardown = finally(|| {
assert_eq!(
audio_object_remove_property_listener(
aggregate_device_id,
&address,
devices_changed_callback,
data_ptr,
),
NO_ERR
);
});

audiounit_set_aggregate_sub_device_list(
aggregate_device_id,
input_device_id,
output_device_id,
)?;

// Wait until the sub devices are added.
let &(ref lock, ref cvar) = &*condvar_pair;
let device = lock.lock().unwrap();
if *device != aggregate_device_id {
let (dev, timeout_res) = cvar.wait_timeout(device, waiting_time).unwrap();
if timeout_res.timed_out() {
cubeb_log!(
"Time out for waiting the devices({}, {}) adding!",
input_device_id,
output_device_id
);
}
if *dev != aggregate_device_id {
return Err(Error::device_unavailable());
}
}

extern "C" fn devices_changed_callback(
id: AudioObjectID,
_number_of_addresses: u32,
_addresses: *const AudioObjectPropertyAddress,
data: *mut c_void,
) -> OSStatus {
let pair = unsafe { &mut *(data as *mut Arc<(Mutex<AudioObjectID>, Condvar)>) };
let &(ref lock, ref cvar) = &**pair;
let mut device = lock.lock().unwrap();
*device = id;
cvar.notify_one();
NO_ERR
}

Ok(())
}

fn audiounit_set_master_aggregate_device(aggregate_device_id: AudioDeviceID) -> Result<()> {
assert_ne!(aggregate_device_id, kAudioObjectUnknown);
let master_aggregate_sub_device = AudioObjectPropertyAddress {
Expand Down Expand Up @@ -1927,9 +2074,7 @@ fn audiounit_device_destroy(device: &mut ffi::cubeb_device_info) {
}
}

fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec<AudioObjectID> {
assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));

fn audiounit_get_devices() -> Vec<AudioObjectID> {
let mut size: usize = 0;
let mut ret = audio_object_get_property_data_size(
kAudioObjectSystemObject,
Expand All @@ -1950,6 +2095,13 @@ fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec<AudioObjectID> {
if ret != NO_ERR {
return Vec::new();
}
devices
}

fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec<AudioObjectID> {
assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));

let mut devices = audiounit_get_devices();

// Remove the aggregate device from the list of devices (if any).
devices.retain(|&device| {
Expand Down Expand Up @@ -3293,7 +3445,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
// [1] https://lists.apple.com/archives/coreaudio-api/2005/Jul/msg00150.html
// [2] CoreAudio.framework/Headers/AudioHardware.h
fn create_aggregate_device(&mut self) -> Result<()> {
if let Err(r) = audiounit_create_blank_aggregate_device(
if let Err(r) = audiounit_create_blank_aggregate_device_sync(
&mut self.plugin_id,
&mut self.aggregate_device_id,
) {
Expand All @@ -3307,7 +3459,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
// The aggregate device may not be created at this point!
// It's better to listen the system devices changing to make sure it's added.

if let Err(r) = audiounit_set_aggregate_sub_device_list(
if let Err(r) = audiounit_set_aggregate_sub_device_list_sync(
self.aggregate_device_id,
self.input_device.id,
self.output_device.id,
Expand Down Expand Up @@ -3374,7 +3526,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
}

fn set_buffer_size(&mut self, new_size_frames: u32, side: io_side) -> Result<()> {
use std::{thread, time};
use std::thread;

assert_ne!(new_size_frames, 0);
let (au, au_scope, au_element) = if side == io_side::INPUT {
Expand Down Expand Up @@ -3465,7 +3617,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
}

let mut count: u32 = 0;
let duration = time::Duration::from_millis(100); // 0.1 sec
let duration = Duration::from_millis(100); // 0.1 sec

while !self.buffer_size_change_state.load(Ordering::SeqCst) && count < 30 {
count += 1;
Expand Down

0 comments on commit 96ea69c

Please sign in to comment.