Skip to content
This repository has been archived by the owner on Feb 2, 2019. It is now read-only.

Commit

Permalink
backend: add subscription and notification system
Browse files Browse the repository at this point in the history
- Now, people can subscribe, and they get notified of stuff
  whenever it happens. Nice!
  • Loading branch information
eeeeeta committed Mar 24, 2017
1 parent aee7690 commit 02d52d9
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 75 deletions.
2 changes: 1 addition & 1 deletion sqa-backend/src/actions/audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ActionController for Controller {
fn set_params(&mut self, p: AudioParams) {
self.params = p;
}
fn verify_params(&self, ctx: ActionContext) -> Vec<ParameterError> {
fn verify_params(&self, ctx: &mut ActionContext) -> Vec<ParameterError> {
let mut ret = vec![];
if let Some(ref st) = self.params.url {
let mf = MediaFile::new(ctx.media, &st);
Expand Down
44 changes: 32 additions & 12 deletions sqa-backend/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use std::fmt::Debug;
use serde_json;
mod audio;

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct ParameterError {
name: Cow<'static, str>,
err: String
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub enum PlaybackState {
Inactive,
Unverified(Vec<ParameterError>),
Expand All @@ -32,8 +32,8 @@ pub enum PlaybackState {
Active,
Errored(String)
}
pub struct ControllerParams<'a> {
ctx: ActionContext<'a>,
pub struct ControllerParams<'a, 'b: 'a> {
ctx: &'a mut ActionContext<'b>,
internal_tx: &'a IntSender,
uuid: Uuid
}
Expand All @@ -46,7 +46,7 @@ pub trait ActionController {
fn desc(&self) -> String;
fn get_params(&self) -> &Self::Parameters;
fn set_params(&mut self, Self::Parameters);
fn verify_params(&self, ctx: ActionContext) -> Vec<ParameterError>;
fn verify_params(&self, ctx: &mut ActionContext) -> Vec<ParameterError>;
fn load(&mut self, _ctx: ControllerParams) -> BackendResult<bool> {
Ok(true)
}
Expand Down Expand Up @@ -91,20 +91,40 @@ impl Action {
pub fn accept_audio_message(&mut self, msg: &mut AudioThreadMessage) -> bool {
unimplemented!()
}
pub fn load(&mut self, ctx: ActionContext, sender: &IntSender) -> BackendResult<bool> {
action!(mut self.ctl).load(ControllerParams { ctx: ctx, internal_tx: sender, uuid: self.uu })
pub fn load(&mut self, ctx: &mut ActionContext, sender: &IntSender) -> BackendResult<bool> {
let cp: ControllerParams = ControllerParams { ctx: ctx, internal_tx: sender, uuid: self.uu };
action!(mut self.ctl).load(cp)
}
pub fn execute(&mut self, time: u64, ctx: ActionContext, sender: &IntSender) -> BackendResult<bool> {
pub fn execute(&mut self, time: u64, ctx: &mut ActionContext, sender: &IntSender) -> BackendResult<bool> {
action!(mut self.ctl).execute(time, None, ControllerParams { ctx: ctx, internal_tx: sender, uuid: self.uu })
}
pub fn state_change(&mut self, ps: PlaybackState) {
self.state = ps;
}
pub fn get_params(&self) -> BackendResult<String> {
serde_json::to_string(action!(self.ctl).get_params()).map_err(|e| e.into())
pub fn get_data(&mut self, ctx: &mut ActionContext) -> BackendResult<serde_json::Value> {
self.verify_params(ctx);
let state = serde_json::to_value(&self.state)?;
let params = serde_json::to_value(action!(self.ctl).get_params())?;
Ok(json!({
"state": state,
"params": params
}))
}
pub fn verify_params(&self, ctx: ActionContext) -> Vec<ParameterError> {
action!(self.ctl).verify_params(ctx)
pub fn verify_params(&mut self, ctx: &mut ActionContext) {
use self::PlaybackState::*;
let mut new = None;
match self.state {
Unverified(..) | Inactive => new = Some(action!(self.ctl).verify_params(ctx)),
_ => {}
}
if let Some(vec) = new {
if vec.len() == 0 {
self.state = Inactive;
}
else {
self.state = Unverified(vec)
}
}
}
pub fn set_params(&mut self, data: &str) -> BackendResult<()> {
match self.ctl {
Expand Down
11 changes: 6 additions & 5 deletions sqa-backend/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use uuid::Uuid;
pub enum Command {
/// /ping -> /pong
Ping,
/// /pong
Pong,
/// /subscribe -> /reply/subscribe
Subscribe,
/// /create {type} -> /reply/create UUID
CreateAction { typ: String },
/// /action/{uuid} -> /reply/action/{uuid} {parameters}
ActionParams { uuid: Uuid },
/// /action/{uuid} -> /reply/action/{uuid} {details}
ActionInfo { uuid: Uuid },
/// /action/{uuid}/update {parameters} -> /reply/action/{uuid}/update
UpdateActionParams { uuid: Uuid, params: String },
/// /action/{uuid}/delete -> /reply/action/{uuid}/delete
Expand Down Expand Up @@ -60,6 +60,7 @@ fn parse_osc_message(addr: &str, args: Option<Vec<OscType>>) -> BackendResult<Co
}
match &path[1..] {
&["ping"] => Ok(Command::Ping),
&["subscribe"] => Ok(Command::Subscribe),
&["create"] => {
if args.len() != 1 {
bail!(BackendErrorKind::MalformedOSCPath);
Expand All @@ -73,7 +74,7 @@ fn parse_osc_message(addr: &str, args: Option<Vec<OscType>>) -> BackendResult<Co
},
&["action", uuid] => {
let uuid = Uuid::parse_str(uuid)?;
Ok(Command::ActionParams { uuid: uuid })
Ok(Command::ActionInfo { uuid: uuid })
},
&["action", uuid, cmd, ref a..] => {
let uuid = Uuid::parse_str(uuid)?;
Expand Down
31 changes: 27 additions & 4 deletions sqa-backend/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::Result as IoResult;
pub const INTERNAL_BUFFER_SIZE: usize = 128;
pub const VERSION: &str = "SQA Backend alpha";

#[derive(Debug)]
pub struct Party {
addr: SocketAddr,
subscribed_at: SteadyTime,
Expand Down Expand Up @@ -59,6 +60,16 @@ impl<M> ConnData<M> {
}))?;
Ok(())
}
pub fn subscribe(&mut self) {
let a = self.addr.clone();
self.parties.retain(|party| {
party.addr != a
});
self.parties.push(Party {
addr: a,
subscribed_at: SteadyTime::now()
});
}
/* pub fn register_interest(&mut self) -> IoResult<oneshot::Receiver<bool>> {
if let Some((pid, pkt)) = self.party_data {
let party = self.parties.get_mut(pid)
Expand All @@ -75,15 +86,19 @@ impl<M> ConnData<M> {
else {
Err(::std::io::Error::new(::std::io::ErrorKind::Other, "API used incorrectly: calling register_interest() at the wrong time"))
}
}*/
pub fn broadcast(&mut self, msg: OscMessage) -> IoResult<usize> {
}*/
pub fn broadcast<T>(&mut self, path: String, data: T) -> IoResult<usize> where T: Serialize {
let mut n_sent = 0;
let now = SteadyTime::now();
self.parties.retain(|party| {
now - party.subscribed_at <= Duration::seconds(30)
});
let j = serde_json::to_string(&data).unwrap(); // FIXME FIXME FIXME
for party in self.parties.iter_mut() {
self.framed.start_send(party.addr.msg_to(msg.clone()))?;
self.framed.start_send(party.addr.msg_to(OscMessage {
addr: path.clone(),
args: Some(vec![OscType::String(j.clone())])
}))?;
n_sent += 1;
}
Ok(n_sent)
Expand All @@ -100,9 +115,17 @@ impl<H> Connection<H> where H: ConnHandler {
self.data.addr = addr;
self.data.path = path;
self.hdlr.external(&mut self.data, pkt);
for party in self.data.parties.iter_mut() {
if party.addr == self.data.addr {
party.subscribed_at = SteadyTime::now();
}
}
},
Err(e) => {
self.data.framed.start_send(addr.msg_to("/error/deserfail".into()))?;
self.data.framed.start_send(addr.msg_to(OscMessage {
addr: "/error/deserfail".into(),
args: Some(vec![OscType::String(e.to_string())])
}))?;
println!("Deser failed: {:?}", e);
}
};
Expand Down
1 change: 1 addition & 0 deletions sqa-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern crate tokio_core;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
extern crate time;
extern crate uuid;
Expand Down
99 changes: 46 additions & 53 deletions sqa-backend/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ pub struct ActionContext<'a> {
pub media: &'a mut MediaContext,
pub remote: &'a mut Remote
}
macro_rules! do_with_ctx {
($self:expr, $uu:expr, $clo:expr) => {{
match $self.actions.get_mut($uu) {
Some(a) => {
let ctx = ActionContext {
engine: &mut $self.engine,
media: &mut $self.media,
remote: &mut $self.remote
};
$clo(a, ctx)
},
_ => Err("No action found".into())
}
}}
}
pub enum ServerMessage {
Audio(AudioThreadMessage),
ActionStateChange(Uuid, PlaybackState),
Expand Down Expand Up @@ -63,6 +78,10 @@ impl ConnHandler for Context {
Ping => {
d.respond("/pong".into());
},
Subscribe => {
d.subscribe();
d.respond("/reply/subscribe".into());
},
CreateAction { typ } => {
d.reply::<Result<Uuid, String>>(match &*typ {
"audio" => {
Expand All @@ -74,67 +93,35 @@ impl ConnHandler for Context {
_ => Err("Unknown action type".into())
});
},
ActionParams { uuid } => {
d.reply::<Result<String, String>>(match self.actions.get(&uuid) {
Some(a) => {
a.get_params().map_err(|e| e.to_string())
},
_ => Err("No action found".into())
});
ActionInfo { uuid } => {
d.reply::<Result<::serde_json::Value, String>>(
do_with_ctx!(self, &uuid, |a: &mut Action, mut ctx: ActionContext| {
a.get_data(&mut ctx).map_err(|e| e.to_string())
})
);
},
UpdateActionParams { uuid, params } => {
d.reply::<Result<(), String>>(match self.actions.get_mut(&uuid) {
Some(a) => {
a.set_params(&params).map_err(|e| e.to_string())
},
_ => Err("No action found".into())
});
},
VerifyAction { uuid } => {
d.reply::<Result<Vec<ParameterError>, String>>({
match self.actions.get_mut(&uuid) {
Some(a) => {
let ctx = ActionContext {
engine: &mut self.engine,
media: &mut self.media,
remote: &mut self.remote
};
Ok(a.verify_params(ctx))
},
_ => Err("No action found".into())
}
let x = do_with_ctx!(self, &uuid, |a: &mut Action, mut ctx: ActionContext| {
let ret = a.set_params(&params).map_err(|e| e.to_string());
Self::on_action_changed(d, a, &mut ctx);
ret
});
d.reply::<Result<(), String>>(x);
},
LoadAction { uuid } => {
let x = {
match self.actions.get_mut(&uuid) {
Some(a) => {
let ctx = ActionContext {
engine: &mut self.engine,
media: &mut self.media,
remote: &mut self.remote
};
a.load(ctx, &d.internal_tx).map_err(|e| e.to_string())
},
_ => Err("No action found".into())
}
};
let x = do_with_ctx!(self, &uuid, |a: &mut Action, mut ctx: ActionContext| {
let ret = a.load(&mut ctx, &d.internal_tx).map_err(|e| e.to_string());
Self::on_action_changed(d, a, &mut ctx);
ret
});
d.reply::<Result<bool, String>>(x);
},
ExecuteAction { uuid } => {
let x = {
match self.actions.get_mut(&uuid) {
Some(a) => {
let ctx = ActionContext {
engine: &mut self.engine,
media: &mut self.media,
remote: &mut self.remote
};
a.execute(::sqa_engine::Sender::<()>::precise_time_ns(), ctx, &d.internal_tx).map_err(|e| e.to_string())
},
_ => Err("No action found".into())
}
};
let x = do_with_ctx!(self, &uuid, |a: &mut Action, mut ctx: ActionContext| {
let ret = a.execute(::sqa_engine::Sender::<()>::precise_time_ns(), &mut ctx, &d.internal_tx).map_err(|e| e.to_string());
Self::on_action_changed(d, a, &mut ctx);
ret
});
d.reply::<Result<bool, String>>(x);
},
DeleteAction { uuid } => {
Expand All @@ -159,4 +146,10 @@ impl Context {
}
ctx
}
pub fn on_action_changed(d: &mut CD, action: &mut Action, ctx: &mut ActionContext) {
d.broadcast::<Result<::serde_json::Value, String>>(
format!("/update/action/{}", action.uuid()),
action.get_data(ctx).map_err(|e| e.to_string())
);
}
}

0 comments on commit 02d52d9

Please sign in to comment.