Skip to content

Commit

Permalink
updatehub-sdk: Add agent listener
Browse files Browse the repository at this point in the history
Signed-off-by: asakiz <asakizin@gmail.com>
  • Loading branch information
Asakiz authored and otavio committed Oct 19, 2020
1 parent 1f15fb6 commit b2baec2
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 1 deletion.
28 changes: 27 additions & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:

steps:
- name: Install Dependencies
run: sudo apt-get update; sudo apt-get install libarchive-dev
run: sudo apt-get update; sudo apt-get install libarchive-dev socat
- name: Checkout sources
uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
Expand Down Expand Up @@ -78,6 +78,32 @@ jobs:
command: test
args: --release --all --all-features --no-fail-fast -- --nocapture

- name: Run listener test
run: |
export UH_LISTENER_TEST=updatehub-statechange.sock
cargo run --example listener &
while [ ! -S "$UH_LISTENER_TEST" ]; do
sleep 1
done
if [[ "$(echo "download" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "cancel" ]]; then
echo "Unexpected download response"
exit 1
fi
if [[ "$(echo "install" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected install response"
exit 2
fi
if [[ "$(echo "error" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected error response"
exit 3
fi
if [[ "$(echo "reboot" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected reboot response"
exit 4
fi
- name: Run cargo bloat
if: matrix.version == '1.45.0' && 0
uses: orf/cargo-bloat-action@v1
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions updatehub-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ authors = ["asakiz <asakizin@gmail.com>"]
edition = "2018"

[dependencies]
async-std = { version = "1.6", default-features = false, features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["serde"] }
derive_more = { version = "0.99", default-features = false, features = ["display", "error", "from"] }
log = "0.4"
ms-converter = "1"
serde = { version = "1", default-features = false, features = ["derive"] }
surf = { version = "2", default-features = false, features = ["h1-client"] }
Expand All @@ -19,3 +21,7 @@ surf = { version = "2", default-features = false, features = ["h1-client"] }
async-std = { version = "1", default-features = false, features = ["attributes"] }
tempfile = "3"
testcontainers = "0.11"

[[example]]
name = "listener"
path = "examples/listener.rs"
26 changes: 26 additions & 0 deletions updatehub-sdk/examples/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use updatehub_sdk::{listener, Result};

async fn download_callback(mut handler: listener::Handler) -> Result<()> {
println!("function called when starting the Download state; it will cancel the transition");
handler.cancel().await
}

#[async_std::main]
async fn main() -> Result<()> {
let mut listener = listener::StateChangeListener::default();

// A function callback which cancels the state transition
listener.on_state(listener::State::Download, download_callback);

// A closure callback which prints
listener.on_state(listener::State::Install, |handler| async move {
println!("closure called when starting the Install state");
handler.proceed().await
});

listener.listen().await
}
4 changes: 4 additions & 0 deletions updatehub-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ pub enum Error {
UnexpectedResponse(#[error(not(source))] surf::StatusCode),

Client(#[error(not(source))] surf::Error),

Io(std::io::Error),

Env(std::env::VarError),
}
1 change: 1 addition & 0 deletions updatehub-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pub mod api;
mod client;
mod error;
pub mod listener;
mod serde_helpers;

pub use client::Client;
Expand Down
122 changes: 122 additions & 0 deletions updatehub-sdk/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use crate::{Error, Result};
use async_std::{
io::BufReader,
os::unix::net::{UnixListener, UnixStream},
prelude::*,
};
use log::warn;
use std::{env, fs, path::Path, pin::Pin};

const SDK_TRIGGER_FILENAME: &str =
"/usr/share/updatehub/state-change-callbacks.d/10-updatehub-sdk-statechange-trigger";
const SOCKET_PATH: &str = "/run/updatehub-statechange.sock";

type CallbackFn = dyn Fn(Handler) -> Pin<Box<dyn Future<Output = Result<()>>>>;

#[derive(Default)]
pub struct StateChangeListener {
download_callbacks: Vec<Box<CallbackFn>>,
install_callbacks: Vec<Box<CallbackFn>>,
reboot_callbacks: Vec<Box<CallbackFn>>,
error_callbacks: Vec<Box<CallbackFn>>,
}

#[derive(Debug)]
pub enum State {
Download,
Install,
Reboot,
Error,
}

pub struct Handler {
stream: UnixStream,
}

impl Handler {
// Cancels the current action on the agent.
pub async fn cancel(&mut self) -> Result<()> {
self.stream.write_all(b"cancel").await.map_err(Error::Io)
}

// Tell the agent to proceed with the transition.
pub async fn proceed(&self) -> Result<()> {
// No message need to be sent to the connection in order to the
// agent to proceed handling the current state.
Ok(())
}
}

impl StateChangeListener {
#[inline]
pub fn new() -> Self {
StateChangeListener {
download_callbacks: Vec::new(),
install_callbacks: Vec::new(),
reboot_callbacks: Vec::new(),
error_callbacks: Vec::new(),
}
}

pub fn on_state<F, Fut>(&mut self, state: State, f: F)
where
F: Fn(Handler) -> Fut + 'static,
Fut: Future<Output = Result<()>> + 'static,
{
match state {
State::Download => self.download_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Install => self.install_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Reboot => self.reboot_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
State::Error => self.error_callbacks.push(Box::new(move |d| Box::pin(f(d)))),
}
}

pub async fn listen(&self) -> Result<()> {
let sdk_trigger = Path::new(SDK_TRIGGER_FILENAME);
if !sdk_trigger.exists() {
warn!("WARNING: updatehub-sdk-statechange-trigger not found on {:?}", sdk_trigger);
}

let socket_path = env::var("UH_LISTENER_TEST").unwrap_or_else(|_| SOCKET_PATH.to_string());
let socket_path = Path::new(&socket_path);
if socket_path.exists() {
fs::remove_file(&socket_path)?;
}

let listener = UnixListener::bind(socket_path).await?;
loop {
let (socket, ..) = listener.accept().await?;
self.handle_connection(socket).await?;
}
}

async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
let mut reader = BufReader::new(&stream);
let mut line = String::new();

reader.read_line(&mut line).await?;

self.emit(stream, &line.trim()).await
}

async fn emit(&self, stream: UnixStream, input: &str) -> Result<()> {
let callbacks = match input {
"download" => &self.download_callbacks,
"install" => &self.install_callbacks,
"reboot" => &self.reboot_callbacks,
"error" => &self.error_callbacks,
_ => unreachable!("the input is not valid"),
};

for f in callbacks {
let stream = stream.clone();
f(Handler { stream }).await?;
}

Ok(())
}
}

0 comments on commit b2baec2

Please sign in to comment.