Skip to content

Commit

Permalink
feature(backend): parallelize watchpoints
Browse files Browse the repository at this point in the history
Watchpoints are now checked in parallel. Cleanup code and move stuff around. Redis connection is now configured through the config values.
  • Loading branch information
Data5tream committed Apr 6, 2023
1 parent 1e494d6 commit 751d042
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 101 deletions.
5 changes: 4 additions & 1 deletion config.toml.example
Expand Up @@ -2,8 +2,11 @@
host = "127.0.0.1"
port = 8080

[redis]
url = "redis://127.0.0.1:6379/0"

[watcher]
interval = 30
watchlist = [
{ name = "Server 1", ip = "127.0.0.1", url = "http://localhost" }
{ id = "serv", name = "Server 1", ip = "127.0.0.1", url = "http://localhost:8080/" }
]
72 changes: 70 additions & 2 deletions src/cache.rs
@@ -1,8 +1,76 @@
use redis::Connection;
use redis::{Commands, Connection};

use crate::get_config;
use crate::watcher::{Watchpoint, WatchpointStatus};

/// Get a redis connection using the URL from the config file
pub fn get_redis_connection() -> Connection {
redis::Client::open("redis://127.0.01/")
let settings = get_config();
let redis_url = settings
.get_string("redis.url")
.expect("unable to get redis string!");

redis::Client::open(redis_url)
.expect("unable create redis client")
.get_connection()
.expect("unable to get redis connection")
}

/// Load the app configuration into memory (redis)
pub fn load_config() -> bool {
let settings = get_config();

let raw_watchlist = settings
.get_array("watcher.watchlist")
.expect("Invalid watch list");

// Make sure we have something to watch
if raw_watchlist.is_empty() {
println!("No entries in watchlist! Nothing to do");
return false;
}

let mut watchlist: Vec<Watchpoint> = Vec::new();
for i in &raw_watchlist {
watchlist.push(i.clone().try_deserialize().expect("invalid config value"));
}

let mut con = get_redis_connection();
let _: () = con
.set(
"config:watchpoints",
serde_json::to_string(&watchlist).unwrap(),
)
.unwrap();

true
}

/// Get a vector with all registered watchpoints
pub fn get_watchpoints() -> Result<Vec<Watchpoint>, ()> {
let mut con = get_redis_connection();
match con.get::<&str, String>("config:watchpoints") {
Ok(k) => Ok(serde_json::from_str(k.as_str()).unwrap()),
Err(_e) => Err(()),
}
}

/// Get a vector with all registered watchpoints and their status
pub fn get_watchpoint_status() -> Result<Vec<WatchpointStatus>, ()> {
let mut con = get_redis_connection();
let watchpoints = match get_watchpoints() {
Ok(d) => d,
Err(()) => return Err(()),
};

let mut data: Vec<WatchpointStatus> = Vec::new();
for watchpoint in watchpoints {
let status = con
.get::<&str, u16>(format!("status:{}:status-code", watchpoint.id).as_str())
.unwrap();

data.push(WatchpointStatus { watchpoint, status });
}

Ok(data)
}
11 changes: 11 additions & 0 deletions src/endpoints.rs
@@ -0,0 +1,11 @@
use actix_web::{get, HttpResponse, Responder};

use crate::cache::get_watchpoint_status;

#[get("/status")]
pub async fn status() -> impl Responder {
match get_watchpoint_status() {
Ok(d) => HttpResponse::Ok().json(d),
Err(()) => HttpResponse::InternalServerError().finish(),
}
}
12 changes: 12 additions & 0 deletions src/lib.rs
@@ -0,0 +1,12 @@
use config::Config;

/// Get config from config file and environmental variables
pub fn get_config() -> Config {
Config::builder()
.add_source(config::File::with_name("config"))
// Add in settings from the environment (with a prefix of APP)
// Eg.. `APP_DEBUG=1 ./target/app` would set the `debug` key
.add_source(config::Environment::with_prefix("APP"))
.build()
.expect("Invalid or missing config file")
}
83 changes: 18 additions & 65 deletions src/main.rs
@@ -1,88 +1,41 @@
use actix_cors::Cors;
use actix_web::{get, App, HttpResponse, HttpServer, Responder};
use config::Config;
use redis::Commands;
use actix_web::{App, HttpServer};

use crate::cache::get_redis_connection;
use crate::watcher::{Watchpoint, WatchpointStatus};
use crate::cache::load_config;
use crate::endpoints::status;
use crate::watcher::setup_watcher;
use simple_status_page::get_config;

mod cache;
mod endpoints;
mod watcher;

#[get("/")]
async fn hello() -> impl Responder {
HttpResponse::Ok().body("Hello world!")
}

