Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ extern crate alloc;
extern crate collections;

#[cfg(not(feature = "std"))]
type Vec<T> = collections::vec::Vec<T>;
type BTreeMap<K, V> = collections::btree_map::BTreeMap<K, V>;
#[cfg(feature = "std")]
type Vec<T> = std::vec::Vec<T>;
type BTreeMap<K, V> = std::collections::BTreeMap<K, V>;

#[cfg(not(feature = "std"))]
type VecDeque<T> = collections::vec_deque::VecDeque<T>;
Expand Down
147 changes: 110 additions & 37 deletions src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! Working with **Signal**s allows for easy, readable creation of rich and complex DSP graphs with
//! a simple and familiar API.

use {Duplex, Frame, Sample, Vec, Rc, VecDeque};
use {BTreeMap, Duplex, Frame, Sample, Rc, VecDeque};
use interpolate::{Converter, Interpolator};
use core;

Expand Down Expand Up @@ -414,24 +414,24 @@ pub trait Signal {
/// use sample::{signal, Signal};
///
/// fn main() {
/// let frames = [[0.1], [0.2], [0.3]];
/// let frames = [[0.1], [0.2], [0.3], [0.4], [0.5], [0.6]];
/// let signal = signal::from_slice(&frames);
/// let bus = signal.bus();
/// let mut a = bus.send();
/// let mut b = bus.send();
/// assert_eq!(a.take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
/// assert_eq!(b.take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
/// assert_eq!(a.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
/// assert_eq!(b.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
///
/// let c = bus.send();
/// assert_eq!(c.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
/// assert_eq!(b.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
/// assert_eq!(a.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
/// }
/// ```
fn bus(self) -> Bus<Self>
where Self: Sized,
{
Bus {
node: Rc::new(core::cell::RefCell::new(SharedNode {
signal: self,
buffers: vec![VecDeque::new()],
})),
}
Bus::new(self, BTreeMap::new())
}

/// Converts the `Signal` into an `Iterator` that will yield the given number for `Frame`s
Expand Down Expand Up @@ -708,7 +708,12 @@ struct SharedNode<S>
where S: Signal,
{
signal: S,
buffers: Vec<VecDeque<S::Frame>>,
// The buffer of frames that have not yet been consumed by all outputs.
buffer: VecDeque<S::Frame>,
// The number of frames in `buffer` that have already been read for each output.
frames_read: BTreeMap<usize, usize>,
// The next output key.
next_key: usize,
}

/// An output node to which some signal `S` is `Output`ing its frames.
Expand All @@ -717,7 +722,7 @@ struct SharedNode<S>
pub struct Output<S>
where S: Signal,
{
idx: usize,
key: usize,
node: Rc<core::cell::RefCell<SharedNode<S>>>,
}

Expand Down Expand Up @@ -1679,13 +1684,32 @@ impl<S> Signal for ClipAmp<S>
impl<S> Bus<S>
where S: Signal,
{
fn new(signal: S, frames_read: BTreeMap<usize, usize>) -> Self {
Bus {
node: Rc::new(core::cell::RefCell::new(SharedNode {
signal: signal,
buffer: VecDeque::new(),
frames_read: frames_read,
next_key: 0,
})),
}
}

/// Produce a new Output node to which the signal `S` will output its frames.
#[inline]
pub fn send(&self) -> Output<S> {
let idx = self.node.borrow().buffers.len();
self.node.borrow_mut().buffers.push(VecDeque::new());
let mut node = self.node.borrow_mut();

// Get the key and increment for the next output.
let key = node.next_key;
node.next_key = node.next_key.wrapping_add(1);

// Insert the number of frames read by the new output.
let num_frames = node.buffer.len();
node.frames_read.insert(key, num_frames);

Output {
idx: idx,
key: key,
node: self.node.clone(),
}
}
Expand All @@ -1694,28 +1718,69 @@ impl<S> Bus<S>
impl<S> SharedNode<S>
where S: Signal,
{
/// Requests the next frame for the `Output` whose ring buffer lies at the given index.
///
/// If there are no frames waiting in the front of the ring buffer, a new frame will be
/// requested from the `signal` and appended to the back of each ring buffer.
#[inline]
fn next_frame(&mut self, idx: usize) -> S::Frame {
loop {
match self.buffers[idx].pop_front() {
Some(frame) => return frame,
None => {
let frame = self.signal.next();
for buffer in self.buffers.iter_mut() {
buffer.push_back(frame);
}
}
// Requests the next frame for the `Output` at the given key.
//
// If there are no frames pending for the output, a new frame will be requested from the
// signal and appended to the ring buffer to be received by the other outputs.
fn next_frame(&mut self, key: usize) -> S::Frame {
let num_frames = self.buffer.len();
let frames_read = self.frames_read
.remove(&key)
.expect("no frames_read for Output");

let frame = if frames_read < num_frames {
self.buffer[frames_read]
} else {
let frame = self.signal.next();
self.buffer.push_back(frame);
frame
};

// If the number of frames read by this output is the lowest, then we can pop the frame
// from the front.
let least_frames_read = !self.frames_read
.values()
.any(|&other_frames_read| other_frames_read <= frames_read);

// If this output had read the least number of frames, pop the front frame and decrement
// the frames read counters for each of the other outputs.
let new_frames_read = if least_frames_read {
self.buffer.pop_front();
for other_frames_read in self.frames_read.values_mut() {
*other_frames_read -= 1;
}
}
frames_read
} else {
frames_read + 1
};

self.frames_read.insert(key, new_frames_read);

frame
}

#[inline]
fn pending_frames(&self, idx: usize) -> usize {
self.buffers[idx].len()
fn pending_frames(&self, key: usize) -> usize {
self.buffer.len() - self.frames_read[&key]
}

// Drop the given output from the `Bus`.
//
// Called by the `Output::drop` implementation.
fn drop_output(&mut self, key: usize) {
self.frames_read.remove(&key);
let least_frames_read = self
.frames_read
.values()
.fold(self.buffer.len(), |a, &b| core::cmp::min(a, b));
if least_frames_read > 0 {
for frames_read in self.frames_read.values_mut() {
*frames_read -= least_frames_read;
}
for _ in 0..least_frames_read {
self.buffer.pop_front();
}
}
}
}

Expand All @@ -1730,15 +1795,15 @@ impl<S> Output<S>
///
/// # Example
///
/// ```rust
/// ```
/// extern crate sample;
///
/// use sample::{signal, Signal};
///
/// fn main() {
/// let frames = [[0.1], [0.2], [0.3]];
/// let bus = signal::from_slice(&frames).bus();
/// let mut signal = bus.send();
/// let signal = bus.send();
/// let mut monitor = bus.send();
/// assert_eq!(signal.take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
/// assert_eq!(monitor.pending_frames(), 3);
Expand All @@ -1748,7 +1813,7 @@ impl<S> Output<S>
/// ```
#[inline]
pub fn pending_frames(&self) -> usize {
self.node.borrow().pending_frames(self.idx)
self.node.borrow().pending_frames(self.key)
}
}

Expand All @@ -1758,7 +1823,15 @@ impl<S> Signal for Output<S>
type Frame = S::Frame;
#[inline]
fn next(&mut self) -> Self::Frame {
self.node.borrow_mut().next_frame(self.idx)
self.node.borrow_mut().next_frame(self.key)
}
}

impl<S> Drop for Output<S>
where S: Signal,
{
fn drop(&mut self) {
self.node.borrow_mut().drop_output(self.key)
}
}

Expand Down