diff --git a/crates/firewheel-core/src/node.rs b/crates/firewheel-core/src/node.rs index 2357b59e..53e769bf 100644 --- a/crates/firewheel-core/src/node.rs +++ b/crates/firewheel-core/src/node.rs @@ -28,12 +28,13 @@ impl Default for NodeID { /// /// This struct enforces the use of the builder pattern for future-proofness, as /// it is likely that more fields will be added in the future. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug)] pub struct AudioNodeInfo { debug_name: &'static str, channel_config: ChannelConfig, uses_events: bool, call_update_method: bool, + custom_state: Option>, } impl AudioNodeInfo { @@ -47,6 +48,7 @@ impl AudioNodeInfo { }, uses_events: false, call_update_method: false, + custom_state: None, } } @@ -84,15 +86,26 @@ impl AudioNodeInfo { self.call_update_method = call_update_method; self } + + /// Custom `!Send` state that can be stored in the Firewheel context and accessed + /// by the user. + /// + /// The user accesses this state via `FirewheelCtx::node_state` and + /// `FirewheelCtx::node_state_mut`. + pub fn custom_state(mut self, custom_state: T) -> Self { + self.custom_state = Some(Box::new(custom_state)); + self + } } /// Information about an [`AudioNode`]. Used internally by the Firewheel context. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug)] pub struct AudioNodeInfoInner { pub debug_name: &'static str, pub channel_config: ChannelConfig, pub uses_events: bool, pub call_update_method: bool, + pub custom_state: Option>, } impl Into for AudioNodeInfo { @@ -102,15 +115,61 @@ impl Into for AudioNodeInfo { channel_config: self.channel_config, uses_events: self.uses_events, call_update_method: self.call_update_method, + custom_state: self.custom_state, } } } +/// A trait representing a node in a Firewheel audio graph. +/// +/// # Notes about ECS +/// +/// In order to be friendlier to ECS's (entity component systems), it is encouraged +/// that any struct deriving this trait be POD (plain ol' data). If you want your +/// audio node to be usable in the Bevy game engine, also derive +/// `bevy_ecs::prelude::Component`. (You can hide this derive behind a feature flag +/// by using `#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]`). +/// +/// # Audio Node Lifecycle +/// +/// 1. The user constructs the node as POD or from a custom constructor method for +/// that node. +/// 2. The user adds the node to the graph using `FirewheelCtx::add_node`. If the +/// node has any custom configuration, then the user passes that configuration to this +/// method as well. In this method, the Firewheel context calls [`AudioNode::info`] to +/// get information about the node. The node can also store any custom state in the +/// [`AudioNodeInfo`] struct. +/// 3. At this point the user may now call `FirewheelCtx::node_state` and +/// `FirewheelCtx::node_state_mut` to retrieve the node's custom state. +/// 4. If [`AudioNodeInfo::call_update_method`] was set to `true`, then +/// [`AudioNode::update`] will be called every time the Firewheel context updates. +/// The node's custom state is also accessible in this method. +/// 5. When the Firewheel context is ready for the node to start processing data, +/// it calls [`AudioNode::construct_processor`] to retrieve the realtime +/// [`AudioNodeProcessor`] counterpart of the node. This processor counterpart is +/// then sent to the audio thread. +/// 6. The Firewheel processor calls [`AudioNodeProcessor::process`] whenever there +/// is a new block of audio data to process. +/// 7. (Graceful shutdown) +/// +/// 7a. The Firewheel processor calls [`AudioNodeProcessor::stream_stopped`]. +/// The processor is then sent back to the main thread. +/// +/// 7b. If a new audio stream is started, then the context will call +/// [`AudioNodeProcessor::new_stream`] on the main thread, and then send the +/// processor back to the audio thread for processing. +/// +/// 7c. If the Firewheel context is dropped before a new stream is started, then +/// both the node and the processor counterpart are dropped. +/// 8. (Audio thread crashes or stops unexpectedly) - The node's processor counterpart +/// may or may not be dropped. The user may try to create a new audio stream, in which +/// case [`AudioNode::construct_processor`] might be called again. If a second processor +/// instance is not able to be created, then the node may panic. pub trait AudioNode { /// A type representing this constructor's configuration. /// /// This is intended as a one-time configuration to be used - /// when constructing an audio processor. When no configuration + /// when constructing an audio node. When no configuration /// is required, [`EmptyConfig`] should be used. type Configuration: Default; @@ -119,17 +178,20 @@ pub trait AudioNode { /// This method is only called once after the node is added to the audio graph. fn info(&self, configuration: &Self::Configuration) -> AudioNodeInfo; - /// Construct a processor for this node. - fn processor( + /// Construct a realtime processor for this node. + /// + /// * `configuration` - The custom configuration of this node. + /// * `cx` - A context for interacting with the Firewheel context. This context + /// also includes information about the audio stream. + fn construct_processor( &self, configuration: &Self::Configuration, - stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor; /// If [`AudioNodeInfo::call_update_method`] was set to `true`, then the Firewheel /// context will call this method on every update cycle. /// - /// * `id` - The ID of this node. /// * `configuration` - The custom configuration of this node. /// * `cx` - A context for interacting with the Firewheel context. fn update(&mut self, configuration: &Self::Configuration, cx: UpdateContext) { @@ -138,6 +200,45 @@ pub trait AudioNode { } } +/// A context for [`AudioNode::construct_processor`]. +pub struct ConstructProcessorContext<'a> { + /// The ID of this audio node. + pub node_id: NodeID, + /// Information about the running audio stream. + pub stream_info: &'a StreamInfo, + custom_state: &'a mut Option>, +} + +impl<'a> ConstructProcessorContext<'a> { + pub fn new( + node_id: NodeID, + stream_info: &'a StreamInfo, + custom_state: &'a mut Option>, + ) -> Self { + Self { + node_id, + stream_info, + custom_state, + } + } + + /// Get an immutable reference to the custom state that was created in + /// [`AudioNodeInfo::custom_state`]. + pub fn custom_state(&self) -> Option<&T> { + self.custom_state + .as_ref() + .and_then(|s| s.downcast_ref::()) + } + + /// Get a mutable reference to the custom state that was created in + /// [`AudioNodeInfo::custom_state`]. + pub fn custom_state_mut(&mut self) -> Option<&mut T> { + self.custom_state + .as_mut() + .and_then(|s| s.downcast_mut::()) + } +} + /// A context for [`AudioNode::update`]. pub struct UpdateContext<'a> { /// The ID of this audio node. @@ -145,9 +246,7 @@ pub struct UpdateContext<'a> { /// Information about the running audio stream. If no audio stream is running, /// then this will be `None`. pub stream_info: Option<&'a StreamInfo>, - /// Custom `!Send` data that can be stored in the Firewheel - /// context. - pub custom_data: &'a mut Option>, + custom_state: &'a mut Option>, event_queue: &'a mut Vec, } @@ -155,13 +254,13 @@ impl<'a> UpdateContext<'a> { pub fn new( node_id: NodeID, stream_info: Option<&'a StreamInfo>, - custom_data: &'a mut Option>, + custom_state: &'a mut Option>, event_queue: &'a mut Vec, ) -> Self { Self { node_id, stream_info, - custom_data, + custom_state, event_queue, } } @@ -173,6 +272,22 @@ impl<'a> UpdateContext<'a> { event, }); } + + /// Get an immutable reference to the custom state that was created in + /// [`AudioNodeInfo::custom_state`]. + pub fn custom_state(&self) -> Option<&T> { + self.custom_state + .as_ref() + .and_then(|s| s.downcast_ref::()) + } + + /// Get a mutable reference to the custom state that was created in + /// [`AudioNodeInfo::custom_state`]. + pub fn custom_state_mut(&mut self) -> Option<&mut T> { + self.custom_state + .as_mut() + .and_then(|s| s.downcast_mut::()) + } } /// An empty constructor configuration. @@ -184,21 +299,23 @@ impl<'a> UpdateContext<'a> { #[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))] pub struct EmptyConfig; -/// A dyn-compatible [`AudioNode`]. +/// A type-erased dyn-compatible [`AudioNode`]. pub trait DynAudioNode { /// Get information about this node. /// /// This method is only called once after the node is added to the audio graph. fn info(&self) -> AudioNodeInfo; - /// Construct a processor for this node. - fn processor(&self, stream_info: &StreamInfo) -> Box; + /// Construct a realtime processor for this node. + /// + /// * `cx` - A context for interacting with the Firewheel context. This context + /// also includes information about the audio stream. + fn construct_processor(&self, cx: ConstructProcessorContext) -> Box; /// If [`AudioNodeInfo::call_update_method`] was set to `true`, then the Firewheel /// context will call this method on every update cycle. /// - /// * `id` - The ID of this node. - /// * `configuration` - The custom configuration of this node. + /// * `cx` - A context for interacting with the Firewheel context. fn update(&mut self, cx: UpdateContext) { let _ = cx; } @@ -226,8 +343,11 @@ impl DynAudioNode for Constructor { self.constructor.info(&self.configuration) } - fn processor(&self, stream_info: &StreamInfo) -> Box { - Box::new(self.constructor.processor(&self.configuration, stream_info)) + fn construct_processor(&self, cx: ConstructProcessorContext) -> Box { + Box::new( + self.constructor + .construct_processor(&self.configuration, cx), + ) } fn update(&mut self, cx: UpdateContext) { diff --git a/crates/firewheel-core/src/node/dummy.rs b/crates/firewheel-core/src/node/dummy.rs index 2c190535..29361858 100644 --- a/crates/firewheel-core/src/node/dummy.rs +++ b/crates/firewheel-core/src/node/dummy.rs @@ -1,6 +1,9 @@ -use crate::{channel_config::ChannelConfig, event::NodeEventList, StreamInfo}; +use crate::{channel_config::ChannelConfig, event::NodeEventList}; -use super::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}; +use super::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, ProcInfo, + ProcessStatus, +}; /// A "dummy" [`AudioNode`], a node which does nothing. #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] @@ -22,10 +25,10 @@ impl AudioNode for DummyNode { .uses_events(false) } - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - _stream_info: &StreamInfo, + _cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { DummyProcessor } diff --git a/crates/firewheel-graph/src/context.rs b/crates/firewheel-graph/src/context.rs index 589edfb8..99b0f5bb 100644 --- a/crates/firewheel-graph/src/context.rs +++ b/crates/firewheel-graph/src/context.rs @@ -508,6 +508,16 @@ impl FirewheelCtx { self.graph.node_info(id) } + /// Get an immutable reference to the custom state of a node. + pub fn node_state(&self, id: NodeID) -> Option<&T> { + self.graph.node_state(id) + } + + /// Get a mutable reference to the custom state of a node. + pub fn node_state_mut(&mut self, id: NodeID) -> Option<&mut T> { + self.graph.node_state_mut(id) + } + /// Get a list of all the existing nodes in the graph. pub fn nodes<'a>(&'a self) -> impl Iterator { self.graph.nodes() diff --git a/crates/firewheel-graph/src/graph.rs b/crates/firewheel-graph/src/graph.rs index 5ad505d1..27f92a3a 100644 --- a/crates/firewheel-graph/src/graph.rs +++ b/crates/firewheel-graph/src/graph.rs @@ -6,7 +6,7 @@ use std::hash::Hash; use ahash::AHashMap; use firewheel_core::channel_config::{ChannelConfig, ChannelCount}; use firewheel_core::event::NodeEvent; -use firewheel_core::node::UpdateContext; +use firewheel_core::node::{ConstructProcessorContext, UpdateContext}; use firewheel_core::StreamInfo; use smallvec::SmallVec; use thunderdome::Arena; @@ -121,6 +121,7 @@ impl AudioGraph { ) -> NodeID { let constructor = Constructor::new(node, config); let info: AudioNodeInfoInner = constructor.info().into(); + let call_update_method = info.call_update_method; let new_id = NodeID( self.nodes @@ -128,7 +129,7 @@ impl AudioGraph { ); self.nodes[new_id.0].id = new_id; - if info.call_update_method { + if call_update_method { self.nodes_to_call_update_method.push(new_id); } @@ -140,11 +141,12 @@ impl AudioGraph { /// Add a node to the audio graph which implements the type-erased [`DynAudioNode`] trait. pub fn add_dyn_node(&mut self, node: T) -> NodeID { let info: AudioNodeInfoInner = node.info().into(); + let call_update_method = info.call_update_method; let new_id = NodeID(self.nodes.insert(NodeEntry::new(info, Box::new(node)))); self.nodes[new_id.0].id = new_id; - if info.call_update_method { + if call_update_method { self.nodes_to_call_update_method.push(new_id); } @@ -193,6 +195,28 @@ impl AudioGraph { self.nodes.get(id.0) } + /// Get an immutable reference to the custom state of a node. + pub fn node_state(&self, id: NodeID) -> Option<&T> { + self.nodes.get(id.0).and_then(|node_entry| { + node_entry + .info + .custom_state + .as_ref() + .and_then(|s| s.downcast_ref::()) + }) + } + + /// Get a mutable reference to the custom state of a node. + pub fn node_state_mut(&mut self, id: NodeID) -> Option<&mut T> { + self.nodes.get_mut(id.0).and_then(|node_entry| { + node_entry + .info + .custom_state + .as_mut() + .and_then(|s| s.downcast_mut::()) + }) + } + /// Get a list of all the existing nodes in the graph. pub fn nodes<'a>(&'a self) -> impl Iterator { self.nodes.iter().map(|(_, n)| n) @@ -529,9 +553,15 @@ impl AudioGraph { Vec::new() }; + let cx = ConstructProcessorContext::new( + entry.id, + stream_info, + &mut entry.info.custom_state, + ); + new_node_processors.push(NodeHeapData { id: entry.id, - processor: entry.dyn_node.processor(&stream_info), + processor: entry.dyn_node.construct_processor(cx), event_buffer_indices, }); } @@ -582,7 +612,7 @@ impl AudioGraph { node_entry.dyn_node.update(UpdateContext::new( *node_id, stream_info, - &mut node_entry.custom_data, + &mut node_entry.info.custom_state, event_queue, )); } else { diff --git a/crates/firewheel-graph/src/graph/compiler.rs b/crates/firewheel-graph/src/graph/compiler.rs index a65ba400..5b947297 100644 --- a/crates/firewheel-graph/src/graph/compiler.rs +++ b/crates/firewheel-graph/src/graph/compiler.rs @@ -1,6 +1,6 @@ use firewheel_core::node::{AudioNodeInfoInner, DynAudioNode, NodeID}; use smallvec::SmallVec; -use std::{any::Any, collections::VecDeque, rc::Rc}; +use std::{collections::VecDeque, rc::Rc}; use thunderdome::Arena; use crate::error::CompileGraphError; @@ -15,7 +15,6 @@ pub struct NodeEntry { pub info: AudioNodeInfoInner, pub dyn_node: Box, pub activated: bool, - pub custom_data: Option>, /// The edges connected to this node's input ports. incoming: SmallVec<[Edge; 4]>, /// The edges connected to this node's output ports. @@ -29,7 +28,6 @@ impl NodeEntry { info, dyn_node, activated: false, - custom_data: None, incoming: SmallVec::new(), outgoing: SmallVec::new(), } diff --git a/crates/firewheel-nodes/src/beep_test.rs b/crates/firewheel-nodes/src/beep_test.rs index b84069cf..4c8ecf82 100644 --- a/crates/firewheel-nodes/src/beep_test.rs +++ b/crates/firewheel-nodes/src/beep_test.rs @@ -4,8 +4,8 @@ use firewheel_core::{ dsp::volume::{Volume, DEFAULT_AMP_EPSILON}, event::NodeEventList, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, EmptyConfig, + ProcBuffers, ProcInfo, ProcessStatus, }, }; @@ -54,16 +54,17 @@ impl AudioNode for BeepTestNode { .uses_events(true) } - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - stream_info: &firewheel_core::StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { Processor { phasor: 0.0, - phasor_inc: self.freq_hz.clamp(20.0, 20_000.0) * stream_info.sample_rate_recip as f32, + phasor_inc: self.freq_hz.clamp(20.0, 20_000.0) + * cx.stream_info.sample_rate_recip as f32, gain: self.volume.amp_clamped(DEFAULT_AMP_EPSILON), - sample_rate_recip: (stream_info.sample_rate.get() as f32).recip(), + sample_rate_recip: (cx.stream_info.sample_rate.get() as f32).recip(), params: *self, } } diff --git a/crates/firewheel-nodes/src/peak_meter.rs b/crates/firewheel-nodes/src/peak_meter.rs index 7ff51778..ce02d47e 100644 --- a/crates/firewheel-nodes/src/peak_meter.rs +++ b/crates/firewheel-nodes/src/peak_meter.rs @@ -1,17 +1,16 @@ -use std::sync::atomic::{AtomicBool, Ordering}; - use atomic_float::AtomicF32; use firewheel_core::{ channel_config::{ChannelConfig, ChannelCount}, collector::ArcGc, + diff::{Diff, Patch}, dsp::volume::{amp_to_db, DbMeterNormalizer}, event::NodeEventList, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, EmptyConfig, + ProcBuffers, ProcInfo, ProcessStatus, }, - StreamInfo, }; +use std::sync::atomic::Ordering; #[derive(Debug, Clone, Copy, PartialEq)] pub struct PeakMeterSmootherConfig { @@ -152,25 +151,24 @@ impl PeakMeterSmoother { } } -#[derive(Clone)] +#[derive(Diff, Patch, Debug, Clone, Copy)] pub struct PeakMeterNode { + pub enabled: bool, +} + +#[derive(Clone)] +pub struct PeakMeterState { shared_state: ArcGc>, } -impl PeakMeterNode { - /// Create a new [`PeakMeterNode`]. - /// - /// # Panics - /// - /// Panics if `NUM_CHANNELS == 0` or `NUM_CHANNELS > 64`. - pub fn new(enabled: bool) -> Self { +impl PeakMeterState { + fn new() -> Self { assert_ne!(NUM_CHANNELS, 0); assert!(NUM_CHANNELS <= 64); Self { shared_state: ArcGc::new(SharedState { peak_gains: std::array::from_fn(|_| AtomicF32::new(0.0)), - enabled: AtomicBool::new(enabled), }), } } @@ -192,19 +190,6 @@ impl PeakMeterNode { } }) } - - /// Whether or not the node is currently enabled. - pub fn enabled(&self) -> bool { - self.shared_state.enabled.load(Ordering::Relaxed) - } - - /// Enable/disable this node. - /// - /// It is a good idea to disable this node when not in use to save - /// on CPU. - pub fn set_enabled(&mut self, enabled: bool) { - self.shared_state.enabled.store(enabled, Ordering::Relaxed); - } } impl AudioNode for PeakMeterNode { @@ -217,29 +202,33 @@ impl AudioNode for PeakMeterNode { num_inputs: ChannelCount::new(NUM_CHANNELS as u32).unwrap(), num_outputs: ChannelCount::new(NUM_CHANNELS as u32).unwrap(), }) - .uses_events(false) + .uses_events(true) + .custom_state(PeakMeterState::::new()) } - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - _stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { Processor { - shared_state: ArcGc::clone(&self.shared_state), - enabled: self.shared_state.enabled.load(Ordering::Relaxed), + params: self.clone(), + shared_state: ArcGc::clone( + &cx.custom_state::>() + .unwrap() + .shared_state, + ), } } } struct SharedState { peak_gains: [AtomicF32; NUM_CHANNELS], - enabled: AtomicBool, } struct Processor { + params: PeakMeterNode, shared_state: ArcGc>, - enabled: bool, } impl AudioNodeProcessor for Processor { @@ -247,18 +236,19 @@ impl AudioNodeProcessor for Processor { &mut self, buffers: ProcBuffers, proc_info: &ProcInfo, - _events: NodeEventList, + events: NodeEventList, ) -> ProcessStatus { - let enabled = self.shared_state.enabled.load(Ordering::Relaxed); + let was_enabled = self.params.enabled; + + self.params.patch_list(events); - if self.enabled && !enabled { + if was_enabled && !self.params.enabled { for ch in self.shared_state.peak_gains.iter() { ch.store(0.0, Ordering::Relaxed); } } - self.enabled = enabled; - if !self.enabled { + if !self.params.enabled { return ProcessStatus::Bypass; } diff --git a/crates/firewheel-nodes/src/sampler.rs b/crates/firewheel-nodes/src/sampler.rs index 53880fc9..7d409e8b 100644 --- a/crates/firewheel-nodes/src/sampler.rs +++ b/crates/firewheel-nodes/src/sampler.rs @@ -19,7 +19,10 @@ use firewheel_core::{ volume::{Volume, DEFAULT_AMP_EPSILON}, }, event::{NodeEventList, NodeEventType, SequenceCommand}, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, sample_resource::SampleResource, SilenceMask, StreamInfo, }; @@ -100,7 +103,6 @@ impl Default for SharedState { pub struct SamplerNode { /// The current sequence loaded into the sampler. pub sequence: Option, - shared_state: ArcGc, } impl SamplerNode { @@ -128,14 +130,18 @@ impl SamplerNode { impl Default for SamplerNode { fn default() -> Self { - Self::new() + Self { sequence: None } } } -impl SamplerNode { - pub fn new() -> Self { +#[derive(Clone)] +pub struct SamplerState { + shared_state: ArcGc, +} + +impl SamplerState { + fn new() -> Self { Self { - sequence: None, shared_state: ArcGc::new(SharedState::default()), } } @@ -145,8 +151,8 @@ impl SamplerNode { /// Only returns `Some` when the sequence is [`SequenceType::SingleSample`]. /// /// * `sample_rate` - The sample rate of the current audio stream. - pub fn playhead_seconds(&self, sample_rate: NonZeroU32) -> Option { - if let Some(SequenceType::SingleSample { .. }) = &self.sequence { + pub fn playhead_seconds(&self, params: &SamplerNode, sample_rate: NonZeroU32) -> Option { + if let Some(SequenceType::SingleSample { .. }) = ¶ms.sequence { let frames = self.shared_state.playhead_frames.load(Ordering::Relaxed); Some(frames as f64 / sample_rate.get() as f64) @@ -159,8 +165,8 @@ impl SamplerNode { /// a single channel of audio). /// /// Only returns `Some` when the sequence is [`SequenceType::SingleSample`]. - pub fn playhead_samples(&self) -> Option { - if let Some(SequenceType::SingleSample { .. }) = &self.sequence { + pub fn playhead_samples(&self, params: &SamplerNode) -> Option { + if let Some(SequenceType::SingleSample { .. }) = ¶ms.sequence { Some(self.shared_state.playhead_frames.load(Ordering::Relaxed)) } else { None @@ -174,15 +180,15 @@ impl SamplerNode { /// A score of how suitible this node is to start new work (Play a new sample). The /// higher the score, the better the candidate. - pub fn worker_score(&self) -> u64 { - if self.sequence.is_some() { + pub fn worker_score(&self, params: &SamplerNode) -> u64 { + if params.sequence.is_some() { match self.playback_state() { PlaybackState::Stopped => u64::MAX - 1, PlaybackState::Paused => u64::MAX - 2, PlaybackState::Playing => { // The older the sample is, the better it is as a candidate to steal // work from. - self.playhead_samples().unwrap_or(0) + self.playhead_samples(params).unwrap_or(0) } } } else { @@ -194,9 +200,13 @@ impl SamplerNode { /// /// * `start_immediately` - If `true`, then the new sequence will be started /// immediately when the processor receives the event. - pub fn sync_params_event(&self, start_immediately: bool) -> NodeEventType { + pub fn sync_params_event( + &self, + params: &SamplerNode, + start_immediately: bool, + ) -> NodeEventType { if start_immediately { - self._flag_playback_state(if self.sequence.is_some() { + self._flag_playback_state(if params.sequence.is_some() { PlaybackState::Playing } else { PlaybackState::Stopped @@ -204,7 +214,7 @@ impl SamplerNode { } SamplerEvent::SetParams { - params: self.clone(), + params: params.clone(), start_immediately, } .into() @@ -214,8 +224,12 @@ impl SamplerNode { /// /// * `delay` - The exact moment when the sequence should start. Set to /// `None` to have the sequence start as soon as the event is recieved. - pub fn start_or_restart_event(&self, delay: Option) -> NodeEventType { - self._flag_playback_state(if self.sequence.is_some() { + pub fn start_or_restart_event( + &self, + params: &SamplerNode, + delay: Option, + ) -> NodeEventType { + self._flag_playback_state(if params.sequence.is_some() { PlaybackState::Playing } else { PlaybackState::Stopped @@ -232,8 +246,8 @@ impl SamplerNode { } /// Return an event type to resume the current sequence. - pub fn resume_event(&self) -> NodeEventType { - self._flag_playback_state(if self.sequence.is_some() { + pub fn resume_event(&self, params: &SamplerNode) -> NodeEventType { + self._flag_playback_state(if params.sequence.is_some() { PlaybackState::Playing } else { PlaybackState::Stopped @@ -266,7 +280,7 @@ impl SamplerNode { /// Manually mark the playback state of this node. This can be used to account /// for the delay between when creating a [`SamplerEvent`] and when the processor - /// receives the event when using [`SamplerNode::worker_score`]. + /// receives the event when using [`SamplerState::worker_score`]. /// /// Note, if you use the methods on this struct to construct the events, then /// this is automatically done for you. @@ -371,12 +385,13 @@ impl AudioNode for SamplerNode { num_outputs: config.channels.get(), }) .uses_events(true) + .custom_state(SamplerState::new()) } - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { let stop_declicker_buffers = if config.num_declickers == 0 { None @@ -384,13 +399,14 @@ impl AudioNode for SamplerNode { Some(InstanceBuffer::::new( config.num_declickers as usize, NonZeroUsize::new(config.channels.get().get() as usize).unwrap(), - stream_info.declick_frames.get() as usize, + cx.stream_info.declick_frames.get() as usize, )) }; let mut sampler = SamplerProcessor { config: config.clone(), params: self.clone(), + shared_state: ArcGc::clone(&cx.custom_state::().unwrap().shared_state), loaded_sample_state: None, declicker: Declicker::SettledAt1, playback_state: PlaybackState::Stopped, @@ -402,7 +418,7 @@ impl AudioNode for SamplerNode { playback_start_time_frames: ClockSamples::default(), playback_pause_time_frames: ClockSamples::default(), start_delay: None, - sample_rate: stream_info.sample_rate.get() as f64, + sample_rate: cx.stream_info.sample_rate.get() as f64, amp_epsilon: config.amp_epsilon, }; @@ -418,6 +434,7 @@ impl AudioNode for SamplerNode { pub struct SamplerProcessor { config: SamplerConfig, params: SamplerNode, + shared_state: ArcGc, loaded_sample_state: Option, @@ -657,8 +674,7 @@ impl SamplerProcessor { state.playhead = playhead_frames; self.playback_state = playback_state; - self.params - .shared_state + self.shared_state .playhead_frames .store(playhead_frames, Ordering::Relaxed); @@ -682,8 +698,7 @@ impl SamplerProcessor { match &self.params.sequence { None => { self.playback_state = PlaybackState::Stopped; - self.params - .shared_state + self.shared_state .playback_state .store(PlaybackState::Stopped); } @@ -871,10 +886,7 @@ impl AudioNodeProcessor for SamplerProcessor { _ => {} }); - self.params - .shared_state - .playback_state - .store(self.playback_state); + self.shared_state.playback_state.store(self.playback_state); let start_on_frame = if let Some(delay) = self.start_delay { if let Some(frame) = delay.elapsed_on_frame(&proc_info, self.sample_rate as u32) { @@ -918,15 +930,14 @@ impl AudioNodeProcessor for SamplerProcessor { num_filled_channels = n_channels; - self.params.shared_state.playhead_frames.store( + self.shared_state.playhead_frames.store( self.loaded_sample_state.as_ref().unwrap().playhead, Ordering::Relaxed, ); if finished { self.playback_state = PlaybackState::Stopped; - self.params - .shared_state + self.shared_state .playback_state .store(PlaybackState::Stopped); } @@ -1016,8 +1027,7 @@ impl AudioNodeProcessor for SamplerProcessor { // the incorrect sample rate and the user must reload them. self.params.sequence = None; self.loaded_sample_state = None; - self.params - .shared_state + self.shared_state .playback_state .store(PlaybackState::Stopped); } diff --git a/crates/firewheel-nodes/src/spatial_basic.rs b/crates/firewheel-nodes/src/spatial_basic.rs index 3d4140aa..48a3913f 100644 --- a/crates/firewheel-nodes/src/spatial_basic.rs +++ b/crates/firewheel-nodes/src/spatial_basic.rs @@ -11,7 +11,10 @@ use firewheel_core::{ volume::{Volume, DEFAULT_AMP_EPSILON}, }, event::{NodeEventList, Vec3}, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, param::smoother::{SmoothedParam, SmootherConfig}, SilenceMask, }; @@ -236,15 +239,13 @@ impl AudioNode for SpatialBasicNode { .uses_events(true) } - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - stream_info: &firewheel_core::StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { let computed_values = self.compute_values(config.amp_epsilon); - dbg!(stream_info.sample_rate); - Processor { gain_l: SmoothedParam::new( computed_values.gain_l, @@ -252,7 +253,7 @@ impl AudioNode for SpatialBasicNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), gain_r: SmoothedParam::new( computed_values.gain_r, @@ -260,7 +261,7 @@ impl AudioNode for SpatialBasicNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), damping_cutoff_hz: SmoothedParam::new( computed_values @@ -270,14 +271,14 @@ impl AudioNode for SpatialBasicNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), damping_disabled: computed_values.damping_cutoff_hz.is_none(), filter_l: SinglePoleIirLPF::default(), filter_r: SinglePoleIirLPF::default(), params: *self, prev_block_was_silent: true, - sample_rate_recip: stream_info.sample_rate_recip as f32, + sample_rate_recip: cx.stream_info.sample_rate_recip as f32, amp_epsilon: config.amp_epsilon, } } diff --git a/crates/firewheel-nodes/src/stereo_to_mono.rs b/crates/firewheel-nodes/src/stereo_to_mono.rs index 30232ddf..f1f14c21 100644 --- a/crates/firewheel-nodes/src/stereo_to_mono.rs +++ b/crates/firewheel-nodes/src/stereo_to_mono.rs @@ -2,8 +2,8 @@ use firewheel_core::{ channel_config::{ChannelConfig, ChannelCount}, event::NodeEventList, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, EmptyConfig, + ProcBuffers, ProcInfo, ProcessStatus, }, }; @@ -24,10 +24,10 @@ impl AudioNode for StereoToMonoNode { .uses_events(false) } - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - _stream_info: &firewheel_core::StreamInfo, + _cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { StereoToMonoProcessor } diff --git a/crates/firewheel-nodes/src/stream/reader.rs b/crates/firewheel-nodes/src/stream/reader.rs index e4639f67..e16105e7 100644 --- a/crates/firewheel-nodes/src/stream/reader.rs +++ b/crates/firewheel-nodes/src/stream/reader.rs @@ -9,23 +9,21 @@ use std::{ use firewheel_core::{ channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount}, + collector::ArcGc, event::{NodeEventList, NodeEventType}, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, }, sync_wrapper::SyncWrapper, - StreamInfo, }; use fixed_resample::{ReadStatus, ResamplingChannelConfig}; pub const MAX_CHANNELS: usize = 16; #[derive(Debug, Clone, Copy, PartialEq)] +#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))] pub struct StreamReaderConfig { - /// The configuration of the input to output channel. - pub channel_config: ResamplingChannelConfig, - /// The number of channels. pub channels: NonZeroChannelCount, } @@ -33,34 +31,30 @@ pub struct StreamReaderConfig { impl Default for StreamReaderConfig { fn default() -> Self { Self { - channel_config: ResamplingChannelConfig::default(), channels: NonZeroChannelCount::STEREO, } } } -#[derive(Clone)] +#[derive(Default, Debug, Clone, Copy)] #[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))] -pub struct StreamReaderNode { - /// The configuration of the stream. - /// - /// Changing this will have no effect until a new stream is started. - pub config: StreamReaderConfig, +pub struct StreamReaderNode; +#[derive(Clone)] +pub struct StreamReaderState { channels: NonZeroChannelCount, active_state: Option, - shared_state: Arc, + shared_state: ArcGc, } -impl StreamReaderNode { - pub fn new(config: StreamReaderConfig, channels: NonZeroChannelCount) -> Self { +impl StreamReaderState { + pub fn new(channels: NonZeroChannelCount) -> Self { assert!((channels.get().get() as usize) < MAX_CHANNELS); Self { - config, channels, active_state: None, - shared_state: Arc::new(SharedState::new()), + shared_state: ArcGc::new(SharedState::new()), } } @@ -73,7 +67,7 @@ impl StreamReaderNode { /// running faster than the input stream). /// /// If this happens excessively in Release mode, you may want to consider - /// increasing [`StreamReaderConfig::channel_config.latency_seconds`]. + /// increasing [`ResamplingChannelConfig::latency_seconds`]. /// /// (Calling this will also reset the flag indicating whether an /// underflow occurred.)out @@ -87,7 +81,7 @@ impl StreamReaderNode { /// running faster than the output stream). /// /// If this happens excessively in Release mode, you may want to consider - /// increasing [`StreamReaderConfig::channel_config.capacity_seconds`]. For + /// increasing [`ResamplingChannelConfig::capacity_seconds`]. For /// example, if you are streaming data from a network, you may want to /// increase the capacity to several seconds. /// @@ -105,6 +99,7 @@ impl StreamReaderNode { /// /// * `sample_rate` - The sample rate of this node. /// * `output_stream_sample_rate` - The sample rate of the active output audio stream. + /// * `channel_config` - The configuration of the input to output channel. /// /// If there is already an active stream running on this node, then this will return /// an error. @@ -112,6 +107,7 @@ impl StreamReaderNode { &mut self, sample_rate: NonZeroU32, output_stream_sample_rate: NonZeroU32, + channel_config: ResamplingChannelConfig, ) -> Result { if self.is_active() { return Err(()); @@ -123,7 +119,7 @@ impl StreamReaderNode { NonZeroUsize::new(self.channels.get().get() as usize).unwrap(), output_stream_sample_rate.get(), sample_rate.get(), - self.config.channel_config, + channel_config, ); self.active_state = Some(ActiveState { @@ -155,7 +151,7 @@ impl StreamReaderNode { /// The amount of data in seconds that is currently occupied in the channel. /// - /// This value will be in the range `[0.0, ResamplingCons::capacity_seconds()]`. + /// This value will be in the range `[0.0, ResamplingChannelConfig::capacity_seconds]`. /// /// This can also be used to detect when an extra packet of data should be read or /// discarded to correct for jitter. @@ -167,12 +163,6 @@ impl StreamReaderNode { .map(|s| s.cons.lock().unwrap().occupied_seconds()) } - /// The value of [`ResamplingChannelConfig::latency_seconds`] that was passed when - /// this channel was created. - pub fn latency_seconds(&self) -> f64 { - self.config.channel_config.latency_seconds - } - /// The number of channels in this node. pub fn num_channels(&self) -> NonZeroChannelCount { self.channels @@ -255,15 +245,15 @@ impl StreamReaderNode { } } - /// If the value of [`StreamReaderNode::occupied_seconds()`] is greater than the + /// If the value of [`StreamReaderState::occupied_seconds()`] is greater than the /// given threshold in seconds, then discard the number of input frames needed to - /// bring the value back down to [`StreamReaderNode::latency_seconds()`] to avoid + /// bring the value back down to [`ResamplingChannelConfig::latency_seconds`] to avoid /// excessive overflows and reduce perceived audible glitchiness. /// /// Returns the number of input frames from the producer (not output frames from /// this consumer) that were discarded. /// - /// If `threshold_seconds` is less than [`StreamReaderNode::latency_seconds()`], + /// If `threshold_seconds` is less than [`ResamplingChannelConfig::latency_seconds`], /// then this will do nothing. pub fn discard_jitter(&mut self, threshold_seconds: f64) -> usize { if let Some(state) = &mut self.active_state { @@ -298,35 +288,42 @@ impl StreamReaderNode { self.active_state = None; self.shared_state.reset(); } + + pub fn handle(&self) -> Mutex { + Mutex::new((*self).clone()) + } } -impl Drop for StreamReaderNode { +impl Drop for StreamReaderState { fn drop(&mut self) { self.stop_stream(); } } impl AudioNode for StreamReaderNode { - type Configuration = EmptyConfig; + type Configuration = StreamReaderConfig; - fn info(&self, _config: &Self::Configuration) -> AudioNodeInfo { + fn info(&self, config: &Self::Configuration) -> AudioNodeInfo { AudioNodeInfo::new() .debug_name("stream_reader") .channel_config(ChannelConfig { - num_inputs: self.channels.get(), + num_inputs: config.channels.get(), num_outputs: ChannelCount::ZERO, }) .uses_events(true) + .custom_state(StreamReaderState::new(config.channels)) } - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - _stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { Processor { prod: None, - shared_state: Arc::clone(&self.shared_state), + shared_state: ArcGc::clone( + &cx.custom_state::().unwrap().shared_state, + ), } } } @@ -367,7 +364,7 @@ impl SharedState { struct Processor { prod: Option>, - shared_state: Arc, + shared_state: ArcGc, } impl AudioNodeProcessor for Processor { diff --git a/crates/firewheel-nodes/src/stream/writer.rs b/crates/firewheel-nodes/src/stream/writer.rs index 74740a59..acda293a 100644 --- a/crates/firewheel-nodes/src/stream/writer.rs +++ b/crates/firewheel-nodes/src/stream/writer.rs @@ -9,23 +9,25 @@ use std::{ use firewheel_core::{ channel_config::{ChannelConfig, ChannelCount, NonZeroChannelCount}, + collector::ArcGc, dsp::declick::{Declicker, FadeType}, event::{NodeEventList, NodeEventType}, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, }, sync_wrapper::SyncWrapper, - SilenceMask, StreamInfo, + SilenceMask, }; use fixed_resample::{ReadStatus, ResamplingChannelConfig}; pub const MAX_CHANNELS: usize = 16; #[derive(Debug, Clone, Copy, PartialEq)] +#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))] pub struct StreamWriterConfig { - /// The configuration of the input to output channel. - pub channel_config: ResamplingChannelConfig, + /// The number of channels. + pub channels: NonZeroChannelCount, /// If the value of ResamplingCons::occupied_seconds() is greater than the /// given threshold in seconds, then discard the number of input frames @@ -52,33 +54,30 @@ pub struct StreamWriterConfig { impl Default for StreamWriterConfig { fn default() -> Self { Self { - channel_config: ResamplingChannelConfig::default(), + channels: NonZeroChannelCount::STEREO, discard_jitter_threshold_seconds: None, check_for_silence: true, } } } -#[derive(Clone)] +#[derive(Default, Debug, Clone, Copy)] #[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))] -pub struct StreamWriterNode { - /// The configuration of the stream. - /// - /// Changing this will have no effect until a new stream is started. - pub config: StreamWriterConfig, +pub struct StreamWriterNode; +#[derive(Clone)] +pub struct StreamWriterState { channels: NonZeroChannelCount, active_state: Option, - shared_state: Arc, + shared_state: ArcGc, } -impl StreamWriterNode { - pub fn new(config: StreamWriterConfig, channels: NonZeroChannelCount) -> Self { +impl StreamWriterState { + fn new(channels: NonZeroChannelCount) -> Self { Self { - config, channels, active_state: None, - shared_state: Arc::new(SharedState::new()), + shared_state: ArcGc::new(SharedState::new()), } } @@ -91,7 +90,7 @@ impl StreamWriterNode { /// running faster than the input stream). /// /// If this happens excessively in Release mode, you may want to consider - /// increasing [`StreamWriterConfig::channel_config.latency_seconds`]. + /// increasing [`ResamplingChannelConfig::latency_seconds`]. /// /// (Calling this will also reset the flag indicating whether an /// underflow occurred.) @@ -105,7 +104,7 @@ impl StreamWriterNode { /// running faster than the output stream). /// /// If this happens excessively in Release mode, you may want to consider - /// increasing [`StreamWriterConfig::channel_config.capacity_seconds`]. For + /// increasing [`ResamplingChannelConfig::capacity_seconds`]. For /// example, if you are streaming data from a network, you may want to /// increase the capacity to several seconds. /// @@ -134,7 +133,7 @@ impl StreamWriterNode { /// The amount of data in seconds that is currently occupied in the channel. /// - /// This value will be in the range `[0.0, ResamplingCons::capacity_seconds()]`. + /// This value will be in the range `[0.0, ResamplingChannelConfig::capacity_seconds]`. /// /// If there is no active stream, then this will return `None`. pub fn occupied_seconds(&self) -> Option { @@ -143,12 +142,6 @@ impl StreamWriterNode { .map(|s| s.prod.lock().unwrap().occupied_seconds()) } - /// The value of [`ResamplingChannelConfig::latency_seconds`] that was passed when - /// this channel was created. - pub fn latency_seconds(&self) -> f64 { - self.config.channel_config.latency_seconds - } - /// The number of channels in this node. pub fn num_channels(&self) -> NonZeroChannelCount { self.channels @@ -167,6 +160,7 @@ impl StreamWriterNode { /// /// * `sample_rate` - The sample rate of this node. /// * `output_stream_sample_rate` - The sample rate of the active output audio stream. + /// * `channel_config` - The configuration of the input to output channel. /// /// If there is already an active stream running on this node, then this will return /// an error. @@ -174,6 +168,7 @@ impl StreamWriterNode { &mut self, sample_rate: NonZeroU32, output_stream_sample_rate: NonZeroU32, + channel_config: ResamplingChannelConfig, ) -> Result { if self.is_active() { return Err(()); @@ -185,7 +180,7 @@ impl StreamWriterNode { NonZeroUsize::new(self.channels.get().get() as usize).unwrap(), sample_rate.get(), output_stream_sample_rate.get(), - self.config.channel_config, + channel_config, ); self.active_state = Some(ActiveState { @@ -273,37 +268,44 @@ impl StreamWriterNode { self.active_state = None; self.shared_state.reset(); } + + pub fn handle(&self) -> Mutex { + Mutex::new((*self).clone()) + } } -impl Drop for StreamWriterNode { +impl Drop for StreamWriterState { fn drop(&mut self) { self.stop_stream(); } } impl AudioNode for StreamWriterNode { - type Configuration = EmptyConfig; + type Configuration = StreamWriterConfig; - fn info(&self, _config: &Self::Configuration) -> AudioNodeInfo { + fn info(&self, config: &Self::Configuration) -> AudioNodeInfo { AudioNodeInfo::new() .debug_name("stream_writer") .channel_config(ChannelConfig { num_inputs: ChannelCount::ZERO, - num_outputs: self.channels.get(), + num_outputs: config.channels.get(), }) .uses_events(true) + .custom_state(StreamWriterState::new(config.channels)) } - fn processor( + fn construct_processor( &self, - _config: &Self::Configuration, - _stream_info: &StreamInfo, + config: &Self::Configuration, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { Processor { cons: None, - shared_state: Arc::clone(&self.shared_state), - discard_jitter_threshold_seconds: self.config.discard_jitter_threshold_seconds, - check_for_silence: self.config.check_for_silence, + shared_state: ArcGc::clone( + &cx.custom_state::().unwrap().shared_state, + ), + discard_jitter_threshold_seconds: config.discard_jitter_threshold_seconds, + check_for_silence: config.check_for_silence, pause_declicker: Declicker::SettledAt0, } } @@ -345,7 +347,7 @@ impl SharedState { struct Processor { cons: Option>, - shared_state: Arc, + shared_state: ArcGc, discard_jitter_threshold_seconds: Option, check_for_silence: bool, pause_declicker: Declicker, diff --git a/crates/firewheel-nodes/src/volume.rs b/crates/firewheel-nodes/src/volume.rs index 26b54740..fa691f89 100644 --- a/crates/firewheel-nodes/src/volume.rs +++ b/crates/firewheel-nodes/src/volume.rs @@ -3,7 +3,10 @@ use firewheel_core::{ diff::{Diff, Patch}, dsp::volume::{Volume, DEFAULT_AMP_EPSILON}, event::NodeEventList, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, param::smoother::{SmoothedParam, SmootherConfig}, SilenceMask, }; @@ -65,10 +68,10 @@ impl AudioNode for VolumeNode { .uses_events(true) } - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - stream_info: &firewheel_core::StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { let gain = self.volume.amp_clamped(config.amp_epsilon); @@ -80,7 +83,7 @@ impl AudioNode for VolumeNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), prev_block_was_silent: true, amp_epsilon: config.amp_epsilon, diff --git a/crates/firewheel-nodes/src/volume_pan.rs b/crates/firewheel-nodes/src/volume_pan.rs index f6022c9f..1335fefe 100644 --- a/crates/firewheel-nodes/src/volume_pan.rs +++ b/crates/firewheel-nodes/src/volume_pan.rs @@ -3,7 +3,10 @@ use firewheel_core::{ diff::{Diff, Patch}, dsp::{pan_law::PanLaw, volume::Volume}, event::NodeEventList, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, param::smoother::{SmoothedParam, SmootherConfig}, SilenceMask, }; @@ -98,10 +101,10 @@ impl AudioNode for VolumePanNode { .uses_events(true) } - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - stream_info: &firewheel_core::StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { let (gain_l, gain_r) = self.compute_gains(config.amp_epsilon); @@ -112,7 +115,7 @@ impl AudioNode for VolumePanNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), gain_r: SmoothedParam::new( gain_r, @@ -120,7 +123,7 @@ impl AudioNode for VolumePanNode { smooth_secs: config.smooth_secs, ..Default::default() }, - stream_info.sample_rate, + cx.stream_info.sample_rate, ), params: *self, prev_block_was_silent: true, diff --git a/examples/custom_nodes/src/nodes/filter.rs b/examples/custom_nodes/src/nodes/filter.rs index c4b0eee3..a957396c 100644 --- a/examples/custom_nodes/src/nodes/filter.rs +++ b/examples/custom_nodes/src/nodes/filter.rs @@ -15,14 +15,22 @@ use firewheel::{ }, event::NodeEventList, node::{ - AudioNode, AudioNodeInfo, AudioNodeProcessor, EmptyConfig, ProcBuffers, ProcInfo, - ProcessStatus, + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, EmptyConfig, + ProcBuffers, ProcInfo, ProcessStatus, }, param::smoother::{SmoothedParam, SmoothedParamBuffer}, SilenceMask, StreamInfo, }; -// The parameter struct holds all of the parameters of the node as plain values. +// The node struct holds all of the parameters of the node as plain values. +/// +/// # Notes about ECS +/// +/// In order to be friendlier to ECS's (entity component systems), it is encouraged +/// that any struct deriving this trait be POD (plain ol' data). If you want your +/// audio node to be usable in the Bevy game engine, also derive +/// `bevy_ecs::prelude::Component`. (You can hide this derive behind a feature flag +/// by using `#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]`). #[derive(Diff, Patch, Debug, Clone, Copy, PartialEq)] pub struct FilterNode { /// The cutoff frequency in hertz in the range `[20.0, 20_000.0]`. @@ -73,13 +81,13 @@ impl AudioNode for FilterNode { // // This method is called before the node processor is sent to the realtime // thread, so it is safe to do non-realtime things here like allocating. - fn processor( + fn construct_processor( &self, _config: &Self::Configuration, - stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { // The reciprocal of the sample rate. - let sample_rate_recip = stream_info.sample_rate_recip as f32; + let sample_rate_recip = cx.stream_info.sample_rate_recip as f32; let cutoff_hz = self.cutoff_hz.clamp(20.0, 20_000.0); let gain = self.volume.amp_clamped(DEFAULT_AMP_EPSILON); @@ -87,8 +95,12 @@ impl AudioNode for FilterNode { Processor { filter_l: OnePoleLPBiquad::new(cutoff_hz, sample_rate_recip), filter_r: OnePoleLPBiquad::new(cutoff_hz, sample_rate_recip), - cutoff_hz: SmoothedParam::new(cutoff_hz, Default::default(), stream_info.sample_rate), - gain: SmoothedParamBuffer::new(gain, Default::default(), stream_info), + cutoff_hz: SmoothedParam::new( + cutoff_hz, + Default::default(), + cx.stream_info.sample_rate, + ), + gain: SmoothedParamBuffer::new(gain, Default::default(), cx.stream_info), enable_declicker: Declicker::from_enabled(self.enabled), params: *self, sample_rate_recip, diff --git a/examples/custom_nodes/src/nodes/noise_gen.rs b/examples/custom_nodes/src/nodes/noise_gen.rs index ef9b58ea..e98c8a09 100644 --- a/examples/custom_nodes/src/nodes/noise_gen.rs +++ b/examples/custom_nodes/src/nodes/noise_gen.rs @@ -5,11 +5,22 @@ use firewheel::{ diff::{Diff, Patch}, dsp::volume::{Volume, DEFAULT_AMP_EPSILON}, event::NodeEventList, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, - SilenceMask, StreamInfo, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, + SilenceMask, }; -// The parameter struct holds all of the parameters of the node as plain values. +// The node struct holds all of the parameters of the node as plain values. +/// +/// # Notes about ECS +/// +/// In order to be friendlier to ECS's (entity component systems), it is encouraged +/// that any struct deriving this trait be POD (plain ol' data). If you want your +/// audio node to be usable in the Bevy game engine, also derive +/// `bevy_ecs::prelude::Component`. (You can hide this derive behind a feature flag +/// by using `#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]`). #[derive(Diff, Patch, Debug, Clone, Copy, PartialEq)] pub struct NoiseGenNode { /// The overall volume. @@ -74,10 +85,10 @@ impl AudioNode for NoiseGenNode { // // This method is called before the node processor is sent to the realtime // thread, so it is safe to do non-realtime things here like allocating. - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - _stream_info: &StreamInfo, + _cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { // Seed cannot be zero. let seed = if config.seed == 0 { 17 } else { config.seed }; diff --git a/examples/custom_nodes/src/nodes/rms.rs b/examples/custom_nodes/src/nodes/rms.rs index f4d3b9ea..29cb412c 100644 --- a/examples/custom_nodes/src/nodes/rms.rs +++ b/examples/custom_nodes/src/nodes/rms.rs @@ -2,17 +2,19 @@ //! //! This node calculates the RMS (root-mean-square) of a mono signal. -use std::sync::atomic::Ordering; - use atomic_float::AtomicF32; use firewheel::{ channel_config::{ChannelConfig, ChannelCount}, collector::ArcGc, diff::{Diff, Patch}, event::NodeEventList, - node::{AudioNode, AudioNodeInfo, AudioNodeProcessor, ProcBuffers, ProcInfo, ProcessStatus}, + node::{ + AudioNode, AudioNodeInfo, AudioNodeProcessor, ConstructProcessorContext, ProcBuffers, + ProcInfo, ProcessStatus, + }, StreamInfo, }; +use std::sync::atomic::Ordering; #[derive(Debug)] struct SharedState { @@ -32,31 +34,43 @@ impl Default for RmsConfig { } } -// The parameter struct holds all of the parameters of the node. -#[derive(Debug, Diff, Patch, Clone)] +// The node struct holds all of the parameters of the node. +/// +/// # Notes about ECS +/// +/// In order to be friendlier to ECS's (entity component systems), it is encouraged +/// that any struct deriving this trait be POD (plain ol' data). If you want your +/// audio node to be usable in the Bevy game engine, also derive +/// `bevy_ecs::prelude::Component`. (You can hide this derive behind a feature flag +/// by using `#[cfg_attr(feature = "bevy", derive(bevy_ecs::prelude::Component))]`). +/// +/// To keep this struct POD, this example makes use of the "custom state" API to +/// send the rms value from the processor to the user. +#[derive(Debug, Diff, Patch, Clone, Copy)] pub struct RmsNode { /// Whether or not this node is enabled. pub enabled: bool, - // `ArcGc` is a simple wrapper around `Arc` that automatically collects - // dropped resources from the audio thread and drops them on another - // thread. - // - // Because this type is synchronized with atomics, it - // doesn't need any diffing or patching. - #[diff(skip)] - shared_state: ArcGc, } impl Default for RmsNode { fn default() -> Self { - Self::new() + Self { enabled: true } } } -impl RmsNode { - pub fn new() -> Self { +// The state struct is stored in the Firewheel context, and the user can retrieve +// it using `FirewheelCtx::node_state` and `FirewheelCtx::node_state_mut`. +#[derive(Clone)] +pub struct RmsState { + // `ArcGc` is a simple wrapper around `Arc` that automatically collects + // dropped resources from the audio thread and drops them on another + // thread. + shared_state: ArcGc, +} + +impl RmsState { + fn new() -> Self { Self { - enabled: true, shared_state: ArcGc::new(SharedState { rms_value: AtomicF32::new(0.0), }), @@ -89,6 +103,12 @@ impl AudioNode for RmsNode { // this to `false` will save a bit of memory by not allocating an // event buffer for this node. .uses_events(true) + // Custom !Send state that can be stored in the Firewheel context and + // accessed by the user. + // + // The user accesses this state via `FirewheelCtx::node_state` and + // `FirewheelCtx::node_state_mut`. + .custom_state(RmsState::new()) } // Construct the realtime processor counterpart using the given information @@ -96,16 +116,20 @@ impl AudioNode for RmsNode { // // This method is called before the node processor is sent to the realtime // thread, so it is safe to do non-realtime things here like allocating. - fn processor( + fn construct_processor( &self, config: &Self::Configuration, - stream_info: &StreamInfo, + cx: ConstructProcessorContext, ) -> impl AudioNodeProcessor { let window_frames = - (config.window_size_secs * stream_info.sample_rate.get() as f32).round() as usize; + (config.window_size_secs * cx.stream_info.sample_rate.get() as f32).round() as usize; + + // Extract the custom state so we can get a reference to the shared state. + let custom_state = cx.custom_state::().unwrap(); Processor { params: self.clone(), + shared_state: ArcGc::clone(&custom_state.shared_state), squares: 0.0, num_squared_values: 0, window_frames, @@ -117,6 +141,7 @@ impl AudioNode for RmsNode { // The realtime processor counterpart to your node. struct Processor { params: RmsNode, + shared_state: ArcGc, squares: f32, num_squared_values: usize, window_frames: usize, @@ -137,10 +162,7 @@ impl AudioNodeProcessor for Processor { self.params.patch_list(events); if !self.params.enabled { - self.params - .shared_state - .rms_value - .store(0.0, Ordering::Relaxed); + self.shared_state.rms_value.store(0.0, Ordering::Relaxed); self.squares = 0.0; self.num_squared_values = 0; @@ -165,10 +187,7 @@ impl AudioNodeProcessor for Processor { let mean = self.squares / self.window_frames as f32; let rms = mean.sqrt(); - self.params - .shared_state - .rms_value - .store(rms, Ordering::Relaxed); + self.shared_state.rms_value.store(rms, Ordering::Relaxed); self.squares = 0.0; self.num_squared_values = 0; diff --git a/examples/custom_nodes/src/ui.rs b/examples/custom_nodes/src/ui.rs index 48bf0f4b..1e9e0195 100644 --- a/examples/custom_nodes/src/ui.rs +++ b/examples/custom_nodes/src/ui.rs @@ -2,7 +2,7 @@ use eframe::App; use egui::{Color32, ProgressBar}; use firewheel::dsp::volume::Volume; -use crate::system::AudioSystem; +use crate::{nodes::rms::RmsState, system::AudioSystem}; pub struct DemoApp { audio_system: AudioSystem, @@ -98,7 +98,12 @@ impl App for DemoApp { ); } - let rms_value = self.audio_system.rms_node.rms_value(); + let rms_value = self + .audio_system + .cx + .node_state::(self.audio_system.rms_node_id) + .unwrap() + .rms_value(); // The rms value is quite low, so scale it up to register on the meter better. ui.add(ProgressBar::new(rms_value * 2.0).fill(Color32::DARK_GREEN)); diff --git a/examples/play_sample/src/main.rs b/examples/play_sample/src/main.rs index c8937aa5..da448247 100644 --- a/examples/play_sample/src/main.rs +++ b/examples/play_sample/src/main.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use firewheel::{ error::UpdateError, - nodes::sampler::{PlaybackState, RepeatMode, SamplerNode}, + nodes::sampler::{PlaybackState, RepeatMode, SamplerNode, SamplerState}, FirewheelContext, Volume, }; use symphonium::SymphoniumLoader; @@ -47,12 +47,11 @@ fn main() { .into_dyn_resource(); sampler_node.set_sample(sample, Volume::UNITY_GAIN, RepeatMode::PlayOnce); - cx.queue_event_for( - sampler_id, - sampler_node.sync_params_event( - true, // start immediately - ), - ); + let event = cx + .node_state::(sampler_id) + .unwrap() + .sync_params_event(&sampler_node, true); + cx.queue_event_for(sampler_id, event); // Alternatively, instead of setting `start_immediately` to `true`, you can // tell the sampler to start playing its sequence like this: @@ -65,7 +64,12 @@ fn main() { // --- Simulated update loop --------------------------------------------------------- loop { - if sampler_node.playback_state() == PlaybackState::Stopped { + if cx + .node_state::(sampler_id) + .unwrap() + .playback_state() + == PlaybackState::Stopped + { // Sample has finished playing. break; } diff --git a/examples/sampler_pool/src/ui.rs b/examples/sampler_pool/src/ui.rs index f534b346..82431470 100644 --- a/examples/sampler_pool/src/ui.rs +++ b/examples/sampler_pool/src/ui.rs @@ -55,7 +55,11 @@ impl App for DemoApp { ); } - let num_active_works = self.audio_system.sampler_pool_1.poll().num_active_workers; + let num_active_works = self + .audio_system + .sampler_pool_1 + .poll(&self.audio_system.cx) + .num_active_workers; ui.label(format!("Num active workers: {}", num_active_works)); @@ -85,7 +89,11 @@ impl App for DemoApp { ); } - let num_active_works = self.audio_system.sampler_pool_2.poll().num_active_workers; + let num_active_works = self + .audio_system + .sampler_pool_2 + .poll(&self.audio_system.cx) + .num_active_workers; ui.label(format!("Num active workers: {}", num_active_works)); }); diff --git a/examples/sampler_test/src/system.rs b/examples/sampler_test/src/system.rs index d849e955..af225a98 100644 --- a/examples/sampler_test/src/system.rs +++ b/examples/sampler_test/src/system.rs @@ -3,8 +3,8 @@ use firewheel::{ error::UpdateError, node::NodeID, nodes::{ - peak_meter::{PeakMeterNode, PeakMeterSmoother}, - sampler::{PlaybackState, RepeatMode, SamplerNode, SequenceType}, + peak_meter::{PeakMeterNode, PeakMeterSmoother, PeakMeterState}, + sampler::{PlaybackState, RepeatMode, SamplerNode, SamplerState, SequenceType}, }, FirewheelContext, }; @@ -27,7 +27,7 @@ pub struct AudioSystem { samplers: Vec, - peak_meter_node: PeakMeterNode<2>, + peak_meter_id: NodeID, peak_meter_smoother: PeakMeterSmoother<2>, peak_meter_normalizer: DbMeterNormalizer, } @@ -43,7 +43,7 @@ impl AudioSystem { let graph_out = cx.graph_out_node_id(); - let peak_meter_node = PeakMeterNode::<2>::new(true); + let peak_meter_node = PeakMeterNode::<2> { enabled: true }; let peak_meter_smoother = PeakMeterSmoother::<2>::new(Default::default()); let peak_meter_id = cx.add_node(peak_meter_node.clone(), None); @@ -76,7 +76,7 @@ impl AudioSystem { Self { cx, samplers, - peak_meter_node, + peak_meter_id, peak_meter_smoother, peak_meter_normalizer: DbMeterNormalizer::default(), } @@ -103,41 +103,53 @@ impl AudioSystem { return; }; + let node_state = self.cx.node_state::(sampler.node_id).unwrap(); + if Volume::Linear(linear_volume) != *old_volume || repeat_mode != *old_repeat_mode { *old_volume = Volume::Linear(linear_volume); *old_repeat_mode = repeat_mode; - self.cx - .queue_event_for(sampler.node_id, sampler.params.sync_params_event(true)); + self.cx.queue_event_for( + sampler.node_id, + node_state.sync_params_event(&sampler.params, true), + ); } else { - self.cx - .queue_event_for(sampler.node_id, sampler.params.start_or_restart_event(None)); + self.cx.queue_event_for( + sampler.node_id, + node_state.start_or_restart_event(&sampler.params, None), + ); } } pub fn pause(&mut self, sampler_i: usize) { let sampler = &self.samplers[sampler_i]; + let node_state = self.cx.node_state::(sampler.node_id).unwrap(); self.cx - .queue_event_for(sampler.node_id, sampler.params.pause_event()); + .queue_event_for(sampler.node_id, node_state.pause_event()); } pub fn resume(&mut self, sampler_i: usize) { let sampler = &self.samplers[sampler_i]; + let node_state = self.cx.node_state::(sampler.node_id).unwrap(); self.cx - .queue_event_for(sampler.node_id, sampler.params.resume_event()); + .queue_event_for(sampler.node_id, node_state.resume_event(&sampler.params)); } pub fn stop(&mut self, sampler_i: usize) { let sampler = &self.samplers[sampler_i]; + let node_state = self.cx.node_state::(sampler.node_id).unwrap(); self.cx - .queue_event_for(sampler.node_id, sampler.params.stop_event()); + .queue_event_for(sampler.node_id, node_state.stop_event()); } pub fn playback_state(&self, sampler_i: usize) -> PlaybackState { - self.samplers[sampler_i].params.playback_state() + self.cx + .node_state::(self.samplers[sampler_i].node_id) + .unwrap() + .playback_state() } pub fn update(&mut self) { @@ -160,7 +172,10 @@ impl AudioSystem { pub fn update_meters(&mut self, delta_seconds: f32) { self.peak_meter_smoother.update( - self.peak_meter_node.peak_gain_db(DEFAULT_DB_EPSILON), + self.cx + .node_state::>(self.peak_meter_id) + .unwrap() + .peak_gain_db(DEFAULT_DB_EPSILON), delta_seconds, ); } diff --git a/examples/spatial_basic/src/system.rs b/examples/spatial_basic/src/system.rs index edea74e5..3f89e3be 100644 --- a/examples/spatial_basic/src/system.rs +++ b/examples/spatial_basic/src/system.rs @@ -3,7 +3,7 @@ use firewheel::{ error::UpdateError, node::NodeID, nodes::{ - sampler::{RepeatMode, SamplerNode}, + sampler::{RepeatMode, SamplerNode, SamplerState}, spatial_basic::SpatialBasicNode, }, FirewheelContext, Volume, @@ -62,7 +62,11 @@ impl AudioSystem { ) .unwrap(); - cx.queue_event_for(sampler_node_id, sampler_node.start_or_restart_event(None)); + let event = cx + .node_state::(sampler_node_id) + .unwrap() + .start_or_restart_event(&sampler_node, None); + cx.queue_event_for(sampler_node_id, event); Self { cx, diff --git a/examples/stream_nodes/src/main.rs b/examples/stream_nodes/src/main.rs index d2331181..9323acac 100644 --- a/examples/stream_nodes/src/main.rs +++ b/examples/stream_nodes/src/main.rs @@ -1,15 +1,11 @@ -use std::{ - num::NonZeroU32, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{num::NonZeroU32, sync::Arc, time::Duration}; use firewheel::{ channel_config::NonZeroChannelCount, error::UpdateError, nodes::stream::{ - reader::{StreamReaderConfig, StreamReaderNode}, - writer::{StreamWriterConfig, StreamWriterNode}, + reader::{StreamReaderConfig, StreamReaderNode, StreamReaderState}, + writer::{StreamWriterConfig, StreamWriterNode, StreamWriterState}, ReadStatus, ResamplingChannelConfig, }, FirewheelContext, @@ -33,23 +29,55 @@ fn main() { let graph_out_node_id = cx.graph_out_node_id(); - let mut stream_writer_node = StreamWriterNode::new( - StreamWriterConfig { - channel_config: ResamplingChannelConfig { + let stream_writer_id = cx.add_node( + StreamWriterNode, + Some(StreamWriterConfig { + channels: NUM_CHANNELS, + ..Default::default() + }), + ); + let stream_reader_id = cx.add_node( + StreamReaderNode, + Some(StreamReaderConfig { + channels: NUM_CHANNELS, + }), + ); + + cx.connect( + stream_writer_id, + graph_out_node_id, + &[(0, 0), (1, 1)], + false, + ) + .unwrap(); + cx.connect(stream_writer_id, stream_reader_id, &[(0, 0), (1, 1)], false) + .unwrap(); + + let event = cx + .node_state_mut::(stream_writer_id) + .unwrap() + .start_stream( + IN_SAMPLE_RATE, + output_stream_sample_rate, + ResamplingChannelConfig { // By default this is set to `0.4` (400 ms). You will probably want a larger // capacity buffer depending on your use case. Generally this value should // be at least twice as large as the size of packets you intend to send. capacity_seconds: CHANNEL_CAPACITY_SECONDS, ..Default::default() }, - ..Default::default() - }, - NUM_CHANNELS, - ); + ) + .unwrap(); + // This event must be sent to the node's processor for the stream to take effect. + cx.queue_event_for(stream_writer_id, event.into()); - let mut stream_reader_node = StreamReaderNode::new( - StreamReaderConfig { - channel_config: ResamplingChannelConfig { + let event = cx + .node_state_mut::(stream_reader_id) + .unwrap() + .start_stream( + OUT_SAMPLE_RATE, + output_stream_sample_rate, + ResamplingChannelConfig { // For stream readers, the `latency_seconds` value should also be at least // the size of packets you intend to read. Here, we use twice that size to // be safe. @@ -62,41 +90,23 @@ fn main() { capacity_seconds: 0.6, ..Default::default() }, - ..Default::default() - }, - NUM_CHANNELS, - ); - - let stream_writer_id = cx.add_node(stream_writer_node.clone(), None); - let stream_reader_id = cx.add_node(stream_reader_node.clone(), None); - - cx.connect( - stream_writer_id, - graph_out_node_id, - &[(0, 0), (1, 1)], - false, - ) - .unwrap(); - cx.connect(stream_writer_id, stream_reader_id, &[(0, 0), (1, 1)], false) - .unwrap(); - - let event = stream_writer_node - .start_stream(IN_SAMPLE_RATE, output_stream_sample_rate) - .unwrap(); - // This event must be sent to the node's processor for the stream to take effect. - cx.queue_event_for(stream_writer_id, event.into()); - - let event = stream_reader_node - .start_stream(OUT_SAMPLE_RATE, output_stream_sample_rate) + ) .unwrap(); // This event must be sent to the node's processor for the stream to take effect. cx.queue_event_for(stream_reader_id, event.into()); // Wrap the handles in an `Arc>>` so that we can send them to other threads. - let stream_writer_handle = Arc::new(Mutex::new(stream_writer_node)); - let stream_reader_handle = Arc::new(Mutex::new(stream_reader_node)); + let stream_writer_handle = Arc::new( + cx.node_state::(stream_writer_id) + .unwrap() + .handle(), + ); + let stream_reader_handle = Arc::new( + cx.node_state::(stream_reader_id) + .unwrap() + .handle(), + ); - let stream_writer_handle_2 = Arc::clone(&stream_writer_handle); std::thread::spawn(move || { let mut phasor: f32 = 0.0; let phasor_inc: f32 = 440.0 / IN_SAMPLE_RATE.get() as f32; @@ -107,7 +117,7 @@ fn main() { let mut in_buf = vec![0.0; packet_frames * NUM_CHANNELS.get().get() as usize]; loop { - let mut handle = stream_writer_handle_2.lock().unwrap(); + let mut handle = stream_writer_handle.lock().unwrap(); // If this happens excessively in Release mode, you may want to consider // increasing [`StreamWriterConfig::channel_config.latency_seconds`]. @@ -157,7 +167,6 @@ fn main() { } }); - let stream_reader_handle_2 = Arc::clone(&stream_reader_handle); std::thread::spawn(move || { // We will read packets of data that are 15 ms long, this time in // de-interleaved format. @@ -168,7 +177,7 @@ fn main() { .collect(); loop { - let mut handle = stream_reader_handle_2.lock().unwrap(); + let mut handle = stream_reader_handle.lock().unwrap(); // If this happens excessively in Release mode, you may want to consider // increasing [`StreamReaderConfig::channel_config.latency_seconds`]. @@ -237,8 +246,12 @@ fn main() { if let UpdateError::StreamStoppedUnexpectedly(_) = e { // Notify the stream node handles that the output stream has stopped. // This will automatically stop any active streams on the nodes. - stream_writer_handle.lock().unwrap().stop_stream(); - stream_reader_handle.lock().unwrap().stop_stream(); + cx.node_state_mut::(stream_writer_id) + .unwrap() + .stop_stream(); + cx.node_state_mut::(stream_reader_id) + .unwrap() + .stop_stream(); // The stream has stopped unexpectedly (i.e the user has // unplugged their headphones.) diff --git a/src/sampler_pool.rs b/src/sampler_pool.rs index 5551f9c5..ef6cbbf9 100644 --- a/src/sampler_pool.rs +++ b/src/sampler_pool.rs @@ -7,7 +7,7 @@ use firewheel_core::{ node::NodeID, }; use firewheel_cpal::FirewheelContext; -use firewheel_nodes::sampler::{PlaybackState, SamplerConfig, SamplerNode}; +use firewheel_nodes::sampler::{PlaybackState, SamplerConfig, SamplerNode, SamplerState}; use smallvec::SmallVec; use thunderdome::Arena; @@ -139,7 +139,10 @@ impl SamplerPool { break; } - let score = worker.sampler_node.worker_score(); + let score = cx + .node_state::(worker.sampler_id) + .unwrap() + .worker_score(&worker.sampler_node); if score == u64::MAX { idx = i; @@ -160,7 +163,10 @@ impl SamplerPool { let was_playing_sequence = if let Some(old_worker_id) = old_worker_id { self.worker_ids.remove(old_worker_id.0); - worker.sampler_node.playback_state().is_playing() + cx.node_state::(worker.sampler_id) + .unwrap() + .playback_state() + .is_playing() } else { false }; @@ -168,21 +174,17 @@ impl SamplerPool { worker.assigned_worker_id = Some(worker_id); worker.sampler_node.sequence = sampler_node.sequence.clone(); + let node_state = cx.node_state::(worker.sampler_id).unwrap(); + if let Some(delay) = delay { - cx.queue_event_for( - worker.sampler_id, - worker.sampler_node.sync_params_event(false), - ); - - cx.queue_event_for( - worker.sampler_id, - worker.sampler_node.start_or_restart_event(Some(delay)), - ); + let event_1 = node_state.sync_params_event(&worker.sampler_node, false); + let event_2 = node_state.start_or_restart_event(&worker.sampler_node, Some(delay)); + cx.queue_event_for(worker.sampler_id, event_1); + + cx.queue_event_for(worker.sampler_id, event_2); } else { - cx.queue_event_for( - worker.sampler_id, - worker.sampler_node.sync_params_event(true), - ); + let event = node_state.sync_params_event(&worker.sampler_node, true); + cx.queue_event_for(worker.sampler_id, event); } (fx_chain)(&mut worker.fx_state, cx); @@ -202,9 +204,11 @@ impl SamplerPool { } } - pub fn playback_state(&self, worker_id: WorkerID) -> PlaybackState { + pub fn playback_state(&self, worker_id: WorkerID, cx: &FirewheelContext) -> PlaybackState { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { - self.workers[idx].sampler_node.playback_state() + cx.node_state::(self.workers[idx].sampler_id) + .unwrap() + .playback_state() } else { PlaybackState::Stopped } @@ -233,7 +237,11 @@ impl SamplerPool { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &mut self.workers[idx]; - cx.queue_event_for(worker.sampler_id, worker.sampler_node.pause_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .pause_event(); + cx.queue_event_for(worker.sampler_id, event); true } else { @@ -248,7 +256,11 @@ impl SamplerPool { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &mut self.workers[idx]; - cx.queue_event_for(worker.sampler_id, worker.sampler_node.resume_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .resume_event(&worker.sampler_node); + cx.queue_event_for(worker.sampler_id, event); true } else { @@ -269,7 +281,11 @@ impl SamplerPool { worker.assigned_worker_id = None; - cx.queue_event_for(worker.sampler_id, worker.sampler_node.stop_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .stop_event(); + cx.queue_event_for(worker.sampler_id, event); true } else { @@ -281,7 +297,11 @@ impl SamplerPool { pub fn pause_all(&mut self, cx: &mut FirewheelContext) { for worker in self.workers.iter_mut() { if worker.assigned_worker_id.is_some() { - cx.queue_event_for(worker.sampler_id, worker.sampler_node.pause_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .pause_event(); + cx.queue_event_for(worker.sampler_id, event); } } } @@ -290,7 +310,11 @@ impl SamplerPool { pub fn resume_all(&mut self, cx: &mut FirewheelContext) { for worker in self.workers.iter_mut() { if worker.assigned_worker_id.is_some() { - cx.queue_event_for(worker.sampler_id, worker.sampler_node.resume_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .resume_event(&worker.sampler_node); + cx.queue_event_for(worker.sampler_id, event); } } } @@ -299,7 +323,11 @@ impl SamplerPool { pub fn stop_all(&mut self, cx: &mut FirewheelContext) { for worker in self.workers.iter_mut() { if let Some(_) = worker.assigned_worker_id.take() { - cx.queue_event_for(worker.sampler_id, worker.sampler_node.stop_event()); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .stop_event(); + cx.queue_event_for(worker.sampler_id, event); } } @@ -318,10 +346,11 @@ impl SamplerPool { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &mut self.workers[idx]; - cx.queue_event_for( - worker.sampler_id, - worker.sampler_node.set_playhead_event(playhead_seconds), - ); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .set_playhead_event(playhead_seconds); + cx.queue_event_for(worker.sampler_id, event); true } else { @@ -341,12 +370,11 @@ impl SamplerPool { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &mut self.workers[idx]; - cx.queue_event_for( - worker.sampler_id, - worker - .sampler_node - .set_playhead_samples_event(playhead_samples), - ); + let event = cx + .node_state::(worker.sampler_id) + .unwrap() + .set_playhead_samples_event(playhead_samples); + cx.queue_event_for(worker.sampler_id, event); true } else { @@ -361,11 +389,14 @@ impl SamplerPool { &mut self, worker_id: WorkerID, sample_rate: NonZeroU32, + cx: &FirewheelContext, ) -> Option { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &self.workers[idx]; - worker.sampler_node.playhead_seconds(sample_rate) + cx.node_state::(worker.sampler_id) + .unwrap() + .playhead_seconds(&worker.sampler_node, sample_rate) } else { None } @@ -375,11 +406,13 @@ impl SamplerPool { /// single channel of audio). /// /// Returns `none` if a worker with the given ID does not exist. - pub fn playhead_samples(&mut self, worker_id: WorkerID) -> Option { + pub fn playhead_samples(&mut self, worker_id: WorkerID, cx: &FirewheelContext) -> Option { if let Some(idx) = self.worker_ids.get(worker_id.0).copied() { let worker = &self.workers[idx]; - worker.sampler_node.playhead_samples() + cx.node_state::(worker.sampler_id) + .unwrap() + .playhead_samples(&worker.sampler_node) } else { None } @@ -389,13 +422,18 @@ impl SamplerPool { /// workers which have finished playing. /// /// Calling this method is optional. - pub fn poll(&mut self) -> PollResult { + pub fn poll(&mut self, cx: &FirewheelContext) -> PollResult { let mut num_active_workers = 0; let mut finished_workers = SmallVec::new(); for worker in self.workers.iter_mut() { if worker.assigned_worker_id.is_some() { - if worker.sampler_node.playback_state() == PlaybackState::Stopped { + let playback_state = cx + .node_state::(worker.sampler_id) + .unwrap() + .playback_state(); + + if playback_state == PlaybackState::Stopped { finished_workers.push(worker.assigned_worker_id.take().unwrap()); } else { num_active_workers += 1;