#[get("/status")]
async fn status() -> impl Responder {
let mut con = get_redis_connection();
let watchpoints: Vec<Watchpoint> = match con.get::<&str, String>("config:ids") {
Ok(k) => serde_json::from_str(k.as_str()).unwrap(),
Err(_e) => return HttpResponse::InternalServerError().finish(),
};

let mut data: Vec<WatchpointStatus> = Vec::new();
for watchpoint in watchpoints {
let status = con
.get::<&str, u16>(format!("status:{}:status-code", watchpoint.id).as_str())
.unwrap();

data.push(WatchpointStatus { watchpoint, status });
}

HttpResponse::Ok().json(data)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let settings = Config::builder()
.add_source(config::File::with_name("config"))
// Add in settings from the environment (with a prefix of APP)
// Eg.. `APP_DEBUG=1 ./target/app` would set the `debug` key
.add_source(config::Environment::with_prefix("APP"))
.build()
.expect("Invalid or missing config file");

let raw_watchlist = settings
.get_array("watcher.watchlist")
.expect("Invalid watch list");
let settings = get_config();

// Make sure we have something to watch
if raw_watchlist.is_empty() {
println!("No entries in watchlist! Nothing to do");
// Load config file into cache, exit if no watchpoints are configured
let has_watchpoints = load_config();
if !has_watchpoints {
println!("No watchpoints configured! See README.md for instructions");
return Ok(());
}

let mut watchlist: Vec<Watchpoint> = Vec::new();
for i in &raw_watchlist {
watchlist.push(i.clone().try_deserialize().expect("invalid config value"));
}

let mut con = get_redis_connection();
let _: () = con
.set("config:ids", serde_json::to_string(&watchlist).unwrap())
.unwrap();

let interval = settings
.get::<u32>("watcher.interval")
.expect("Invalid interval");

actix_rt::spawn(async move {
watcher::start_watcher(interval, &watchlist).await;
});
// Create watcher thread
setup_watcher();

// Grab HTTP server configuration
let host = settings.get_string("webserver.host").expect("Invalid host");
let port = settings.get::<u16>("webserver.port").expect("Invalid port");
let url = settings.get_string("webserver.url").expect("Invalid URL");

HttpServer::new(|| {
let cors = Cors::permissive()
.allowed_origin("http://127.0.0.1:5173")
HttpServer::new(move || {
let cors = Cors::default()
.allowed_origin(&url)
.allowed_methods(vec!["GET"])
.max_age(3600);

App::new().wrap(cors).service(status).service(hello)
App::new().wrap(cors).service(status)
})
.bind((host, port))?
.run()
Expand Down
83 changes: 50 additions & 33 deletions src/watcher.rs
@@ -1,9 +1,10 @@
use chrono::{Local, Utc};
use actix_rt::time::sleep;
use redis::Commands;
use serde::{Deserialize, Serialize};
use tokio_schedule::{every, Job};
use std::time::Duration;

use crate::cache::get_redis_connection;
use crate::cache::{get_redis_connection, get_watchpoints};
use crate::get_config;

#[derive(Deserialize, Serialize)]
pub struct Watchpoint {
Expand All @@ -19,43 +20,59 @@ pub struct WatchpointStatus {
pub status: u16,
}

async fn run_watcher(watchlist: &Vec<Watchpoint>) -> Result<(), Box<dyn std::error::Error>> {
println!("Running watchers - {:?}", Local::now());
/// Check a watchpoints status and save it to redis
async fn check_watchpoint(wp: Watchpoint) {
println!(" - Run watcher for {} - {}", wp.name, wp.ip);

let mut con = get_redis_connection();

for wp in watchlist {
println!(" - Run watcher for {} - {}", wp.name, wp.ip);
let res = reqwest::get(&wp.url).await;
match res {
Ok(response) => {
con.set(
let res = reqwest::get(&wp.url).await;
match res {
Ok(response) => {
let _: () = con
.set(
format!("status:{}:status-code", wp.id),
response.status().as_u16(),
)?;
}
Err(_err) => {
con.set(format!("status:{}:status-code", wp.id), 999)?;
}
)
.unwrap();
}
}

Ok(())
Err(_err) => {
let _: () = con
.set(format!("status:{}:status-code", wp.id), 999)
.unwrap();
}
};
}

pub async fn start_watcher(interval: u32, watchlist: &Vec<Watchpoint>) {
// run initial scan
run_watcher(watchlist)
.await
.expect("unable to perform checks");

let every_second = every(interval)
.seconds()
.in_timezone(&Utc)
.perform(|| async {
run_watcher(watchlist)
.await
.expect("unable to perform checks");
/// Get config from redis and ran watchpoints in parallel
async fn cron_job() {
let settings = get_config();

loop {
// Get interval from cache
let interval = settings
.get::<u64>("watcher.interval")
.expect("Invalid interval");

// Run watcher in separate thread
actix_rt::spawn(async {
// Get watchpoints and run watcher
let watchpoints = get_watchpoints().unwrap();
for wp in watchpoints {
actix_rt::spawn(async move {
check_watchpoint(wp).await;
});
}
});
every_second.await;

// Wait for next execution
sleep(Duration::from_secs(interval)).await;
}
}

/// Set up the watcher thread
pub fn setup_watcher() {
actix_rt::spawn(async {
cron_job().await;
});
}

0 comments on commit 751d042

Please sign in to comment.