diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 0ca5103c1..20b1940bd 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "d5d33d6454b8f860b5027bd8ef2ec6082f348be768b504ee6f55c78dd00704bc", + "checksum": "373e54054840c62d3438f63f1494707a66f3be25365a3abb2d53c837a692be7b", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29184,6 +29184,132 @@ }, "license": null }, + "log-noise-filter-backend 0.4.2": { + "name": "log-noise-filter-backend", + "version": "0.4.2", + "repository": null, + "targets": [ + { + "Library": { + "crate_name": "log_noise_filter_backend", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "log_noise_filter_backend", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "axum 0.7.5", + "target": "axum" + }, + { + "id": "clap 4.5.7", + "target": "clap" + }, + { + "id": "regex 1.10.5", + "target": "regex" + }, + { + "id": "serde 1.0.203", + "target": "serde" + }, + { + "id": "serde_json 1.0.117", + "target": "serde_json" + }, + { + "id": "slog 2.7.0", + "target": "slog" + }, + { + "id": "slog-async 2.8.0", + "target": "slog_async" + }, + { + "id": "slog-term 2.9.1", + "target": "slog_term" + }, + { + "id": "tokio 1.38.0", + "target": "tokio" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.4.2" + }, + "license": null + }, + "log-noise-filter-downloader 0.4.2": { + "name": "log-noise-filter-downloader", + "version": "0.4.2", + "repository": null, + "targets": [], + "library_target_name": null, + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "anyhow 1.0.86", + "target": "anyhow" + }, + { + "id": "clap 4.5.7", + "target": "clap" + }, + { + "id": "reqwest 0.12.5", + "target": "reqwest" + }, + { + "id": "serde 1.0.203", + "target": "serde" + }, + { + "id": "serde_json 1.0.117", + "target": "serde_json" + }, + { + "id": "slog 2.7.0", + "target": "slog" + }, 
+ { + "id": "slog-async 2.8.0", + "target": "slog_async" + }, + { + "id": "slog-term 2.9.1", + "target": "slog_term" + }, + { + "id": "tokio 1.38.0", + "target": "tokio" + }, + { + "id": "url 2.5.2", + "target": "url" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.4.2" + }, + "license": null + }, "lzma-sys 0.1.20": { "name": "lzma-sys", "version": "0.1.20", @@ -50786,6 +50912,8 @@ "ic-management-backend 0.4.2": "rs/ic-management-backend", "ic-management-types 0.4.2": "rs/ic-management-types", "log-fetcher 0.4.2": "rs/log-fetcher", + "log-noise-filter-backend 0.4.2": "rs/ic-observability/log-noise-filter-backend", + "log-noise-filter-downloader 0.4.2": "rs/ic-observability/log-noise-filter-downloader", "multiservice-discovery 0.4.2": "rs/ic-observability/multiservice-discovery", "multiservice-discovery-downloader 0.4.2": "rs/ic-observability/multiservice-discovery-downloader", "multiservice-discovery-shared 0.4.2": "rs/ic-observability/multiservice-discovery-shared", diff --git a/Cargo.lock b/Cargo.lock index bccd32059..f1bee657c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5860,6 +5860,38 @@ dependencies = [ "url", ] +[[package]] +name = "log-noise-filter-backend" +version = "0.4.2" +dependencies = [ + "axum 0.7.5", + "clap 4.5.7", + "regex", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", +] + +[[package]] +name = "log-noise-filter-downloader" +version = "0.4.2" +dependencies = [ + "anyhow", + "clap 4.5.7", + "log-noise-filter-backend", + "reqwest", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", + "url", +] + [[package]] name = "lzma-sys" version = "0.1.20" diff --git a/Cargo.toml b/Cargo.toml index 87e967167..fea8c62f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ members = [ "rs/ic-management-backend", "rs/ic-management-types", "rs/ic-observability/config-writer-common", + "rs/ic-observability/log-noise-filter-backend", + 
"rs/ic-observability/log-noise-filter-downloader", "rs/ic-observability/multiservice-discovery", "rs/ic-observability/multiservice-discovery-downloader", "rs/ic-observability/multiservice-discovery-shared", @@ -18,7 +20,7 @@ members = [ "rs/canister-log-fetcher", "rs/np-notifications", "rs/rollout-controller", - "rs/slack-notifications", + "rs/slack-notifications", ] resolver = "2" @@ -44,6 +46,7 @@ async-recursion = "1.1.1" async-timer = "0.7.4" async-trait = "0.1.80" axum-otel-metrics = "0.8.1" +axum = "0.7.5" backoff = { version = "0.4.0", features = ["tokio"] } backon = "0.4.4" candid = "0.10.9" diff --git a/bazel/external_crates.bzl b/bazel/external_crates.bzl index bd85b5b55..53544d77b 100644 --- a/bazel/external_crates.bzl +++ b/bazel/external_crates.bzl @@ -51,6 +51,8 @@ def external_crates_repository(): "//rs/ic-management-backend:Cargo.toml", "//rs/ic-management-types:Cargo.toml", "//rs/ic-observability/config-writer-common:Cargo.toml", + "//rs/ic-observability/log-noise-filter-backend:Cargo.toml", + "//rs/ic-observability/log-noise-filter-downloader:Cargo.toml", "//rs/ic-observability/multiservice-discovery:Cargo.toml", "//rs/ic-observability/multiservice-discovery-downloader:Cargo.toml", "//rs/ic-observability/multiservice-discovery-shared:Cargo.toml", diff --git a/rs/ic-observability/log-noise-filter-backend/BUILD.bazel b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel new file mode 100644 index 000000000..9d4aa2b6d --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel @@ -0,0 +1,59 @@ +load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test", "rust_library") +load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") + +package(default_visibility = ["//visibility:public"]) + +DEPS = [] + +rust_library( + name = "log-noise-filter-backend-lib", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + crate_name = "log_noise_filter_backend", + 
proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + deps = all_crate_deps( + normal = True, + ) + DEPS, +) + +BINARY_DEPS = [ + ":log-noise-filter-backend-lib", +] + +rust_binary( + name = "log-noise-filter-backend", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + stamp = 1, + deps = all_crate_deps( + normal = True, + ) + DEPS + BINARY_DEPS, +) + +rust_test( + name = "unit_test", + aliases = aliases( + normal_dev = True, + proc_macro_dev = True, + ), + crate = ":log-noise-filter-backend-lib", + proc_macro_deps = all_crate_deps( + proc_macro_dev = True, + ), + deps = all_crate_deps( + normal_dev = True, + ) + DEPS, +) + +rust_binary_oci_image_rules( + name = "oci_image", + src = ":log-noise-filter-backend", + base_image = "@debian-slim" +) + diff --git a/rs/ic-observability/log-noise-filter-backend/Cargo.toml b/rs/ic-observability/log-noise-filter-backend/Cargo.toml new file mode 100644 index 000000000..1d57a629b --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "log-noise-filter-backend" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { workspace = true } +axum = { workspace = true } +slog = { workspace = true } +slog-async = { workspace = true } +slog-term = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +regex = { workspace = true } + +[[bin]] +name = "log-noise-filter-backend" +path = "src/main.rs" + +[lib] +name = "log_noise_filter_backend" +path = "src/lib.rs" diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs 
b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs new file mode 100644 index 000000000..cca1992d6 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs @@ -0,0 +1,22 @@ +use std::collections::BTreeMap; + +use axum::{extract::State, http::StatusCode, Json}; +use slog::{info, warn}; + +use crate::handlers::Server; + +pub async fn delete_criteria( + State(state): State, + Json(criteria): Json>, +) -> Result>, (StatusCode, String)> { + match state.delete_criteria(criteria.clone()).await { + Ok(()) => { + info!(state.logger, "Deleted criteria"; "indexes" => ?criteria); + Ok(Json(state.get_criteria_mapped().await)) + } + Err(missing) => { + warn!(state.logger, "Failed to delete criteria"; "indexes" => ?missing); + Err((StatusCode::NOT_FOUND, format!("Missing indexes: {:?}", missing))) + } + } +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs new file mode 100644 index 000000000..7d1bcd023 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs @@ -0,0 +1,10 @@ +use std::collections::BTreeMap; + +use axum::http::StatusCode; +use axum::{extract::State, Json}; + +use crate::handlers::Server; + +pub(crate) async fn get_criteria(State(state): State) -> Result>, (StatusCode, String)> { + Ok(Json(state.get_criteria_mapped().await)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs new file mode 100644 index 000000000..d13c834d8 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs @@ -0,0 +1,3 @@ +pub mod delete_criteria; +pub mod get_criteria; +pub mod post_criteria; diff --git 
a/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs new file mode 100644 index 000000000..2eebe9683 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs @@ -0,0 +1,12 @@ +use std::collections::BTreeMap; + +use axum::{extract::State, http::StatusCode, Json}; + +use crate::handlers::Server; + +pub async fn update(State(state): State, Json(criteria): Json>) -> Result>, (StatusCode, String)> { + match state.update_criteria(criteria).await { + Ok(()) => Ok(Json(state.get_criteria_mapped().await)), + Err(v) => Err((StatusCode::BAD_REQUEST, format!("Invalid instances of regex: {:?}", v))), + } +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs new file mode 100644 index 000000000..9f3e42d93 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs @@ -0,0 +1,12 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use super::{Server, WholeState}; + +pub async fn get_all(State(server): State) -> Result, (StatusCode, String)> { + let state = WholeState { + criteria: server.get_criteria_mapped().await, + rate: server.get_rate().await, + }; + + Ok(Json(state)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs new file mode 100644 index 000000000..4704985fd --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -0,0 +1,148 @@ +use std::{collections::BTreeMap, hash::Hash, net::SocketAddr, path::PathBuf, sync::Arc}; + +use axum::{ + routing::{delete, get, post, put}, + Router, +}; +use criteria::{delete_criteria::delete_criteria, get_criteria::get_criteria, post_criteria::update}; + +use get_all::get_all; +use rate::{get_rate::get_rate, 
put_rate::put_rate}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use slog::{warn, Logger}; +use tokio::sync::Mutex; + +pub(crate) mod criteria; +pub mod get_all; +pub(crate) mod rate; + +#[derive(Serialize, Deserialize)] +pub struct WholeState { + pub rate: u64, + pub criteria: BTreeMap, +} + +impl Hash for WholeState { + fn hash(&self, state: &mut H) { + self.rate.hash(state); + self.criteria.hash(state); + } +} + +impl Default for WholeState { + fn default() -> Self { + Self { + rate: 1500, + criteria: Default::default(), + } + } +} + +#[derive(Clone)] +pub struct Server { + pub logger: Logger, + criteria: Arc>>, + rate: Arc>, + path: Option, +} + +impl Server { + pub fn new(logger: Logger, rate: u64, criteria: Vec, path: Option) -> Self { + Self { + logger, + criteria: Arc::new(Mutex::new(criteria)), + rate: Arc::new(Mutex::new(rate)), + path, + } + } + + pub async fn run(&self, socket: SocketAddr) { + let app = Router::new() + .route("/criteria", get(get_criteria)) + .route("/criteria", post(update)) + .route("/criteria", delete(delete_criteria)) + .route("/rate", get(get_rate)) + .route("/rate", put(put_rate)) + .route("/", get(get_all)) + .with_state(self.clone()); + let listener = tokio::net::TcpListener::bind(socket).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + tokio::signal::ctrl_c().await.unwrap(); + }) + .await + .unwrap(); + } + + pub async fn get_criteria_mapped(&self) -> BTreeMap { + let criteria = self.criteria.lock().await; + criteria.iter().enumerate().map(|(i, s)| (i as u32, s.clone())).collect() + } + + pub async fn get_rate(&self) -> u64 { + *self.rate.lock().await + } + + pub async fn update_criteria(&self, mut criteria: Vec) -> Result<(), Vec> { + let mut server_criteria = self.criteria.lock().await; + let mut errors = vec![]; + for c in criteria.iter_mut() { + *c = c.replace('\\', "\\\\"); + *c = c.replace('\'', "\\'"); + if let Err(e) = Regex::new(c) { + errors.push(e.to_string()); + } + 
} + + if !errors.is_empty() { + return Err(errors); + } + criteria.into_iter().for_each(|c| server_criteria.push(c)); + drop(server_criteria); + self.save_whole_state().await; + Ok(()) + } + + pub async fn update_rate(&self, rate: u64) -> u64 { + *self.rate.lock().await = rate; + rate + } + + pub async fn delete_criteria(&self, mut indexes: Vec) -> Result<(), Vec> { + let mut server_criteria = self.criteria.lock().await; + indexes.sort_by(|a, b| b.cmp(a)); + let missing = indexes + .iter() + .filter_map(|c| match server_criteria.get(*c as usize) { + Some(_) => None, + None => Some(*c), + }) + .collect::>(); + + if !missing.is_empty() { + return Err(missing); + } + + indexes.iter().for_each(|c| { + server_criteria.remove(*c as usize); + }); + drop(server_criteria); + self.save_whole_state().await; + Ok(()) + } + + async fn save_whole_state(&self) { + let state = WholeState { + rate: *self.rate.lock().await, + criteria: self.get_criteria_mapped().await, + }; + + if let Some(path) = &self.path { + match tokio::fs::write(path, serde_json::to_string_pretty(&state).unwrap()).await { + Ok(_) => (), + Err(e) => warn!(self.logger, "Failed to write state file {}, the error was: {:?}", path.display(), e), + }; + } + } +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs new file mode 100644 index 000000000..44133482b --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs @@ -0,0 +1,7 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use crate::handlers::Server; + +pub(crate) async fn get_rate(State(state): State) -> Result, (StatusCode, String)> { + Ok(Json(state.get_rate().await)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs new file mode 100644 index 000000000..15b230dd4 --- /dev/null +++
b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs @@ -0,0 +1,2 @@ +pub mod get_rate; +pub mod put_rate; diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs new file mode 100644 index 000000000..aee63f779 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs @@ -0,0 +1,7 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use crate::handlers::Server; + +pub async fn put_rate(State(state): State, Json(rate): Json) -> Result, (StatusCode, String)> { + Ok(Json(state.update_rate(rate).await)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/lib.rs b/rs/ic-observability/log-noise-filter-backend/src/lib.rs new file mode 100644 index 000000000..f8615f7fd --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/lib.rs @@ -0,0 +1,3 @@ +pub mod handlers; +#[cfg(test)] +mod tests; diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs new file mode 100644 index 000000000..d8f617f4e --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -0,0 +1,90 @@ +use std::{ + collections::BTreeMap, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, +}; + +use clap::Parser; +use handlers::WholeState; +use slog::{info, o, warn, Drain, Level, Logger}; + +use crate::handlers::Server; + +mod handlers; + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let logger = make_logger(from_str_to_log(&cli.log_level)); + info!(logger, "Running with following args: {:?}", cli); + + let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); + info!(logger, "Running noise filter manager {}", socket); + + let (global_rate, criteria) = load_state(&cli.state_file, &logger).await; + + let server = Server::new(logger.clone(), global_rate, criteria.into_values().collect(), 
Some(cli.state_file)); + server.run(socket).await; + + info!(logger, "Noise filter manager stopped"); +} + +async fn load_state(path: &PathBuf, logger: &Logger) -> (u64, BTreeMap) { + if path.exists() { + let content = tokio::fs::read_to_string(path).await.unwrap(); + let maybe_state = serde_json::from_str::(&content); + match maybe_state { + Ok(state) => return (state.rate, state.criteria), + Err(e) => warn!( + logger, + "Failed to deserialize state file {}, will remove it and recreate. The error was: {:?}", + path.display(), + e + ), + } + } + + let default = WholeState::default(); + tokio::fs::write(path, serde_json::to_string_pretty(&default).unwrap()).await.unwrap(); + (default.rate, default.criteria) +} + +fn make_logger(level: Level) -> Logger { + let decorator = slog_term::TermDecorator::new().build(); + let full_format = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog::Filter::new(full_format, move |record: &slog::Record| record.level().is_at_least(level)).fuse(); + let drain = slog_async::Async::new(drain).chan_size(8192).build(); + Logger::root(drain.fuse(), o!()) +} + +#[derive(Parser, Debug)] +struct Cli { + #[clap( + long, + default_value = "info", + help = r#" +Log level to use for running. 
You can use standard log levels 'info', +'critical', 'error', 'warning', 'trace', 'debug' + +"# + )] + log_level: String, + + #[clap(long, default_value = "8080", help = "Port to use for running the api")] + port: u16, + + #[clap(long, help = "State file used to sync across restarts")] + state_file: PathBuf, +} + +fn from_str_to_log(value: &str) -> Level { + match value { + "info" => Level::Info, + "critical" => Level::Critical, + "error" => Level::Error, + "warning" => Level::Warning, + "trace" => Level::Trace, + "debug" => Level::Debug, + _ => panic!("Unsupported level: {}", value), + } +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs new file mode 100644 index 000000000..17a1eeae4 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs @@ -0,0 +1,62 @@ +use axum::Json; + +use crate::{ + handlers::criteria::{delete_criteria::delete_criteria, get_criteria::get_criteria, post_criteria::update}, + tests::{server, server_with_criteria}, +}; + +#[tokio::test] +async fn delete_criteria_test() { + let server = server_with_criteria(["test", "another", "one more"].iter().map(|f| f.to_string()).collect()); + let payload = Json(vec![2, 0]); + + let resp = delete_criteria(server, payload).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert!(!resp.is_empty()); + assert!(resp.first_key_value().is_some()); + let (key, value) = resp.first_key_value().unwrap(); + assert_eq!(key, &0); + assert_eq!(value, "another") +} + +#[tokio::test] +async fn delete_criteria_error_test() { + let server = server(); + let payload = Json(vec![1]); + + let resp = delete_criteria(server, payload).await; + + assert!(resp.is_err()); +} + +#[tokio::test] +async fn get_criteria_test() { + let posted_criteria: Vec = ["test", "another", "one more"].iter().map(|f| f.to_string()).collect(); + let server = 
server_with_criteria(posted_criteria.clone()); + let resp = get_criteria(server).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert_eq!(resp.len(), posted_criteria.len()); + for i in 0..posted_criteria.len() { + assert_eq!(resp[&(i as u32)], posted_criteria[i]); + } +} + +#[tokio::test] +async fn post_criteria_test() { + let server = server_with_criteria(vec![]); + let payload = Json(["test", "another", "one more"].iter().map(|f| f.to_string()).collect::>()); + + let resp = update(server, payload).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert!(!resp.is_empty()); + assert!(resp.first_key_value().is_some()); + let (key, value) = resp.first_key_value().unwrap(); + assert_eq!(key, &0); + assert_eq!(value, "test") +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs new file mode 100644 index 000000000..8852d1064 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs @@ -0,0 +1,25 @@ +use axum::extract::State; + +use crate::handlers::Server; + +mod criteria_tests; +mod rate_tests; + +const RATE: u64 = 42; +const CRITERIA: &[&str] = &["test.*"]; + +fn criteria() -> Vec { + CRITERIA.iter().map(|s| s.to_string()).collect() +} + +fn server() -> State { + server_with_rate_and_criteria(RATE, criteria()) +} + +fn server_with_rate_and_criteria(rate: u64, criteria: Vec) -> State { + State(Server::new(slog::Logger::root(slog::Discard, slog::o!()), rate, criteria, None)) +} + +fn server_with_criteria(criteria: Vec) -> State { + server_with_rate_and_criteria(RATE, criteria) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs new file mode 100644 index 000000000..722c1037e --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs @@ -0,0 +1,15 @@ +use axum::Json; + +use 
crate::{handlers::rate::put_rate::put_rate, tests::server}; + +#[tokio::test] +async fn test_put() { + let server = server(); + let payload = Json(1200); + + let resp = put_rate(server, payload).await; + assert!(resp.is_ok()); + + let resp = resp.unwrap(); + assert_eq!(resp.0, 1200); +} diff --git a/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel new file mode 100644 index 000000000..b3c3b92a0 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel @@ -0,0 +1,27 @@ +load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") + +DEPS = [ + "//rs/ic-observability/log-noise-filter-backend:log-noise-filter-backend-lib", +] + +rust_binary( + name = "log-noise-filter-downloader", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + stamp = 1, + deps = all_crate_deps( + normal = True, + ) + DEPS, +) + +rust_binary_oci_image_rules( + name = "oci_image", + src = ":log-noise-filter-downloader", + base_image = "@debian-slim" +) + diff --git a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml new file mode 100644 index 000000000..7dc9d961a --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "log-noise-filter-downloader" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { workspace = true } +slog = { workspace = true } +slog-async = { workspace = true } +slog-term = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = 
true } +tokio = { workspace = true } +url = { workspace = true } +reqwest = { workspace = true } +anyhow = { workspace = true } +log-noise-filter-backend = { path = "../log-noise-filter-backend" } diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs new file mode 100644 index 000000000..0af2f07d4 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -0,0 +1,124 @@ +use std::{ + collections::{hash_map::DefaultHasher, BTreeMap}, + hash::{Hash, Hasher}, + path::PathBuf, + time::Duration, +}; + +use log_noise_filter_backend::handlers::WholeState; +use reqwest::Client; +use serde::Serialize; +use slog::{info, warn, Logger}; +use tokio::{io::AsyncWriteExt, select}; +use url::Url; + +pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec, transform_id: String) { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .build() + .expect("Should be able to build a client"); + + let mut interval = tokio::time::interval(Duration::from_secs(15)); + let mut current_hash = 0; + loop { + select! 
{ + tick = interval.tick() => { + info!(logger, "Running loop @ {:?}", tick); + }, + _ = tokio::signal::ctrl_c() => { + info!(logger, "Received shutdown signal, exiting..."); + break + } + } + + let response = match fetch_criteria(&client, url.clone(), &logger).await { + Ok(r) => r, + Err(_) => continue, + }; + + if !content_changed(&mut current_hash, &response, &logger) { + continue; + } + + write_to_file(&response, &inputs, &transform_id, &path).await + } +} + +async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::Result { + let response = client.get(url.clone()).send().await; + let response = match response { + Ok(r) if r.status().is_success() => r, + Ok(r) => { + warn!( + logger, + "Received error status while downloading: {:?}\n{:?}", + r.status(), + r.text().await.expect("Should have text") + ); + anyhow::bail!(""); + } + Err(e) => { + warn!(logger, "Error while downloading: {:?}", e); + anyhow::bail!(""); + } + }; + + match response.json().await { + Ok(r) => Ok(r), + Err(e) => { + warn!(logger, "Failed to parse response: {:?}", e); + anyhow::bail!(""); + } + } +} + +fn content_changed(current_hash: &mut u64, new_criteria: &WholeState, logger: &Logger) -> bool { + let mut hasher = DefaultHasher::new(); + new_criteria.hash(&mut hasher); + let new_hash = hasher.finish(); + if &new_hash == current_hash { + info!(logger, "Hash hasn't changed, skipping"); + return false; + } + + info!(logger, "Hash changed: {} -> {}", current_hash, new_hash); + *current_hash = new_hash; + true +} + +async fn write_to_file(state: &WholeState, inputs: &[String], transform_id: &String, path: &PathBuf) { + let (prefix, criteria) = match state.criteria.is_empty() { + // If the list is empty we should not sample anything. + // In other words we should exclude everything. 
+ true => ("", "r'.*'".to_string()), + false => ( + "!", + state.criteria.values().map(|s| format!("r'{}'", s)).collect::>().join(","), + ), + }; + + let transform = VectorSampleTransform { + _type: "sample".to_string(), + inputs: inputs.to_owned(), + rate: state.rate, + exclude: format!("{}match_any(to_string(.MESSAGE) ?? \"\", [{}])", prefix, criteria), + }; + + let mut transforms = BTreeMap::new(); + transforms.insert(transform_id, transform); + let mut total = BTreeMap::new(); + total.insert("transforms", transforms); + + let transform = serde_json::to_string_pretty(&total).expect("Should be able to serialize"); + let mut file = tokio::fs::File::create(path).await.expect("Should be able to create file"); + file.write_all(transform.as_bytes()).await.expect("Should be able to write"); +} + +#[derive(Debug, Serialize, Clone)] +struct VectorSampleTransform { + #[serde(rename = "type")] + _type: String, + inputs: Vec, + rate: u64, + exclude: String, +} diff --git a/rs/ic-observability/log-noise-filter-downloader/src/main.rs b/rs/ic-observability/log-noise-filter-downloader/src/main.rs new file mode 100644 index 000000000..f20d948d0 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/src/main.rs @@ -0,0 +1,63 @@ +use std::path::PathBuf; + +use clap::Parser; +use download_loop::download_loop; +use slog::{info, o, Drain, Level, Logger}; +use url::Url; + +mod download_loop; + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let logger = make_logger(from_str_to_log(&cli.log_level)); + info!(logger, "Running with following args: {:?}", cli); + + download_loop(cli.url, logger, cli.path, cli.inputs, cli.transform_id).await +} + +fn make_logger(level: Level) -> Logger { + let decorator = slog_term::TermDecorator::new().build(); + let full_format = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog::Filter::new(full_format, move |record: &slog::Record| record.level().is_at_least(level)).fuse(); + let drain = 
slog_async::Async::new(drain).chan_size(8192).build(); + Logger::root(drain.fuse(), o!()) +} + +#[derive(Parser, Debug)] +struct Cli { + #[clap( + long, + default_value = "info", + help = r#" +Log level to use for running. You can use standard log levels 'info', +'critical', 'error', 'warning', 'trace', 'debug' + +"# + )] + log_level: String, + + #[clap(long, help = "Url for the backend")] + url: Url, + + #[clap(long, help = "Path to where the output should be generated")] + path: PathBuf, + + #[clap(long, help = "Inputs that will be linked to this transform")] + inputs: Vec, + + #[clap(long, help = "Transform id", default_value = "sample-ic-logs-transform")] + transform_id: String, +} + +fn from_str_to_log(value: &str) -> Level { + match value { + "info" => Level::Info, + "critical" => Level::Critical, + "error" => Level::Error, + "warning" => Level::Warning, + "trace" => Level::Trace, + "debug" => Level::Debug, + _ => panic!("Unsupported level: {}", value), + } +} diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index c5d8e65ae..664121dea 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -27,7 +27,7 @@ slog-term = { workspace = true } tokio = { workspace = true } url = { workspace = true } futures.workspace = true -axum = "0.7.5" +axum = { workspace = true } axum-otel-metrics.workspace = true opentelemetry.workspace = true retry = { workspace = true }