Skip to content

Commit

Permalink
feat: allow scraping exec calls for data
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Mar 22, 2024
1 parent 5d89c15 commit d680b2b
Show file tree
Hide file tree
Showing 14 changed files with 503 additions and 13 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -6,3 +6,4 @@ members = [

[patch.crates-io]
#homeassistant-agent = { path = "../homeassistant-agent" }
#homeassistant-agent = { git = "https://github.com/ctron/homeassistant-agent.git", rev = "35ab7ef0a968334301431f6950045be3dc0a6318" }
4 changes: 2 additions & 2 deletions agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "resymo-agent"
version = "0.1.0-alpha.5"
version = "0.1.0-alpha.6"
edition = "2021"
license = "Apache-2.0"

Expand All @@ -23,7 +23,7 @@ clap = { version = "4", features = ["derive", "env", "string"] }
env_logger = "0.11"
futures = "0.3"
gethostname = "0.4"
homeassistant-agent = { version = "=0.2.0-alpha.5", features = ["schemars"] }
homeassistant-agent = { version = "=0.2.0-alpha.6", features = ["schemars"] }
humantime = "2"
humantime-serde = "1"
log = "0.4"
Expand Down
2 changes: 2 additions & 0 deletions agent/src/collector/disk_free.rs
@@ -1,3 +1,5 @@
//! Disk-free collector

use async_trait::async_trait;
use homeassistant_agent::model::{Discovery, SensorClass, StateClass};
use serde_json::Value;
Expand Down
190 changes: 190 additions & 0 deletions agent/src/collector/exec.rs
@@ -0,0 +1,190 @@
use crate::config::CommonCollector;
use crate::utils::is_default;
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use homeassistant_agent::model::Discovery;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio::sync::Mutex;

#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Configuration {
#[serde(flatten)]
pub common: CommonCollector,

/// execution tasks
#[serde(default)]
pub items: HashMap<String, Task>,
}

