Skip to content

Commit

Permalink
Merge eec8faa into ec63b0e
Browse files Browse the repository at this point in the history
  • Loading branch information
otavio committed Oct 19, 2020
2 parents ec63b0e + eec8faa commit 62f65ed
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions updatehub-sdk/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use async_std::{
prelude::*,
};
use log::warn;
use std::{env, fs, path::Path, pin::Pin};
use std::{collections::HashMap, env, fs, io, path::Path, pin::Pin, result, str::FromStr};

const SDK_TRIGGER_FILENAME: &str =
"/usr/share/updatehub/state-change-callbacks.d/10-updatehub-sdk-statechange-trigger";
Expand All @@ -19,22 +19,37 @@ type CallbackFn = dyn Fn(Handler) -> Pin<Box<dyn Future<Output = Result<()>>>>;

#[derive(Default)]
pub struct StateChangeListener {
probe_callbacks: Vec<Box<CallbackFn>>,
download_callbacks: Vec<Box<CallbackFn>>,
install_callbacks: Vec<Box<CallbackFn>>,
reboot_callbacks: Vec<Box<CallbackFn>>,
error_callbacks: Vec<Box<CallbackFn>>,
callbacks: HashMap<u8, Vec<Box<CallbackFn>>>,
}

#[derive(Debug)]
#[repr(u8)]
pub enum State {
Probe,
Probe = 0,
Download,
Install,
Reboot,
Error,
}

impl FromStr for State {
type Err = io::Error;

fn from_str(s: &str) -> result::Result<Self, Self::Err> {
match s {
"probe" => Ok(State::Probe),
"download" => Ok(State::Download),
"install" => Ok(State::Install),
"reboot" => Ok(State::Reboot),
"error" => Ok(State::Error),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("the '{}' is not a valid state", s),
)),
}
}
}

pub struct Handler {
stream: UnixStream,
}
Expand Down Expand Up @@ -64,13 +79,10 @@ impl StateChangeListener {
F: Fn(Handler) -> Fut + 'static,
Fut: Future<Output = Result<()>> + 'static,
{
match state {
State::Probe => self.probe_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Download => self.download_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Install => self.install_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Reboot => self.reboot_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Error => self.error_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
}
self.callbacks
.entry(state as u8)
.or_insert(Vec::new())
.push(Box::new(move |d| Box::pin(f(d))))
}

pub async fn listen(&self) -> Result<()> {
Expand Down Expand Up @@ -102,18 +114,12 @@ impl StateChangeListener {
}

async fn emit(&self, stream: UnixStream, input: &str) -> Result<()> {
let callbacks = match input {
"probe" => &self.probe_callbacks,
"download" => &self.download_callbacks,
"install" => &self.install_callbacks,
"reboot" => &self.reboot_callbacks,
"error" => &self.error_callbacks,
_ => unreachable!("the input is not valid"),
};

for f in callbacks {
let stream = stream.clone();
f(Handler { stream }).await?;
let state = State::from_str(input)?;
if let Some(callbacks) = self.callbacks.get(&(state as u8)) {
for f in callbacks {
let stream = stream.clone();
f(Handler { stream }).await?;
}
}

Ok(())
Expand Down

0 comments on commit 62f65ed

Please sign in to comment.