Skip to content

Commit

Permalink
feat: allow triggering commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Mar 22, 2024
1 parent 4a947ba commit 5b38374
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 89 deletions.
4 changes: 2 additions & 2 deletions agent/Cargo.toml
Expand Up @@ -12,11 +12,11 @@ keywords = ["monitoring", "network", "server"]
readme = "../README.md"

[dependencies]
actix-http = "3"
actix-service = "2"
actix-web = "4"
actix-web-extras = "0.1"
actix-web-httpauth = "0.8"
actix-http = "3"
actix-service = "2"
anyhow = "1"
async-trait = "0.1"
clap = { version = "4", features = ["derive", "env", "string"] }
Expand Down
4 changes: 2 additions & 2 deletions agent/src/collector/exec.rs
Expand Up @@ -98,7 +98,7 @@ impl Inner {

cmd.kill_on_drop(true)
.args(self.config.args.clone())
.envs(self.config.envs.clone().into_iter());
.envs(self.config.envs.clone());

let output = cmd.output().await?;
if !output.status.success() {
Expand Down Expand Up @@ -157,7 +157,7 @@ impl Collector {
for discovery in &mut discovery {
if discovery.unique_id.is_none() {
let name = if auto == 0 {
format!("{name}")
name.to_string()
} else {
// start suffixing items with a counter, better provide a unique_id
format!("{name}_{auto}")
Expand Down
49 changes: 0 additions & 49 deletions agent/src/collector/mod.rs
Expand Up @@ -8,7 +8,6 @@ use actix_web::{body::BoxBody, HttpResponse, ResponseError};
use async_trait::async_trait;
use homeassistant_agent::model::Discovery;
use serde_json::json;
use std::collections::{BTreeMap, HashMap};

#[derive(Clone, Debug)]
pub struct ValueDescriptor {
Expand Down Expand Up @@ -43,51 +42,3 @@ impl ResponseError for Error {
}
}
}

#[derive(Default)]
pub struct Manager {
pub collectors: HashMap<String, Box<dyn Collector>>,
}

impl Manager {
pub fn new() -> Self {
Self {
collectors: Default::default(),
}
}

pub fn register<N: Into<String>, C: Collector + 'static>(
mut self,
name: N,
collector: C,
) -> Self {
self.collectors.insert(name.into(), Box::new(collector));
self
}

pub async fn collect_one(&self, name: &str) -> Result<Option<serde_json::Value>, Error> {
Ok(match self.collectors.get(name) {
Some(collector) => Some(
collector
.collect()
.await
.map_err(|err| Error::Collector(err.to_string()))?,
),
None => None,
})
}

pub async fn collect_all(&self) -> Result<BTreeMap<String, serde_json::Value>, Error> {
let mut result = BTreeMap::new();
for (name, collector) in &self.collectors {
result.insert(
name.to_string(),
collector
.collect()
.await
.map_err(|err| Error::Collector(err.to_string()))?,
);
}
Ok(result)
}
}
117 changes: 117 additions & 0 deletions agent/src/command/exec.rs
@@ -0,0 +1,117 @@
use crate::config::CommonCommand;
use crate::utils::is_default;
use async_trait::async_trait;
use homeassistant_agent::model::Discovery;
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;

#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Configuration {
#[serde(flatten)]
pub common: CommonCommand,

/// execution tasks
#[serde(default)]
pub items: HashMap<String, Run>,
}

impl Deref for Configuration {
type Target = CommonCommand;

fn deref(&self) -> &Self::Target {
&self.common
}
}

#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Run {
/// The binary to call
pub command: String,

/// The arguments
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub args: Vec<String>,

/// The environment variables
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub envs: HashMap<String, String>,

#[serde(default, skip_serializing_if = "is_default")]
pub clean_env: bool,

/// The Home Assistant discovery section
#[serde(default, skip_serializing_if = "Option::is_none")]
pub discovery: Option<Discovery>,
}

pub struct Command {
config: Run,
discovery: Option<Discovery>,
}

impl Command {
pub fn new(config: Configuration) -> HashMap<String, Command> {
config
.items
.into_iter()
.map(|(name, config)| {
let command = Self::new_run(&name, config);
(name, command)
})
.collect()
}

fn new_run(name: &str, config: Run) -> Self {
let discovery = if let Some(mut discovery) = config.discovery.clone() {
if discovery.unique_id.is_none() {
discovery.unique_id = Some(name.into());
}

if discovery.value_template.is_none() {
// default to stdout
discovery.value_template = Some("{{ value_json.stdout }}".into());
}
Some(discovery)
} else {
None
};

Self { config, discovery }
}
}

#[async_trait(?Send)]
impl super::Command for Command {
async fn start(&self, payload: Cow<'_, str>, callback: Box<dyn Fn(Result<(), ()>) + Send>) {
log::info!("running command: {payload}");

let mut cmd = tokio::process::Command::new(&self.config.command);

if self.config.clean_env {
cmd.env_clear();
}

cmd.args(self.config.args.clone())
.envs(self.config.envs.clone());

tokio::spawn(async move {
let result = match cmd.output().await {
Ok(output) if output.status.success() => Ok(()),
Ok(_) => Err(()),
Err(err) => {
log::warn!("Failed to launch command: {err}");
Err(())
}
};

(callback)(result);
});
}

fn describe_ha(&self) -> Option<Discovery> {
self.discovery.clone()
}
}
14 changes: 14 additions & 0 deletions agent/src/command/mod.rs
@@ -0,0 +1,14 @@
pub mod exec;

use async_trait::async_trait;
use homeassistant_agent::model::Discovery;
use std::borrow::Cow;

#[async_trait(?Send)]
pub trait Command: Send + Sync {
async fn start(&self, payload: Cow<'_, str>, callback: Box<dyn Fn(Result<(), ()>) + Send>);

fn describe_ha(&self) -> Option<Discovery> {
None
}
}
24 changes: 22 additions & 2 deletions agent/src/config.rs
Expand Up @@ -8,7 +8,7 @@
//! run --package resymo-agent --example gen_schema
//! ```

use crate::collector::exec;
use crate::{collector, command};
use crate::{uplink, utils::is_default};

/// Agent configuration
Expand All @@ -19,6 +19,9 @@ pub struct Config {

#[serde(default)]
pub collectors: Collectors,

#[serde(default)]
pub commands: Commands,
}

/// Uplink configuration
Expand Down Expand Up @@ -62,5 +65,22 @@ pub struct Collectors {

/// Exec
#[serde(default)]
pub exec: exec::Configuration,
pub exec: collector::exec::Configuration,
}

/// Common collector settings
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CommonCommand {
#[serde(default, skip_serializing_if = "is_default")]
pub disabled: bool,
}

/// Collector configurations
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Commands {
/// Exec
#[serde(default)]
pub exec: command::exec::Configuration,
}
31 changes: 2 additions & 29 deletions agent/src/lib.rs
@@ -1,35 +1,8 @@
pub mod collector;
pub mod command;
pub mod common;
pub mod config;
pub mod manager;
pub mod uplink;

mod utils;

use crate::{
collector::{disk_free, exec, load_avg, memory, swap, Manager},
config::Collectors,
};

pub fn create_from(config: Collectors) -> anyhow::Result<Manager> {
let mut manager = Manager::new();

if !config.memory.disabled {
manager = manager.register("memory", memory::Collector);
}
if !config.swap.disabled {
manager = manager.register("swap", swap::Collector);
}
if !config.disk_free.disabled {
manager = manager.register("disk_free", disk_free::Collector);
}
if !config.load_avg.disabled {
manager = manager.register("load_avg", load_avg::Collector);
}
if !config.exec.disabled {
for (name, exec) in exec::Collector::new(config.exec) {
manager = manager.register(name, exec);
}
}

Ok(manager)
}
5 changes: 3 additions & 2 deletions agent/src/main.rs
@@ -1,9 +1,10 @@
use anyhow::Context;
use clap::Parser;
use resymo_agent::{config::Config, create_from, uplink};
use resymo_agent::{config::Config, uplink};
use std::{future::Future, path::PathBuf, pin::Pin, process::ExitCode, sync::Arc};
use tokio::signal;

use resymo_agent::manager::Manager;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};

Expand Down Expand Up @@ -65,7 +66,7 @@ async fn main() -> anyhow::Result<ExitCode> {
.with_context(|| format!("Reading configuration file: '{}'", cli.config.display()))?,
)?;

let manager = Arc::new(create_from(config.collectors)?);
let manager = Arc::new(Manager::try_from((config.collectors, config.commands))?);

log::info!("Starting agent");

Expand Down

0 comments on commit 5b38374

Please sign in to comment.