impl Deref for Configuration {
type Target = CommonCollector;

fn deref(&self) -> &Self::Target {
&self.common
}
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Task {
#[serde(with = "humantime_serde", default = "default::period")]
#[schemars(schema_with = "crate::utils::humantime_duration")]
pub period: Duration,

/// The binary to call
pub command: String,

/// The arguments
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub args: Vec<String>,

/// The environment variables
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub envs: HashMap<String, String>,

#[serde(default, skip_serializing_if = "is_default")]
pub clean_env: bool,

/// The Home Assistant discovery section
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub discovery: Vec<Discovery>,
}

mod default {
use super::*;

pub const fn period() -> Duration {
Duration::from_secs(60)
}
}

#[derive(Clone, Debug)]
pub struct Error(String);

impl From<Error> for anyhow::Error {
fn from(value: Error) -> Self {
anyhow!("{}", value.0)
}
}

#[derive(Debug)]
struct Inner {
config: Task,
last_run: Option<Instant>,
state: Result<Value, Error>,
}

impl Inner {
fn new(config: Task) -> Self {
Self {
config,
last_run: None,
state: Err(Error("Not yet initialized".into())),
}
}

async fn run_once(&mut self) -> anyhow::Result<Value> {
let mut cmd = Command::new(&self.config.command);

if self.config.clean_env {
cmd.env_clear();
}

cmd.kill_on_drop(true)
.args(self.config.args.clone())
.envs(self.config.envs.clone().into_iter());

let output = cmd.output().await?;
if !output.status.success() {
bail!("Command failed: rc == {}", output.status);
}

self.mark_run();

let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
let status = output.status.code();

Ok(json!({
"stdout": stdout,
"stderr": stderr,
"status": status,
}))
}

async fn run(&mut self) -> anyhow::Result<Value> {
if self.need_run() {
self.state = self.run_once().await.map_err(|err| Error(err.to_string()));
}

Ok(self.state.clone()?)
}

fn need_run(&self) -> bool {
match self.last_run {
Some(last_run) => Instant::now() - last_run > self.config.period,
None => true,
}
}

fn mark_run(&mut self) {
// TODO: we should do better and provide a constant delay
self.last_run = Some(Instant::now());
}
}

#[derive(Debug)]
pub struct Collector {
inner: Arc<Mutex<Inner>>,
descriptor: Vec<Discovery>,
}

impl Collector {
pub fn new(config: Configuration) -> HashMap<String, Self> {
config
.items
.into_iter()
.map(|(name, mut task)| {
let mut discovery = task.discovery.drain(..).collect::<Vec<_>>();

let mut auto = 0;
for discovery in &mut discovery {
if discovery.unique_id.is_none() {
let name = if auto == 0 {
format!("{name}")
} else {
// start suffixing items with a counter, better provide a unique_id
format!("{name}_{auto}")
};
discovery.unique_id = Some(name);
auto += 1;
}
}

let collector = Self {
descriptor: discovery.into_iter().collect(),
inner: Arc::new(Mutex::new(Inner::new(task))),
};

(name, collector)
})
.collect()
}
}

#[async_trait]
impl super::Collector for Collector {
async fn collect(&self) -> anyhow::Result<Value> {
self.inner.lock().await.run().await
}

fn describe_ha(&self) -> Vec<Discovery> {
self.descriptor.clone()
}
}
2 changes: 2 additions & 0 deletions agent/src/collector/load_avg.rs
@@ -1,3 +1,5 @@
//! Load average collector

use async_trait::async_trait;
use homeassistant_agent::model::{Discovery, StateClass};
use serde_json::Value;
Expand Down
2 changes: 2 additions & 0 deletions agent/src/collector/memory.rs
@@ -1,3 +1,5 @@
//! Memory collector

use async_trait::async_trait;
use homeassistant_agent::model::{Discovery, SensorClass, StateClass};
use serde_json::Value;
Expand Down
1 change: 1 addition & 0 deletions agent/src/collector/mod.rs
@@ -1,4 +1,5 @@
pub mod disk_free;
pub mod exec;
pub mod load_avg;
pub mod memory;
pub mod swap;
Expand Down
2 changes: 2 additions & 0 deletions agent/src/collector/swap.rs
@@ -1,3 +1,5 @@
//! Swap space collector

use async_trait::async_trait;
use homeassistant_agent::model::{Discovery, SensorClass, StateClass};
use serde_json::Value;
Expand Down
5 changes: 5 additions & 0 deletions agent/src/config.rs
Expand Up @@ -8,6 +8,7 @@
//! run --package resymo-agent --example gen_schema
//! ```

use crate::collector::exec;
use crate::{uplink, utils::is_default};

/// Agent configuration
Expand Down Expand Up @@ -58,4 +59,8 @@ pub struct Collectors {
/// Disk
#[serde(default)]
pub disk_free: CommonCollector,

/// Exec
#[serde(default)]
pub exec: exec::Configuration,
}
14 changes: 11 additions & 3 deletions agent/src/lib.rs
@@ -1,12 +1,15 @@
use crate::collector::{disk_free, load_avg, memory, swap, Manager};
use crate::config::Collectors;

pub mod collector;
pub mod common;
pub mod config;
pub mod uplink;

mod utils;

use crate::{
collector::{disk_free, exec, load_avg, memory, swap, Manager},
config::Collectors,
};

pub fn create_from(config: Collectors) -> anyhow::Result<Manager> {
let mut manager = Manager::new();

Expand All @@ -22,6 +25,11 @@ pub fn create_from(config: Collectors) -> anyhow::Result<Manager> {
if !config.load_avg.disabled {
manager = manager.register("load_avg", load_avg::Collector);
}
if !config.exec.disabled {
for (name, exec) in exec::Collector::new(config.exec) {
manager = manager.register(name, exec);
}
}

Ok(manager)
}
6 changes: 1 addition & 5 deletions agent/src/main.rs
@@ -1,10 +1,6 @@
use anyhow::Context;
use clap::Parser;
use resymo_agent::{
collector::{disk_free, load_avg, memory, swap, Manager},
config::Config,
create_from, uplink,
};
use resymo_agent::{config::Config, create_from, uplink};
use std::{future::Future, path::PathBuf, pin::Pin, process::ExitCode, sync::Arc};
use tokio::signal;

Expand Down
21 changes: 21 additions & 0 deletions agent/src/utils.rs
@@ -1,3 +1,24 @@
pub(crate) fn is_default<T: Default + PartialEq>(value: &T) -> bool {
value == &Default::default()
}

pub(crate) fn humantime_duration(
gen: &mut schemars::gen::SchemaGenerator,
) -> schemars::schema::Schema {
use schemars::schema::*;
use schemars::JsonSchema;
use serde_json::json;

let mut schema: SchemaObject = <String>::json_schema(gen).into();
schema.metadata = Some(Box::new(Metadata {
id: None,
title: None,
description: Some(r#"A duration in the humantime format. For example: '30s' for 30 seconds. '5m' for 5 minutes."#.to_string()),
default: None,
deprecated: false,
read_only: false,
write_only: false,
examples: vec![json!("30s"), json!("1m")],
}));
schema.into()
}

0 comments on commit d680b2b

Please sign in to comment.