Skip to content

Commit

Permalink
get op logging going server side
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlie Somerville committed Apr 2, 2020
1 parent 8e19fb4 commit 28652de
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 64 deletions.
42 changes: 42 additions & 0 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde_derive::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug)]
pub enum ServerMessage {
WorkspaceState(WorkspaceState),
ModelOp(ModelOp),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand All @@ -13,7 +14,20 @@ pub struct WorkspaceState {

#[derive(Serialize, Deserialize, Debug)]
pub enum ClientMessage {
CreateModule(ModuleParams, WindowGeometry),
UpdateModuleParams(ModuleId, ModuleParams),
UpdateWindowGeometry(ModuleId, WindowGeometry),
DeleteModule(ModuleId),
CreateConnection(InputId, OutputId),
DeleteConnection(InputId),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ModelOp {
CreateModule(ModuleId, ModuleParams, WindowGeometry),
UpdateModuleParams(ModuleId, ModuleParams),
UpdateWindowGeometry(ModuleId, WindowGeometry),
DeleteModule(ModuleId),
CreateConnection(InputId, OutputId),
DeleteConnection(InputId),
}
Expand Down Expand Up @@ -72,3 +86,31 @@ pub enum ModuleParams {
pub struct SineGeneratorParams {
pub freq: f64,
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct Coords {
pub x: i32,
pub y: i32,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WindowGeometry {
pub position: Coords,
pub z_index: usize,
}

impl Coords {
pub fn add(&self, other: Coords) -> Coords {
Coords {
x: self.x + other.x,
y: self.y + other.y,
}
}

pub fn sub(&self, other: Coords) -> Coords {
Coords {
x: self.x - other.x,
y: self.y - other.y,
}
}
}
166 changes: 118 additions & 48 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::thread;
use std::time::{Instant, Duration};

use cpal::traits::{HostTrait, DeviceTrait, StreamTrait};
use ringbuf::{RingBuffer, Producer, Consumer};
use tokio::sync::oneshot;
use ringbuf::{RingBuffer, Producer};
use tokio::sync::{oneshot, broadcast};

use mixlab_protocol::{ModuleId, InputId, OutputId, ModuleParams, SineGeneratorParams, ClientMessage, TerminalId, WorkspaceState};
use mixlab_protocol::{ModuleId, InputId, OutputId, ModuleParams, SineGeneratorParams, ClientMessage, TerminalId, WorkspaceState, WindowGeometry, ModelOp};

use crate::util::Sequence;

Expand All @@ -21,9 +21,10 @@ pub const TICKS_PER_SECOND: usize = 10;
pub const SAMPLES_PER_TICK: usize = SAMPLE_RATE / TICKS_PER_SECOND;

pub struct OutputDeviceState {
stream: cpal::Stream,
tx: Producer<f32>,
file: std::fs::File,
// this field is never used directly but must not be dropped for the
// stream to continue playing:
_stream: cpal::Stream,
}

impl Debug for OutputDeviceState {
Expand Down Expand Up @@ -54,7 +55,7 @@ impl Module {
let config = device.default_output_config()
.expect("default_output_format");

let (mut tx, mut rx) = RingBuffer::<f32>::new(SAMPLES_PER_TICK * 8).split();
let (tx, mut rx) = RingBuffer::<f32>::new(SAMPLES_PER_TICK * 8).split();

let stream = device.build_output_stream(
&config.config(),
Expand All @@ -71,15 +72,14 @@ impl Module {
})
.expect("build_output_stream");

stream.play();
stream.play().expect("stream.play");

println!("device: {:?}", device.name());
println!("config: {:?}", config);

let state = OutputDeviceState {
stream,
tx,
file: std::fs::File::create("/Users/charlie/Downloads/2ch.pcm").expect("File::create"),
_stream: stream,
};

Module::OutputDevice(state, ())
Expand All @@ -102,7 +102,7 @@ impl Module {

fn update(&mut self, new_params: ModuleParams) {
match (self, &new_params) {
(Module::SineGenerator(state, ref mut params), ModuleParams::SineGenerator(new_params)) => {
(Module::SineGenerator(_, ref mut params), ModuleParams::SineGenerator(new_params)) => {
*params = new_params.clone();
}
(module, new_params) => {
Expand All @@ -113,7 +113,7 @@ impl Module {

fn run_tick(&mut self, t: u64, inputs: &[&[Sample]], outputs: &mut [&mut [Sample]]) {
match self {
Module::SineGenerator(state, params) => {
Module::SineGenerator(_, params) => {
let t = t as Sample * SAMPLES_PER_TICK as Sample;

for i in 0..SAMPLES_PER_TICK {
Expand All @@ -125,14 +125,14 @@ impl Module {
}
}
}
Module::OutputDevice(state, params) => {
Module::OutputDevice(state, _) => {
state.tx.push_slice(inputs[0]);
// use std::io::Write;
// for sample in inputs[0] {
// state.file.write(&sample.to_le_bytes());
// }
}
Module::Mixer2ch(state, params) => {
Module::Mixer2ch(_, _) => {
for i in 0..SAMPLES_PER_TICK {
for chan in 0..CHANNELS {
let j = i * CHANNELS + chan;
Expand Down Expand Up @@ -160,17 +160,25 @@ impl Module {
}
}

#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq)]
pub struct LogPosition(usize);

pub enum EngineMessage {
DumpState(oneshot::Sender<WorkspaceState>),
ConnectSession(oneshot::Sender<(WorkspaceState, EngineOps)>),
ClientMessage(ClientMessage),
}

pub struct EngineHandle {
commands: SyncSender<EngineMessage>,
cmd_tx: SyncSender<EngineMessage>,
}

pub struct EngineSession {
cmd_tx: SyncSender<EngineMessage>,
}

pub fn start() -> EngineHandle {
let (tx, rx) = mpsc::sync_channel(8);
let (cmd_tx, cmd_rx) = mpsc::sync_channel(8);
let (log_tx, _) = broadcast::channel(64);

thread::spawn(move || {
let mut modules = HashMap::new();
Expand All @@ -180,18 +188,19 @@ pub fn start() -> EngineHandle {
modules.insert(ModuleId(3), Module::create(ModuleParams::Mixer2ch));

let mut engine = Engine {
commands: rx,
cmd_rx,
log_tx,
log_seq: Sequence::new(),
modules: modules,
geometry: HashMap::new(),
module_seq: Sequence::new(),
connections: HashMap::new(),
};

engine.run();
});

EngineHandle {
commands: tx,
}
EngineHandle { cmd_tx }
}

#[derive(Debug)]
Expand All @@ -200,29 +209,46 @@ pub enum EngineError {
Busy,
}

impl<T> From<TrySendError<T>> for EngineError {
fn from(e: TrySendError<T>) -> Self {
match e {
TrySendError::Full(_) => EngineError::Busy,
TrySendError::Disconnected(_) => EngineError::Stopped,
}
}
}

pub type EngineOps = broadcast::Receiver<(LogPosition, ModelOp)>;

impl EngineHandle {
pub async fn dump_state(&self) -> Result<WorkspaceState, EngineError> {
pub async fn connect(&self) -> Result<(WorkspaceState, EngineOps, EngineSession), EngineError> {
let cmd_tx = self.cmd_tx.clone();

let (tx, rx) = oneshot::channel();
self.send_message(EngineMessage::DumpState(tx));
rx.await.map_err(|_| EngineError::Stopped)
cmd_tx.try_send(EngineMessage::ConnectSession(tx))?;
let (state, log_rx) = rx.await.map_err(|_| EngineError::Stopped)?;

Ok((state, log_rx, EngineSession { cmd_tx }))
}
}

impl EngineSession {
/// TODO - maybe pass log position in here and detect conflicts?
pub fn update(&self, msg: ClientMessage) -> Result<(), EngineError> {
self.send_message(EngineMessage::ClientMessage(msg))
}

fn send_message(&self, msg: EngineMessage) -> Result<(), EngineError> {
match self.commands.try_send(msg) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => Err(EngineError::Busy),
Err(TrySendError::Disconnected(_)) => Err(EngineError::Stopped),
}
Ok(self.cmd_tx.try_send(msg)?)
}
}

pub struct Engine {
commands: Receiver<EngineMessage>,
cmd_rx: Receiver<EngineMessage>,
log_tx: broadcast::Sender<(LogPosition, ModelOp)>,
log_seq: Sequence,
modules: HashMap<ModuleId, Module>,
geometry: HashMap<ModuleId, WindowGeometry>,
module_seq: Sequence,
connections: HashMap<InputId, OutputId>,
}
Expand All @@ -245,8 +271,8 @@ impl Engine {
break;
}

match self.commands.recv_timeout(sleep_until - now) {
Ok(EngineMessage::DumpState(tx)) => { tx.send(self.dump_state()); }
match self.cmd_rx.recv_timeout(sleep_until - now) {
Ok(EngineMessage::ConnectSession(tx)) => { let _ = tx.send(self.connect_session()); }
Ok(EngineMessage::ClientMessage(msg)) => { self.client_update(msg); }
Err(RecvTimeoutError::Timeout) => { break; }
Err(RecvTimeoutError::Disconnected) => { return; }
Expand All @@ -255,6 +281,12 @@ impl Engine {
}
}

fn connect_session(&mut self) -> (WorkspaceState, EngineOps) {
let log_rx = self.log_tx.subscribe();
let state = self.dump_state();
(state, log_rx)
}

fn dump_state(&self) -> WorkspaceState {
let mut state = WorkspaceState {
modules: Vec::new(),
Expand All @@ -272,11 +304,56 @@ impl Engine {
state
}

fn log_op(&mut self, op: ModelOp) {
let pos = LogPosition(self.log_seq.next());
let _ = self.log_tx.send((pos, op));
}

fn client_update(&mut self, msg: ClientMessage) {
match msg {
ClientMessage::CreateModule(params, geometry) => {
// TODO - the audio engine is not actually concerned with
// window geometry and so should not own this data and force
// all accesses to it to go via the live audio thread
let id = ModuleId(self.module_seq.next());
self.modules.insert(id, Module::create(params.clone()));
self.geometry.insert(id, geometry.clone());
self.log_op(ModelOp::CreateModule(id, params, geometry));
}
ClientMessage::UpdateModuleParams(module_id, params) => {
if let Some(module) = self.modules.get_mut(&module_id) {
module.update(params);
module.update(params.clone());
self.log_op(ModelOp::UpdateModuleParams(module_id, params));
}
}
ClientMessage::UpdateWindowGeometry(module_id, geometry) => {
if let Some(geom) = self.geometry.get_mut(&module_id) {
*geom = geometry.clone();
self.log_op(ModelOp::UpdateWindowGeometry(module_id, geometry));
}
}
ClientMessage::DeleteModule(module_id) => {
// find any connections connected to this module's inputs or
// outputs and delete them, generating oplog entries

let mut deleted_connections = Vec::new();

for (input, output) in &self.connections {
if input.module_id() == module_id || output.module_id() == module_id {
deleted_connections.push(*input);
}
}

for deleted_connection in deleted_connections {
self.connections.remove(&deleted_connection);
self.log_op(ModelOp::DeleteConnection(deleted_connection));
}

// finally, delete the module:

if self.modules.contains_key(&module_id) {
self.modules.remove(&module_id);
self.log_op(ModelOp::DeleteModule(module_id));
}
}
ClientMessage::CreateConnection(input_id, output_id) => {
Expand All @@ -289,10 +366,16 @@ impl Engine {
return;
}

self.connections.insert(input_id, output_id);
if let Some(_) = self.connections.insert(input_id, output_id) {
self.log_op(ModelOp::DeleteConnection(input_id));
}

self.log_op(ModelOp::CreateConnection(input_id, output_id));
}
ClientMessage::DeleteConnection(input_id) => {
self.connections.remove(&input_id);
if let Some(_) = self.connections.remove(&input_id) {
self.log_op(ModelOp::DeleteConnection(input_id));
}
}
}

Expand All @@ -313,19 +396,6 @@ impl Engine {
}
}

fn add_module(&mut self, module: Module) -> ModuleId {
let id = ModuleId(self.module_seq.next());
self.modules.insert(id, module);
id
}

fn remove_module(&mut self, id: ModuleId) {
self.modules.remove(&id);
self.connections.retain(|input, output| {
input.module_id() != id && output.module_id() != id
});
}

fn run_tick(&mut self, t: u64) {
// find terminal modules - modules which do not send their output to
// the input of any other module
Expand Down Expand Up @@ -378,7 +448,7 @@ impl Engine {
let mut buffers = HashMap::<OutputId, Vec<Sample>>::new();

for module_id in reverse_module_order.iter().rev() {
let mut module = self.modules.get_mut(&module_id)
let module = self.modules.get_mut(&module_id)
.expect("module get_mut");

let connections = &self.connections;
Expand Down

0 comments on commit 28652de

Please sign in to comment.