Skip to content

Commit

Permalink
WIP: Add agent listener
Browse files Browse the repository at this point in the history
Signed-off-by: asakiz <asakizin@gmail.com>
  • Loading branch information
Asakiz committed Oct 7, 2020
1 parent e248fed commit dc2a19b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
7 changes: 7 additions & 0 deletions updatehub-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ authors = ["asakiz <asakizin@gmail.com>"]
edition = "2018"

[dependencies]
async-ctrlc = "1.2"
async-std = "1.6"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
derive_more = { version = "0.99", default-features = false, features = ["display", "error", "from"] }
log = "0.4"
ms-converter = "1"
serde = { version = "1", default-features = false, features = ["derive"] }
surf = { version = "2", default-features = false, features = ["h1-client"] }
Expand All @@ -19,3 +22,7 @@ surf = { version = "2", default-features = false, features = ["h1-client"] }
async-std = { version = "1", default-features = false, features = ["attributes"] }
tempfile = "3"
testcontainers = "0.11"

[[example]]
name = "listener"
path = "examples/listener.rs"
21 changes: 21 additions & 0 deletions updatehub-sdk/examples/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use updatehub_sdk::{listener, Result};

#[async_std::main]
async fn main() -> Result<()> {
let mut listener = listener::StateChangeListener::default();
listener.on(listener::StateID::Download, Box::new(|_| {}));
listener.on(listener::StateID::Error, Box::new(|_| {}));
listener.on(listener::StateID::Reboot, Box::new(|_| {}));
listener.on_error(Box::new(|_| {}));

let res = listener.listen().await;

match res {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
4 changes: 4 additions & 0 deletions updatehub-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ pub enum Error {
UnexpectedResponse(#[error(not(source))] surf::StatusCode),

Client(#[error(not(source))] surf::Error),

IO(#[error(not(source))] std::io::Error),

Ctrlc(#[error(source)] async_ctrlc::Error),
}
1 change: 1 addition & 0 deletions updatehub-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pub mod api;
mod client;
mod error;
pub mod listener;
mod serde_helpers;

pub use client::Client;
Expand Down
140 changes: 140 additions & 0 deletions updatehub-sdk/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use crate::{Error, Result};
use async_ctrlc::CtrlC;
use async_std::{
io::BufReader,
os::unix::net::{UnixListener, UnixStream},
prelude::*,
};
use log::warn;
use std::{collections::HashMap, process};

const SDK_TRIGGER_FILENAME: &str =
"/usr/share/updatehub/state-change-callbacks.d/10-updatehub-sdk-statechange-trigger";
const SOCKET_PATH: &str = "/run/updatehub-statechange.sock";

type StateChangeCallback = Box<dyn Fn(State)>;
type ErrorCallback = Box<dyn Fn(String)>;

#[derive(Default)]
pub struct StateChangeListener {
listeners: HashMap<StateID, Vec<StateChangeCallback>>,
error_handlers: Vec<ErrorCallback>,
}

#[derive(PartialEq, Eq, Hash)]
pub enum StateID {
Download,
Install,
Reboot,
Error,
}

pub struct State {
pub id: StateID,
pub conn: UnixStream,
}

impl State {
pub fn cancel(&mut self) {
self.conn.write_all(b"cancel");
}

pub fn proceed() {}
}

impl StateChangeListener {
pub fn new() -> Self {
StateChangeListener { listeners: HashMap::new(), error_handlers: Vec::new() }
}

pub fn on(&mut self, state: StateID, f: StateChangeCallback) {
match self.listeners.get_mut(&state) {
Some(callbacks) => {
callbacks.push(Box::new(f));
}
None => {
self.listeners.insert(state, vec![Box::new(f)]);
}
}
}

pub async fn listen(&self) -> Result<()> {
let file = std::path::Path::new(&SDK_TRIGGER_FILENAME);
if !file.exists() {
warn!(
"WARNING: updatehub-sdk-statechange-trigger not found on {}",
SDK_TRIGGER_FILENAME
);
}
std::fs::remove_file(SOCKET_PATH)?;
let listener = UnixListener::bind(SOCKET_PATH).await?;

let ctrlc = CtrlC::new()?;

let ctrlc_future = async {
ctrlc.await;
process::exit(1);
};

let socket_listener_future = async {
loop {
match listener.accept().await {
Ok((socket, ..)) => self.handle_connection(socket).await?,
Err(e) => return Err(Error::from(e)),
}
}
};

socket_listener_future.race(ctrlc_future).await?;

Ok(())
}

async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
let mut reader = BufReader::new(stream.clone());
let mut line = String::new();

loop {
let s = stream.clone();
let bytes = reader.read_line(&mut line).await?;

if bytes.to_string().contains("error") {
self.throw_error(&bytes.to_string());
};
self.emit(s, &bytes.to_string());
}
}

pub fn on_error(&mut self, f: ErrorCallback) {
self.error_handlers.push(f);
}

fn emit(&self, conn: UnixStream, state: &str) {
self.listeners.iter().for_each(|(_, callbacks)| {
callbacks.iter().for_each(|f| {
let conn = conn.clone();
f(State {
id: match state {
"download" => StateID::Download,
"install" => StateID::Install,
"reboot" => StateID::Reboot,
"error" => StateID::Error,
_ => panic!("error"),
},
conn,
})
})
});
}

fn throw_error(&self, error: &str) {
self.error_handlers.iter().for_each(|f| {
let error = &error;
f(error.to_string())
})
}
}

0 comments on commit dc2a19b

Please sign in to comment.