Skip to content

Commit

Permalink
add redis cache and simple JSON endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Data5tream committed Apr 3, 2023
1 parent b365523 commit b6d7061
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 22 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Expand Up @@ -7,7 +7,9 @@ edition = "2021"
actix-web = "4"
actix-rt = "2.8.0"
chrono = "0.4.24"
config = { version="0.13.3", features=["toml"] }
config = { version = "0.13.3", features = ["toml"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio_schedule = "0.3.0"
redis = "0.22.3"
reqwest = { version = "0.11", features = ["json"] }
8 changes: 8 additions & 0 deletions src/cache.rs
@@ -0,0 +1,8 @@
use redis::Connection;

pub fn get_redis_connection() -> Connection {
redis::Client::open("redis://127.0.01/")
.expect("unable create redis client")
.get_connection()
.expect("unable to get redis connection")
}
43 changes: 36 additions & 7 deletions src/main.rs
@@ -1,15 +1,38 @@
use actix_web::{get, App, HttpResponse, HttpServer, Responder};
use config::Config;
use redis::Commands;

use crate::watcher::Watchpoint;
use crate::cache::get_redis_connection;
use crate::watcher::{Watchpoint, WatchpointStatus};

mod cache;
mod watcher;

#[get("/")]
async fn hello() -> impl Responder {
HttpResponse::Ok().body("Hello world!")
}

#[get("/status")]
async fn status() -> impl Responder {
let mut con = get_redis_connection();
let watchpoints: Vec<Watchpoint> = match con.get::<&str, String>("config:ids") {
Ok(k) => serde_json::from_str(k.as_str()).unwrap(),
Err(_e) => return HttpResponse::InternalServerError().finish(),
};

let mut data: Vec<WatchpointStatus> = Vec::new();
for watchpoint in watchpoints {
let status = con
.get::<&str, u16>(format!("status:{}:status-code", watchpoint.id).as_str())
.unwrap();

data.push(WatchpointStatus { watchpoint, status });
}

HttpResponse::Ok().json(data)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let settings = Config::builder()
Expand All @@ -20,7 +43,9 @@ async fn main() -> std::io::Result<()> {
.build()
.expect("Invalid or missing config file");

let raw_watchlist = settings.get_array("watcher.watchlist").expect("Invalid watch list");
let raw_watchlist = settings
.get_array("watcher.watchlist")
.expect("Invalid watch list");

// Make sure we have something to watch
if raw_watchlist.is_empty() {
Expand All @@ -33,7 +58,14 @@ async fn main() -> std::io::Result<()> {
watchlist.push(i.clone().try_deserialize().expect("invalid config value"));
}

let interval = settings.get::<u32>("watcher.interval").expect("Invalid interval");
let mut con = get_redis_connection();
let _: () = con
.set("config:ids", serde_json::to_string(&watchlist).unwrap())
.unwrap();

let interval = settings
.get::<u32>("watcher.interval")
.expect("Invalid interval");

actix_rt::spawn(async move {
watcher::start_watcher(interval, &watchlist).await;
Expand All @@ -43,10 +75,7 @@ async fn main() -> std::io::Result<()> {
let host = settings.get_string("webserver.host").expect("Invalid host");
let port = settings.get::<u16>("webserver.port").expect("Invalid port");

HttpServer::new(|| {
App::new()
.service(hello)
})
HttpServer::new(|| App::new().service(hello).service(status))
.bind((host, port))?
.run()
.await
Expand Down
44 changes: 30 additions & 14 deletions src/watcher.rs
@@ -1,31 +1,41 @@
use chrono::{Local, Utc};
use serde::{Deserialize};
use redis::Commands;
use serde::{Deserialize, Serialize};
use tokio_schedule::{every, Job};

#[derive(Deserialize)]
use crate::cache::get_redis_connection;

#[derive(Deserialize, Serialize)]
pub struct Watchpoint {
pub id: String,
name: String,
ip: String,
url: String
url: String,
}

#[derive(Deserialize, Serialize)]
pub struct WatchpointStatus {
pub watchpoint: Watchpoint,
pub status: u16,
}

async fn run_watcher(watchlist: &Vec<Watchpoint>) -> Result<(), Box<dyn std::error::Error>> {
println!("Running watchers - {:?}", Local::now());

let mut con = get_redis_connection();

for wp in watchlist {
println!(" - Run watcher for {} - {}", wp.name, wp.ip);
let res = reqwest::get(&wp.url).await;
match res {
Ok(response) => {
if response.status().is_success() {
println!(" [+] {} ({}) is up", wp.name, wp.url);
} else {
println!(" [-] {} ({}) returned status code {}", wp.name, wp.url, response.status().as_str());
}
con.set(
format!("status:{}:status-code", wp.id),
response.status().as_u16(),
)?;
}
Err(err) => {
println!(" [-] Unable to connect to {} ({})", wp.name, wp.url);
println!(" {:?}", err);
Err(_err) => {
con.set(format!("status:{}:status-code", wp.id), 999)?;
}
}
}
Expand All @@ -35,11 +45,17 @@ async fn run_watcher(watchlist: &Vec<Watchpoint>) -> Result<(), Box<dyn std::err

pub async fn start_watcher(interval: u32, watchlist: &Vec<Watchpoint>) {
// run initial scan
run_watcher(watchlist).await.expect("unable to perform checks");
run_watcher(watchlist)
.await
.expect("unable to perform checks");

let every_second = every(interval)
.seconds()
.in_timezone(&Utc)
.perform(|| async { run_watcher(watchlist).await.expect("unable to perform checks"); });
.perform(|| async {
run_watcher(watchlist)
.await
.expect("unable to perform checks");
});
every_second.await;
}
}

0 comments on commit b6d7061

Please sign in to comment.