Skip to content

Commit

Permalink
Merge 5188bfd into c5829b2
Browse files Browse the repository at this point in the history
  • Loading branch information
Asakiz committed Oct 16, 2020
2 parents c5829b2 + 5188bfd commit 800455d
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 1 deletion.
28 changes: 27 additions & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:

steps:
- name: Install Dependencies
run: sudo apt-get update; sudo apt-get install libarchive-dev
run: sudo apt-get update; sudo apt-get install libarchive-dev socat
- name: Checkout sources
uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
Expand Down Expand Up @@ -78,6 +78,32 @@ jobs:
command: test
args: --release --all --all-features --no-fail-fast -- --nocapture

- name: Run listener test
run: |
export UH_LISTENER_TEST=updatehub-statechange.sock
cargo run --example listener &
while [ ! -S "$UH_LISTENER_TEST" ]; do
sleep 1
done
if [[ "$(echo "entry_point" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected entry_point response"
exit 1
fi
if [[ "$(echo "download" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "cancel" ]]; then
echo "Unexpected download response"
exit 2
fi
if [[ "$(echo "error foo" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected error response"
exit 3
fi
if [[ "$(echo "reboot" | socat - UNIX-CONNECT:updatehub-statechange.sock)" != "" ]]; then
echo "Unexpected reboot response"
exit 4
fi
- name: Run cargo bloat
if: matrix.version == '1.45.0' && 0
uses: orf/cargo-bloat-action@v1
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions updatehub-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ authors = ["asakiz <asakizin@gmail.com>"]
edition = "2018"

[dependencies]
async-std = { version = "1.6", default-features = false, features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["serde"] }
derive_more = { version = "0.99", default-features = false, features = ["display", "error", "from"] }
log = "0.4"
ms-converter = "1"
serde = { version = "1", default-features = false, features = ["derive"] }
surf = { version = "2", default-features = false, features = ["h1-client"] }
Expand All @@ -19,3 +21,7 @@ surf = { version = "2", default-features = false, features = ["h1-client"] }
async-std = { version = "1", default-features = false, features = ["attributes"] }
tempfile = "3"
testcontainers = "0.11"

[[example]]
name = "listener"
path = "examples/listener.rs"
45 changes: 45 additions & 0 deletions updatehub-sdk/examples/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use std::{future::Future, pin::Pin};
use updatehub_sdk::{listener, Result};

fn download_callback(
mut handler: listener::ListenerHandler,
) -> Pin<Box<dyn Future<Output = Result<()>>>> {
Box::pin(async move {
println!("CALLBACK: download_callback");
println!("Canceling the command...");
println!("Done");
handler.cancel().await
})
}

fn error_callback(handler: listener::ListenerHandler) -> Pin<Box<dyn Future<Output = Result<()>>>> {
Box::pin(async move {
println!("ERROR");
println!("Done");
handler.proceed().await
})
}

fn rebooting_callback(
handler: listener::ListenerHandler,
) -> Pin<Box<dyn Future<Output = Result<()>>>> {
Box::pin(async move {
println!("CALLBACK: rebooting_callback");
println!("Stopping listener...");
handler.stop().await
})
}

#[async_std::main]
async fn main() -> Result<()> {
let mut listener = listener::StateChangeListener::default();
listener.on_state_change(listener::State::Download, Box::new(download_callback)).await;
listener.on_state_change(listener::State::Reboot, Box::new(rebooting_callback)).await;
listener.on_error(Box::new(error_callback)).await;

listener.listen().await
}
4 changes: 4 additions & 0 deletions updatehub-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ pub enum Error {
UnexpectedResponse(#[error(not(source))] surf::StatusCode),

Client(#[error(not(source))] surf::Error),

Io(std::io::Error),

Env(std::env::VarError),
}
1 change: 1 addition & 0 deletions updatehub-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
pub mod api;
mod client;
mod error;
pub mod listener;
mod serde_helpers;

pub use client::Client;
Expand Down
134 changes: 134 additions & 0 deletions updatehub-sdk/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (C) 2018, 2019, 2020 O.S. Systems Sofware LTDA
//
// SPDX-License-Identifier: Apache-2.0

use crate::{Error, Result};
use async_std::{
io::BufReader,
os::unix::net::{UnixListener, UnixStream},
prelude::*,
};
use log::warn;
use std::{env, fs, path::Path, pin::Pin, process};

const SDK_TRIGGER_FILENAME: &str =
"/usr/share/updatehub/state-change-callbacks.d/10-updatehub-sdk-statechange-trigger";
const SOCKET_PATH: &str = "/run/updatehub-statechange.sock";

type Callback =
Box<dyn Fn(ListenerHandler) -> Pin<Box<dyn std::future::Future<Output = Result<()>>>>>;

#[derive(Default)]
pub struct StateChangeListener {
download_state_callbacks: Vec<Callback>,
install_state_callbacks: Vec<Callback>,
reboot_state_callbacks: Vec<Callback>,
error_state_callbacks: Vec<Callback>,
}

pub enum State {
Download,
Install,
Reboot,
Error,
}

pub struct ListenerHandler {
stream: UnixStream,
}

impl ListenerHandler {
// Cancels the current action on the agent.
pub async fn cancel(&mut self) -> Result<()> {
self.stream.write_all(b"cancel").await.map_err(Error::Io)
}

// Tell the agent to proceed with the transition.
pub async fn proceed(&self) -> Result<()> {
// No message need to be sent to the connection in order to the
// agent to proceed handling the current state.
Ok(())
}

// Tell the listener to stop the execution.
pub async fn stop(&self) -> Result<()> {
process::exit(1);
}
}

impl StateChangeListener {
pub async fn on_state_change(&mut self, state: State, f: Callback) {
match state {
State::Download => self.download_state_callbacks.push(f),
State::Install => self.install_state_callbacks.push(f),
State::Reboot => self.reboot_state_callbacks.push(f),
State::Error => self.error_state_callbacks.push(f),
}
}

pub async fn on_error(&mut self, f: Callback) {
self.error_state_callbacks.push(f);
}

pub async fn listen(&self) -> Result<()> {
if !Path::new(&SDK_TRIGGER_FILENAME).exists() {
warn!(
"WARNING: updatehub-sdk-statechange-trigger not found on {}",
SDK_TRIGGER_FILENAME
);
}
let socket_path = env::var("UH_LISTENER_TEST").unwrap_or_else(|_| SOCKET_PATH.to_string());
if Path::new(&socket_path).exists() {
fs::remove_file(&socket_path)?;
}
let listener = UnixListener::bind(socket_path).await?;

loop {
let (socket, ..) = listener.accept().await?;
self.handle_connection(socket).await?;
}
}

async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
let mut reader = BufReader::new(&stream);
let mut line = String::new();

reader.read_line(&mut line).await?;

self.emit(stream, &line.trim()).await
}

async fn emit(&self, stream: UnixStream, input: &str) -> Result<()> {
let input: Vec<&str> = input.split_whitespace().collect();
match input[0] {
"entry_point" => {}

"download" => {
for f in self.download_state_callbacks.iter() {
let stream = stream.clone();
f(ListenerHandler { stream }).await?
}
}
"install" => {
for f in self.install_state_callbacks.iter() {
let stream = stream.clone();
f(ListenerHandler { stream }).await?;
}
}
"reboot" => {
for f in self.reboot_state_callbacks.iter() {
let stream = stream.clone();
f(ListenerHandler { stream }).await?;
}
}
"error" => {
for f in self.error_state_callbacks.iter() {
let stream = stream.clone();
f(ListenerHandler { stream }).await?;
}
}
_ => unreachable!("the input is not valid"),
}
Ok(())
}
}

0 comments on commit 800455d

Please sign in to comment.