From c8a910647fded1c17836fa3129a610e711e77067 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Thu, 25 Apr 2024 14:11:32 +0200 Subject: [PATCH 01/23] started work on filter manager --- Cargo.lock | 65 +++++++- Cargo.toml | 1 + rs/log-noise-filter-manager/Cargo.toml | 20 +++ .../src/handlers/get.rs | 24 +++ .../src/handlers/mod.rs | 146 ++++++++++++++++++ .../src/handlers/put.rs | 18 +++ rs/log-noise-filter-manager/src/main.rs | 81 ++++++++++ 7 files changed, 353 insertions(+), 2 deletions(-) create mode 100644 rs/log-noise-filter-manager/Cargo.toml create mode 100644 rs/log-noise-filter-manager/src/handlers/get.rs create mode 100644 rs/log-noise-filter-manager/src/handlers/mod.rs create mode 100644 rs/log-noise-filter-manager/src/handlers/put.rs create mode 100644 rs/log-noise-filter-manager/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 661c0b66e..1278ee1a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5871,6 +5871,21 @@ dependencies = [ "url", ] +[[package]] +name = "log-noise-filter-manager" +version = "0.3.2" +dependencies = [ + "axum 0.7.4", + "clap 4.5.4", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", + "toml", +] + [[package]] name = "lzma-sys" version = "0.1.20" @@ -7913,6 +7928,15 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "serde_spanned" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +dependencies = [ + "serde", +] + [[package]] name = "serde_tokenstream" version = "0.1.7" @@ -8769,11 +8793,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit 0.22.12", +] + [[package]] name = "toml_datetime" version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -8783,7 +8822,7 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap 2.2.5", "toml_datetime", - "winnow", + "winnow 0.5.40", ] [[package]] @@ -8794,7 +8833,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap 2.2.5", "toml_datetime", - "winnow", + "winnow 0.5.40", +] + +[[package]] +name = "toml_edit" +version = "0.22.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" +dependencies = [ + "indexmap 2.2.5", + "serde", + "serde_spanned", + "toml_datetime", + "winnow 0.6.6", ] [[package]] @@ -9491,6 +9543,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index bad5b9e10..f65cbf995 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "rs/np-notifications", "rs/rollout-controller", "rs/slack-notifications", + "rs/log-noise-filter-manager", ] resolver = "2" diff --git a/rs/log-noise-filter-manager/Cargo.toml b/rs/log-noise-filter-manager/Cargo.toml new file mode 100644 index 000000000..ddb17cd00 --- /dev/null +++ b/rs/log-noise-filter-manager/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "log-noise-filter-manager" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { workspace = true } +axum = "0.7.4" +slog = { workspace = true } +slog-async = { workspace = true } +slog-term = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +toml = "0.8.12" diff --git a/rs/log-noise-filter-manager/src/handlers/get.rs b/rs/log-noise-filter-manager/src/handlers/get.rs new file mode 100644 index 000000000..4f030ea25 --- /dev/null +++ b/rs/log-noise-filter-manager/src/handlers/get.rs @@ -0,0 +1,24 @@ +use axum::http::StatusCode; +use axum::{extract::State, Json}; + +use super::{Server, TopLevelVectorTransform}; + +pub(crate) async fn content( + State(state): State, +) -> Result, (StatusCode, String)> { + Ok(Json(state.read_file().await)) +} + +pub(crate) async fn only_routes(State(state): State) -> Result>, (StatusCode, String)> { + let content = state.read_file().await; + Ok(Json( + content + .transforms + .noise_filter + .route + .noisy + .split('\n') + .map(|t| t.to_string()) + .collect(), + )) +} diff --git a/rs/log-noise-filter-manager/src/handlers/mod.rs b/rs/log-noise-filter-manager/src/handlers/mod.rs new file mode 100644 index 000000000..3bcbbcf85 --- /dev/null +++ b/rs/log-noise-filter-manager/src/handlers/mod.rs @@ -0,0 +1,146 @@ +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; + +use axum::Router; +use serde::{Deserialize, Serialize}; +use slog::{error, info, Logger}; +use tokio::sync::Mutex; + +use self::{ + get::{content, only_routes}, + put::update, +}; +use axum::routing::{get, put}; + +mod get; +mod put; + +#[derive(Clone)] +pub struct Server { + pub logger: Logger, + pub file_path: PathBuf, + mutex: Arc>, +} + +impl Server { + pub fn new(logger: Logger, file_path: PathBuf) -> Self { + Self { + logger, + file_path, + mutex: Arc::new(Mutex::new(())), + } + } + + pub async fn run(&self, socket: SocketAddr, reroute_unmatched: String, inputs: Vec) { + self.ensure_file_exists(reroute_unmatched, inputs).await; + + let app = Router::new() + .route("/", get(content)) + .route("/only-routes", get(only_routes)) + .route("/", put(update)) + .with_state(self.clone()); + let listener = tokio::net::TcpListener::bind(socket).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + tokio::signal::ctrl_c().await.unwrap(); + }) + .await + .unwrap(); + } + + async fn ensure_file_exists(&self, reroute_unmatched: String, inputs: Vec) { + match tokio::fs::File::open(&self.file_path).await { + Ok(_) => { + let _ = self.read_file().await; + info!(self.logger, "Validated initial toml content"); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + match tokio::fs::File::create(&self.file_path).await { + Ok(_) => { + self.write_structure(&Self::get_initial(reroute_unmatched, inputs)) + .await; + } + Err(e) => { + error!(self.logger, "Received an error while creating the file: {:?}", e); + panic!(); + } + } + } + Err(e) => { + error!(self.logger, "Received unexpected error: {:?}", e); + panic!(); + } + }; + } + + fn get_initial(reroute_unmatched: String, inputs: Vec) -> TopLevelVectorTransform { + TopLevelVectorTransform { + transforms: VectorTransform { + noise_filter: NoiseFilter { + type_: "route".to_string(), + inputs, + reroute_unmatched, + route: Route { + noisy: "false".to_string(), + }, + }, + }, + } + } + + pub async fn write_structure(&self, structure: &TopLevelVectorTransform) { + let serialized = match toml::to_string_pretty(&structure) { + Ok(v) => v, + Err(e) => { + error!(self.logger, "Error while serializing initial structure: {:?}", e); + panic!(); + } + }; + match tokio::fs::write(&self.file_path, &serialized.as_bytes()).await { + Ok(_) => info!(self.logger, "Serialized initial structure"), + Err(e) => { + error!(self.logger, "Couldn't serialize initial strucuture: {:?}", e); + panic!() + } + } + } + + pub async fn read_file(&self) -> TopLevelVectorTransform { + let _ = self.mutex.lock().await; + let content = match tokio::fs::read_to_string(&self.file_path).await { + Ok(c) => c, + Err(e) => { + error!(self.logger, "Couldn't read content to string: {:?}", e); + panic!() + } + }; + + match toml::from_str::(&content) { + Ok(v) => v, + Err(e) => { + error!(self.logger, "Validation of initial toml failed: {:?}", e); + panic!() + } + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct TopLevelVectorTransform { + pub transforms: VectorTransform, +} +#[derive(Serialize, Deserialize)] + +pub struct VectorTransform { + pub noise_filter: NoiseFilter, +} +#[derive(Serialize, Deserialize)] +pub struct NoiseFilter { + pub type_: String, + pub inputs: Vec, + pub reroute_unmatched: String, + pub route: Route, +} +#[derive(Serialize, Deserialize)] +pub struct Route { + pub noisy: String, +} diff --git a/rs/log-noise-filter-manager/src/handlers/put.rs b/rs/log-noise-filter-manager/src/handlers/put.rs new file mode 100644 index 000000000..0d68718e4 --- /dev/null +++ b/rs/log-noise-filter-manager/src/handlers/put.rs @@ -0,0 +1,18 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use super::{Server, TopLevelVectorTransform}; + +pub async fn update( + State(state): State, + Json(routes): Json>, +) -> Result, (StatusCode, String)> { + let mut content = state.read_file().await; + if routes.is_empty() { + content.transforms.noise_filter.route.noisy = "false".to_string(); + } else { + content.transforms.noise_filter.route.noisy = routes.join("\n") + } + + state.write_structure(&content).await; + Ok(Json(content)) +} diff --git a/rs/log-noise-filter-manager/src/main.rs b/rs/log-noise-filter-manager/src/main.rs new file mode 100644 index 000000000..90b9637eb --- /dev/null +++ b/rs/log-noise-filter-manager/src/main.rs @@ -0,0 +1,81 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, +}; + +use clap::Parser; +use slog::{info, o, Drain, Level, Logger}; + +use crate::handlers::Server; + +mod handlers; + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let logger = make_logger(from_str_to_log(&cli.log_level)); + info!(logger, "Running with following args: {:?}", cli); + + let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); + info!(logger, "Running noise filter manager {}", socket); + + let server = Server::new(logger.clone(), cli.file_path); + server.run(socket, cli.reroute_unmached, cli.inputs).await; + + info!(logger, "Noise filter manager stopped"); +} + +fn make_logger(level: Level) -> Logger { + let decorator = slog_term::TermDecorator::new().build(); + let full_format = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog::Filter::new(full_format, move |record: &slog::Record| { + record.level().is_at_least(level) + }) + .fuse(); + let drain = slog_async::Async::new(drain).chan_size(8192).build(); + Logger::root(drain.fuse(), o!()) +} + +#[derive(Parser, Debug)] +struct Cli { + #[clap( + long, + default_value = "info", + help = r#" +Log level to use for running. You can use standard log levels 'info', +'critical', 'error', 'warning', 'trace', 'debug' + +"# + )] + log_level: String, + + #[clap(long, default_value = "8080", help = "Port to use for running the api")] + port: u16, + + #[clap( + long, + help = "File path to the vector config in toml used for the routing configuration" + )] + file_path: PathBuf, + + #[clap( + long, + help = "Explained: https://vector.dev/docs/reference/configuration/transforms/route/#reroute_unmatched" + )] + reroute_unmached: String, + + #[clap(long, help = "All inputs that should be linked to this transform")] + inputs: Vec, +} + +fn from_str_to_log(value: &str) -> Level { + match value { + "info" => Level::Info, + "critical" => Level::Critical, + "error" => Level::Error, + "warning" => Level::Warning, + "trace" => Level::Trace, + "debug" => Level::Debug, + _ => panic!("Unsupported level: {}", value), + } +} From 4aeb156f3630c05fc451f9ffe4eb34bda4554ae2 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Thu, 25 Apr 2024 15:29:32 +0200 Subject: [PATCH 02/23] fixing separator --- rs/log-noise-filter-manager/src/handlers/get.rs | 4 ++-- rs/log-noise-filter-manager/src/handlers/mod.rs | 9 ++++++++- rs/log-noise-filter-manager/src/handlers/put.rs | 4 ++-- rs/log-noise-filter-manager/src/main.rs | 4 ++-- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/rs/log-noise-filter-manager/src/handlers/get.rs b/rs/log-noise-filter-manager/src/handlers/get.rs index 4f030ea25..8c784dc99 100644 --- a/rs/log-noise-filter-manager/src/handlers/get.rs +++ b/rs/log-noise-filter-manager/src/handlers/get.rs @@ -1,7 +1,7 @@ use axum::http::StatusCode; use axum::{extract::State, Json}; -use super::{Server, TopLevelVectorTransform}; +use super::{Server, TopLevelVectorTransform, SEPARATOR}; pub(crate) async fn content( State(state): State, @@ -17,7 +17,7 @@ pub(crate) async fn only_routes(State(state): State) -> Result { match tokio::fs::File::create(&self.file_path).await { Ok(_) => { + if inputs.is_empty() { + error!(self.logger, "Vector needs at least one input."); + panic!() + } self.write_structure(&Self::get_initial(reroute_unmatched, inputs)) .await; + info!(self.logger, "Serialized initial structure") } Err(e) => { error!(self.logger, "Received an error while creating the file: {:?}", e); @@ -96,7 +103,7 @@ impl Server { } }; match tokio::fs::write(&self.file_path, &serialized.as_bytes()).await { - Ok(_) => info!(self.logger, "Serialized initial structure"), + Ok(_) => {} Err(e) => { error!(self.logger, "Couldn't serialize initial strucuture: {:?}", e); panic!() diff --git a/rs/log-noise-filter-manager/src/handlers/put.rs b/rs/log-noise-filter-manager/src/handlers/put.rs index 0d68718e4..2bae92090 100644 --- a/rs/log-noise-filter-manager/src/handlers/put.rs +++ b/rs/log-noise-filter-manager/src/handlers/put.rs @@ -1,6 +1,6 @@ use axum::{extract::State, http::StatusCode, Json}; -use super::{Server, TopLevelVectorTransform}; +use super::{Server, TopLevelVectorTransform, SEPARATOR}; pub async fn update( State(state): State, @@ -10,7 +10,7 @@ pub async fn update( if routes.is_empty() { content.transforms.noise_filter.route.noisy = "false".to_string(); } else { - content.transforms.noise_filter.route.noisy = routes.join("\n") + content.transforms.noise_filter.route.noisy = routes.join(SEPARATOR) } state.write_structure(&content).await; diff --git a/rs/log-noise-filter-manager/src/main.rs b/rs/log-noise-filter-manager/src/main.rs index 90b9637eb..af21d527d 100644 --- a/rs/log-noise-filter-manager/src/main.rs +++ b/rs/log-noise-filter-manager/src/main.rs @@ -20,7 +20,7 @@ async fn main() { info!(logger, "Running noise filter manager {}", socket); let server = Server::new(logger.clone(), cli.file_path); - server.run(socket, cli.reroute_unmached, cli.inputs).await; + server.run(socket, cli.reroute_unmatched, cli.inputs).await; info!(logger, "Noise filter manager stopped"); } @@ -62,7 +62,7 @@ Log level to use for running. You can use standard log levels 'info', long, help = "Explained: https://vector.dev/docs/reference/configuration/transforms/route/#reroute_unmatched" )] - reroute_unmached: String, + reroute_unmatched: String, #[clap(long, help = "All inputs that should be linked to this transform")] inputs: Vec, From b2c57b6b411ee12b4c23065c0532b78d2ddb123b Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Thu, 25 Apr 2024 16:23:00 +0200 Subject: [PATCH 03/23] started working on the newest glibc --- Cargo.Bazel.lock | 279 +++++++++++++++++++++++- bazel/external_crates.bzl | 1 + rs/log-noise-filter-manager/BUILD.bazel | 25 +++ 3 files changed, 304 insertions(+), 1 deletion(-) create mode 100644 rs/log-noise-filter-manager/BUILD.bazel diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 4a3515dfd..942cde901 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "010c4cc7f8ff7c440201253636da5ef5b4ba9097e64e163e4cfd27559cfa1a33", + "checksum": "cc2a7d694017824a421208f5b929af2aaf3b8f1666c4b93f850fd60f008242b1", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29245,6 +29245,62 @@ }, "license": null }, + "log-noise-filter-manager 0.3.2": { + "name": "log-noise-filter-manager", + "version": "0.3.2", + "repository": null, + "targets": [], + "library_target_name": null, + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "axum 0.7.4", + "target": "axum" + }, + { + "id": "clap 4.5.4", + "target": "clap" + }, + { + "id": "serde 1.0.197", + "target": "serde" + }, + { + "id": "serde_json 1.0.115", + "target": "serde_json" + }, + { + "id": "slog 2.7.0", + "target": "slog" + }, + { + "id": "slog-async 2.8.0", + "target": "slog_async" + }, + { + "id": "slog-term 2.9.1", + "target": "slog_term" + }, + { + "id": "tokio 1.36.0", + "target": "tokio" + }, + { + "id": "toml 0.8.12", + "target": "toml" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.3.2" + }, + "license": null + }, "lzma-sys 0.1.20": { "name": "lzma-sys", "version": "0.1.20", @@ -39960,6 +40016,51 @@ }, "license": "MIT OR Apache-2.0" }, + "serde_spanned 0.6.5": { + "name": "serde_spanned", + "version": "0.6.5", + "repository": { + "Http": { + "url": "https://static.crates.io/crates/serde_spanned/0.6.5/download", + "sha256": "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" + } + }, + "targets": [ + { + "Library": { + "crate_name": "serde_spanned", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "serde_spanned", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "serde" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "serde 1.0.197", + "target": "serde" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.6.5" + }, + "license": "MIT OR Apache-2.0" + }, "serde_tokenstream 0.1.7": { "name": "serde_tokenstream", "version": "0.1.7", @@ -44254,6 +44355,65 @@ }, "license": "MIT" }, + "toml 0.8.12": { + "name": "toml", + "version": "0.8.12", + "repository": { + "Http": { + "url": "https://static.crates.io/crates/toml/0.8.12/download", + "sha256": "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" + } + }, + "targets": [ + { + "Library": { + "crate_name": "toml", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "toml", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "default", + "display", + "parse" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "serde 1.0.197", + "target": "serde" + }, + { + "id": "serde_spanned 0.6.5", + "target": "serde_spanned" + }, + { + "id": "toml_datetime 0.6.5", + "target": "toml_datetime" + }, + { + "id": "toml_edit 0.22.12", + "target": "toml_edit" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.8.12" + }, + "license": "MIT OR Apache-2.0" + }, "toml_datetime 0.6.5": { "name": "toml_datetime", "version": "0.6.5", @@ -44279,6 +44439,21 @@ "compile_data_glob": [ "**" ], + "crate_features": { + "common": [ + "serde" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "serde 1.0.197", + "target": "serde" + } + ], + "selects": {} + }, "edition": "2021", "version": "0.6.5" }, @@ -44384,6 +44559,69 @@ }, "license": "MIT OR Apache-2.0" }, + "toml_edit 0.22.12": { + "name": "toml_edit", + "version": "0.22.12", + "repository": { + "Http": { + "url": "https://static.crates.io/crates/toml_edit/0.22.12/download", + "sha256": "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" + } + }, + "targets": [ + { + "Library": { + "crate_name": "toml_edit", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "toml_edit", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "display", + "parse", + "serde" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "indexmap 2.2.5", + "target": "indexmap" + }, + { + "id": "serde 1.0.197", + "target": "serde" + }, + { + "id": "serde_spanned 0.6.5", + "target": "serde_spanned" + }, + { + "id": "toml_datetime 0.6.5", + "target": "toml_datetime" + }, + { + "id": "winnow 0.6.6", + "target": "winnow" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.22.12" + }, + "license": "MIT OR Apache-2.0" + }, "tonic 0.11.0": { "name": "tonic", "version": "0.11.0", @@ -48636,6 +48874,44 @@ }, "license": "MIT" }, + "winnow 0.6.6": { + "name": "winnow", + "version": "0.6.6", + "repository": { + "Http": { + "url": "https://static.crates.io/crates/winnow/0.6.6/download", + "sha256": "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" + } + }, + "targets": [ + { + "Library": { + "crate_name": "winnow", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "winnow", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "alloc", + "default", + "std" + ], + "selects": {} + }, + "edition": "2021", + "version": "0.6.6" + }, + "license": "MIT" + }, "winreg 0.50.0": { "name": "winreg", "version": "0.50.0", @@ -50255,6 +50531,7 @@ "ic-management-backend 0.3.2": "rs/ic-management-backend", "ic-management-types 0.3.2": "rs/ic-management-types", "log-fetcher 0.3.2": "rs/log-fetcher", + "log-noise-filter-manager 0.3.2": "rs/log-noise-filter-manager", "multiservice-discovery 0.3.2": "rs/ic-observability/multiservice-discovery", "multiservice-discovery-downloader 0.3.2": "rs/ic-observability/multiservice-discovery-downloader", "multiservice-discovery-shared 0.3.2": "rs/ic-observability/multiservice-discovery-shared", diff --git a/bazel/external_crates.bzl b/bazel/external_crates.bzl index bd85b5b55..9718779a9 100644 --- a/bazel/external_crates.bzl +++ b/bazel/external_crates.bzl @@ -63,6 +63,7 @@ def external_crates_repository(): "//rs/np-notifications:Cargo.toml", "//rs/slack-notifications:Cargo.toml", "//rs/rollout-controller:Cargo.toml", + "//rs/log-noise-filter-manager:Cargo.toml" ], splicing_config = splicing_config( resolver_version = "2", diff --git a/rs/log-noise-filter-manager/BUILD.bazel b/rs/log-noise-filter-manager/BUILD.bazel new file mode 100644 index 000000000..00e236abe --- /dev/null +++ b/rs/log-noise-filter-manager/BUILD.bazel @@ -0,0 +1,25 @@ +load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") + +DEPS = [] + +rust_binary( + name = "log-noise-filter-manager", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + stamp = 1, + deps = all_crate_deps( + normal = True, + ) + DEPS, +) + +rust_binary_oci_image_rules( + name = "oci_image", + src = ":log-noise-filter-manager", + base_image = "@cc-debian12" +) + From b4580b0b0328b2c33bd1be8283481449dad5fc0d Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 26 Apr 2024 16:18:57 +0200 Subject: [PATCH 04/23] fixing bazel base image --- rs/log-noise-filter-manager/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/log-noise-filter-manager/BUILD.bazel b/rs/log-noise-filter-manager/BUILD.bazel index 00e236abe..b01c71bab 100644 --- a/rs/log-noise-filter-manager/BUILD.bazel +++ b/rs/log-noise-filter-manager/BUILD.bazel @@ -20,6 +20,6 @@ rust_binary( rust_binary_oci_image_rules( name = "oci_image", src = ":log-noise-filter-manager", - base_image = "@cc-debian12" + base_image = "@debian-slim" ) From 5aba5a411c865bf837ccbaa63f7cbdd83b35c7d4 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Tue, 18 Jun 2024 15:28:01 +0200 Subject: [PATCH 05/23] running rustfmt --- .../src/handlers/get.rs | 4 +-- .../src/handlers/mod.rs | 31 ++++++++----------- .../src/handlers/put.rs | 5 +-- rs/log-noise-filter-manager/src/main.rs | 10 ++---- 4 files changed, 17 insertions(+), 33 deletions(-) diff --git a/rs/log-noise-filter-manager/src/handlers/get.rs b/rs/log-noise-filter-manager/src/handlers/get.rs index 8c784dc99..b09a99469 100644 --- a/rs/log-noise-filter-manager/src/handlers/get.rs +++ b/rs/log-noise-filter-manager/src/handlers/get.rs @@ -3,9 +3,7 @@ use axum::{extract::State, Json}; use super::{Server, TopLevelVectorTransform, SEPARATOR}; -pub(crate) async fn content( - State(state): State, -) -> Result, (StatusCode, String)> { +pub(crate) async fn content(State(state): State) -> Result, (StatusCode, String)> { Ok(Json(state.read_file().await)) } diff --git a/rs/log-noise-filter-manager/src/handlers/mod.rs b/rs/log-noise-filter-manager/src/handlers/mod.rs index a68685855..19cbb6446 100644 --- a/rs/log-noise-filter-manager/src/handlers/mod.rs +++ b/rs/log-noise-filter-manager/src/handlers/mod.rs @@ -55,23 +55,20 @@ impl Server { let _ = self.read_file().await; info!(self.logger, "Validated initial toml content"); } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - match tokio::fs::File::create(&self.file_path).await { - Ok(_) => { - if inputs.is_empty() { - error!(self.logger, "Vector needs at least one input."); - panic!() - } - self.write_structure(&Self::get_initial(reroute_unmatched, inputs)) - .await; - info!(self.logger, "Serialized initial structure") - } - Err(e) => { - error!(self.logger, "Received an error while creating the file: {:?}", e); - panic!(); + Err(e) if e.kind() == std::io::ErrorKind::NotFound => match tokio::fs::File::create(&self.file_path).await { + Ok(_) => { + if inputs.is_empty() { + error!(self.logger, "Vector needs at least one input."); + panic!() } + self.write_structure(&Self::get_initial(reroute_unmatched, inputs)).await; + info!(self.logger, "Serialized initial structure") } - } + Err(e) => { + error!(self.logger, "Received an error while creating the file: {:?}", e); + panic!(); + } + }, Err(e) => { error!(self.logger, "Received unexpected error: {:?}", e); panic!(); @@ -86,9 +83,7 @@ impl Server { type_: "route".to_string(), inputs, reroute_unmatched, - route: Route { - noisy: "false".to_string(), - }, + route: Route { noisy: "false".to_string() }, }, }, } diff --git a/rs/log-noise-filter-manager/src/handlers/put.rs b/rs/log-noise-filter-manager/src/handlers/put.rs index 2bae92090..be2e9e66c 100644 --- a/rs/log-noise-filter-manager/src/handlers/put.rs +++ b/rs/log-noise-filter-manager/src/handlers/put.rs @@ -2,10 +2,7 @@ use axum::{extract::State, http::StatusCode, Json}; use super::{Server, TopLevelVectorTransform, SEPARATOR}; -pub async fn update( - State(state): State, - Json(routes): Json>, -) -> Result, (StatusCode, String)> { +pub async fn update(State(state): State, Json(routes): Json>) -> Result, (StatusCode, String)> { let mut content = state.read_file().await; if routes.is_empty() { content.transforms.noise_filter.route.noisy = "false".to_string(); diff --git a/rs/log-noise-filter-manager/src/main.rs b/rs/log-noise-filter-manager/src/main.rs index af21d527d..9d01c9e3f 100644 --- a/rs/log-noise-filter-manager/src/main.rs +++ b/rs/log-noise-filter-manager/src/main.rs @@ -28,10 +28,7 @@ async fn main() { fn make_logger(level: Level) -> Logger { let decorator = slog_term::TermDecorator::new().build(); let full_format = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog::Filter::new(full_format, move |record: &slog::Record| { - record.level().is_at_least(level) - }) - .fuse(); + let drain = slog::Filter::new(full_format, move |record: &slog::Record| record.level().is_at_least(level)).fuse(); let drain = slog_async::Async::new(drain).chan_size(8192).build(); Logger::root(drain.fuse(), o!()) } @@ -52,10 +49,7 @@ Log level to use for running. You can use standard log levels 'info', #[clap(long, default_value = "8080", help = "Port to use for running the api")] port: u16, - #[clap( - long, - help = "File path to the vector config in toml used for the routing configuration" - )] + #[clap(long, help = "File path to the vector config in toml used for the routing configuration")] file_path: PathBuf, #[clap( From 96ce60241b40f6915f29d5a629ec08bad3896079 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Wed, 26 Jun 2024 15:24:24 +0200 Subject: [PATCH 06/23] refactoring --- Cargo.Bazel.lock | 8 +- Cargo.lock | 2 +- Cargo.toml | 2 +- bazel/external_crates.bzl | 2 +- .../BUILD.bazel | 4 +- .../Cargo.toml | 2 +- .../src/handlers/delete.rs | 15 ++ .../src/handlers/get.rs | 10 ++ .../src/handlers/mod.rs | 77 +++++++++ .../src/handlers/put.rs | 11 ++ .../src/main.rs | 21 +-- .../src/handlers/get.rs | 22 --- .../src/handlers/mod.rs | 148 ------------------ .../src/handlers/put.rs | 15 -- 14 files changed, 126 insertions(+), 213 deletions(-) rename rs/{log-noise-filter-manager => log-noise-filter-backend}/BUILD.bazel (86%) rename rs/{log-noise-filter-manager => log-noise-filter-backend}/Cargo.toml (93%) create mode 100644 rs/log-noise-filter-backend/src/handlers/delete.rs create mode 100644 rs/log-noise-filter-backend/src/handlers/get.rs create mode 100644 rs/log-noise-filter-backend/src/handlers/mod.rs create mode 100644 rs/log-noise-filter-backend/src/handlers/put.rs rename rs/{log-noise-filter-manager => log-noise-filter-backend}/src/main.rs (72%) delete mode 100644 rs/log-noise-filter-manager/src/handlers/get.rs delete mode 100644 rs/log-noise-filter-manager/src/handlers/mod.rs delete mode 100644 rs/log-noise-filter-manager/src/handlers/put.rs diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 344aef98b..b87d71a93 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "57018fa2234f578824c1c81bebdd725c3bca052a4210cc4f8ead192c8d153972", + "checksum": "077a6edeb56f425cff5d5be0fa78051775b205b6764662613cbb2b00b09f40c1", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29180,8 +29180,8 @@ }, "license": null }, - "log-noise-filter-manager 0.4.1": { - "name": "log-noise-filter-manager", + "log-noise-filter-backend 0.4.1": { + "name": "log-noise-filter-backend", "version": "0.4.1", "repository": null, "targets": [], @@ -51058,7 +51058,7 @@ "ic-management-backend 0.4.1": "rs/ic-management-backend", "ic-management-types 0.4.1": "rs/ic-management-types", "log-fetcher 0.4.1": "rs/log-fetcher", - "log-noise-filter-manager 0.4.1": "rs/log-noise-filter-manager", + "log-noise-filter-backend 0.4.1": "rs/log-noise-filter-backend", "multiservice-discovery 0.4.1": "rs/ic-observability/multiservice-discovery", "multiservice-discovery-downloader 0.4.1": "rs/ic-observability/multiservice-discovery-downloader", "multiservice-discovery-shared 0.4.1": "rs/ic-observability/multiservice-discovery-shared", diff --git a/Cargo.lock b/Cargo.lock index 5ee88e92d..7bbab17ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5860,7 +5860,7 @@ dependencies = [ ] [[package]] -name = "log-noise-filter-manager" +name = "log-noise-filter-backend" version = "0.4.1" dependencies = [ "axum 0.7.5", diff --git a/Cargo.toml b/Cargo.toml index 7428724c1..dad12be66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,11 @@ members = [ "rs/ic-observability/service-discovery", "rs/ic-observability/sns-downloader", "rs/log-fetcher", + "rs/log-noise-filter-backend", "rs/canister-log-fetcher", "rs/np-notifications", "rs/rollout-controller", "rs/slack-notifications", - "rs/log-noise-filter-manager", ] resolver = "2" diff --git a/bazel/external_crates.bzl b/bazel/external_crates.bzl index 9718779a9..3e8df8cc1 100644 --- a/bazel/external_crates.bzl +++ b/bazel/external_crates.bzl @@ -63,7 +63,7 @@ def external_crates_repository(): "//rs/np-notifications:Cargo.toml", "//rs/slack-notifications:Cargo.toml", "//rs/rollout-controller:Cargo.toml", - "//rs/log-noise-filter-manager:Cargo.toml" + "//rs/log-noise-filter-backend:Cargo.toml" ], splicing_config = splicing_config( resolver_version = "2", diff --git a/rs/log-noise-filter-manager/BUILD.bazel b/rs/log-noise-filter-backend/BUILD.bazel similarity index 86% rename from rs/log-noise-filter-manager/BUILD.bazel rename to rs/log-noise-filter-backend/BUILD.bazel index b01c71bab..45b1a02d8 100644 --- a/rs/log-noise-filter-manager/BUILD.bazel +++ b/rs/log-noise-filter-backend/BUILD.bazel @@ -5,7 +5,7 @@ load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") DEPS = [] rust_binary( - name = "log-noise-filter-manager", + name = "log-noise-filter-backend", srcs = glob(["src/**/*.rs"]), aliases = aliases(), proc_macro_deps = all_crate_deps( @@ -19,7 +19,7 @@ rust_binary( rust_binary_oci_image_rules( name = "oci_image", - src = ":log-noise-filter-manager", + src = ":log-noise-filter-backend", base_image = "@debian-slim" ) diff --git a/rs/log-noise-filter-manager/Cargo.toml b/rs/log-noise-filter-backend/Cargo.toml similarity index 93% rename from rs/log-noise-filter-manager/Cargo.toml rename to rs/log-noise-filter-backend/Cargo.toml index ddb17cd00..1da98fb41 100644 --- a/rs/log-noise-filter-manager/Cargo.toml +++ b/rs/log-noise-filter-backend/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "log-noise-filter-manager" +name = "log-noise-filter-backend" version.workspace = true edition.workspace = true authors.workspace = true diff --git a/rs/log-noise-filter-backend/src/handlers/delete.rs b/rs/log-noise-filter-backend/src/handlers/delete.rs new file mode 100644 index 000000000..5182346cd --- /dev/null +++ b/rs/log-noise-filter-backend/src/handlers/delete.rs @@ -0,0 +1,15 @@ +use std::collections::BTreeMap; + +use axum::{extract::State, http::StatusCode, Json}; + +use super::Server; + +pub async fn delete_criteria( + State(state): State, + Json(criteria): Json>, +) -> Result>, (StatusCode, String)> { + match state.delete_criteria(criteria).await { + Ok(()) => Ok(Json(state.get_criteria_mapped().await)), + Err(missing) => Err((StatusCode::NOT_FOUND, format!("Missing indexes: {:?}", missing))), + } +} diff --git a/rs/log-noise-filter-backend/src/handlers/get.rs b/rs/log-noise-filter-backend/src/handlers/get.rs new file mode 100644 index 000000000..ee7e5c9d4 --- /dev/null +++ b/rs/log-noise-filter-backend/src/handlers/get.rs @@ -0,0 +1,10 @@ +use std::collections::BTreeMap; + +use axum::http::StatusCode; +use axum::{extract::State, Json}; + +use super::Server; + +pub(crate) async fn get_criteria(State(state): State) -> Result>, (StatusCode, String)> { + Ok(Json(state.get_criteria_mapped().await)) +} diff --git a/rs/log-noise-filter-backend/src/handlers/mod.rs b/rs/log-noise-filter-backend/src/handlers/mod.rs new file mode 100644 index 000000000..aa0a9c0b3 --- /dev/null +++ b/rs/log-noise-filter-backend/src/handlers/mod.rs @@ -0,0 +1,77 @@ +use std::{collections::BTreeMap, net::SocketAddr, sync::Arc}; + +use axum::{routing::delete, Router}; +use delete::delete_criteria; +use get::get_criteria; +use slog::Logger; +use tokio::sync::Mutex; + +use self::put::update; +use axum::routing::{get, put}; + +mod delete; +mod get; +mod put; + +#[derive(Clone)] +pub struct Server { + pub logger: Logger, + criteria: Arc>>, +} + +impl Server { + pub fn new(logger: Logger) -> Self { + Self { + logger, + criteria: Arc::new(Mutex::new(vec![])), + } + } + + pub async fn run(&self, socket: SocketAddr) { + let app = Router::new() + .route("/", get(get_criteria)) + .route("/", put(update)) + .route("/", delete(delete_criteria)) + .with_state(self.clone()); + let listener = tokio::net::TcpListener::bind(socket).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + tokio::signal::ctrl_c().await.unwrap(); + }) + .await + .unwrap(); + } + + pub async fn get_criteria_mapped(&self) -> BTreeMap { + let criteria = self.criteria.lock().await; + criteria.iter().enumerate().map(|(i, s)| (i as u32, s.clone())).collect() + } + + pub async fn update_criteria(&self, criteria: Vec) { + let mut server_criteria = self.criteria.lock().await; + criteria.iter().for_each(|c| { + server_criteria.push(c.to_string()); + }); + } + + pub async fn delete_criteria(&self, indexes: Vec) -> Result<(), Vec> { + let mut server_criteria = self.criteria.lock().await; + let missing = indexes + .iter() + .filter_map(|c| match server_criteria.get(*c as usize) { + Some(_) => None, + None => Some(*c), + }) + .collect::>(); + + if !missing.is_empty() { + return Err(missing); + } + + indexes.iter().for_each(|c| { + server_criteria.remove(*c as usize); + }); + + Ok(()) + } +} diff --git a/rs/log-noise-filter-backend/src/handlers/put.rs b/rs/log-noise-filter-backend/src/handlers/put.rs new file mode 100644 index 000000000..ac5e11c22 --- /dev/null +++ b/rs/log-noise-filter-backend/src/handlers/put.rs @@ -0,0 +1,11 @@ +use std::collections::BTreeMap; + +use axum::{extract::State, http::StatusCode, Json}; + +use super::Server; + +pub async fn update(State(state): State, Json(criteria): Json>) -> Result>, (StatusCode, String)> { + state.update_criteria(criteria).await; + + Ok(Json(state.get_criteria_mapped().await)) +} diff --git a/rs/log-noise-filter-manager/src/main.rs b/rs/log-noise-filter-backend/src/main.rs similarity index 72% rename from rs/log-noise-filter-manager/src/main.rs rename to rs/log-noise-filter-backend/src/main.rs index 9d01c9e3f..684d1fda9 100644 --- a/rs/log-noise-filter-manager/src/main.rs +++ b/rs/log-noise-filter-backend/src/main.rs @@ -1,7 +1,4 @@ -use std::{ - net::{Ipv4Addr, SocketAddr}, - path::PathBuf, -}; +use std::net::{Ipv4Addr, SocketAddr}; use clap::Parser; use slog::{info, o, Drain, Level, Logger}; @@ -19,8 +16,8 @@ async fn main() { let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); info!(logger, "Running noise filter manager {}", socket); - let server = Server::new(logger.clone(), cli.file_path); - server.run(socket, cli.reroute_unmatched, cli.inputs).await; + let server = Server::new(logger.clone()); + server.run(socket).await; info!(logger, "Noise filter manager stopped"); } @@ -48,18 +45,6 @@ Log level to use for running. You can use standard log levels 'info', #[clap(long, default_value = "8080", help = "Port to use for running the api")] port: u16, - - #[clap(long, help = "File path to the vector config in toml used for the routing configuration")] - file_path: PathBuf, - - #[clap( - long, - help = "Explained: https://vector.dev/docs/reference/configuration/transforms/route/#reroute_unmatched" - )] - reroute_unmatched: String, - - #[clap(long, help = "All inputs that should be linked to this transform")] - inputs: Vec, } fn from_str_to_log(value: &str) -> Level { diff --git a/rs/log-noise-filter-manager/src/handlers/get.rs b/rs/log-noise-filter-manager/src/handlers/get.rs deleted file mode 100644 index b09a99469..000000000 --- a/rs/log-noise-filter-manager/src/handlers/get.rs +++ /dev/null @@ -1,22 +0,0 @@ -use axum::http::StatusCode; -use axum::{extract::State, Json}; - -use super::{Server, TopLevelVectorTransform, SEPARATOR}; - -pub(crate) async fn content(State(state): State) -> Result, (StatusCode, String)> { - Ok(Json(state.read_file().await)) -} - -pub(crate) async fn only_routes(State(state): State) -> Result>, (StatusCode, String)> { - let content = state.read_file().await; - Ok(Json( - content - .transforms - .noise_filter - .route - .noisy - .split(SEPARATOR) - .map(|t| t.to_string()) - .collect(), - )) -} diff --git a/rs/log-noise-filter-manager/src/handlers/mod.rs b/rs/log-noise-filter-manager/src/handlers/mod.rs deleted file mode 100644 index 19cbb6446..000000000 --- a/rs/log-noise-filter-manager/src/handlers/mod.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; - -use axum::Router; -use serde::{Deserialize, Serialize}; -use slog::{error, info, Logger}; -use tokio::sync::Mutex; - -use self::{ - get::{content, only_routes}, - put::update, -}; -use axum::routing::{get, put}; - -mod get; -mod put; - -pub const SEPARATOR: &str = " || "; - -#[derive(Clone)] -pub struct Server { - pub logger: Logger, - pub file_path: PathBuf, - mutex: Arc>, -} - -impl Server { - pub fn new(logger: Logger, file_path: PathBuf) -> Self { - Self { - logger, - file_path, - mutex: Arc::new(Mutex::new(())), - } - } - - pub async fn run(&self, socket: SocketAddr, reroute_unmatched: String, inputs: Vec) { - self.ensure_file_exists(reroute_unmatched, inputs).await; - - let app = Router::new() - .route("/", get(content)) - .route("/only-routes", get(only_routes)) - .route("/", put(update)) - .with_state(self.clone()); - let listener = tokio::net::TcpListener::bind(socket).await.unwrap(); - axum::serve(listener, app) - .with_graceful_shutdown(async move { - tokio::signal::ctrl_c().await.unwrap(); - }) - .await - .unwrap(); - } - - async fn ensure_file_exists(&self, reroute_unmatched: String, inputs: Vec) { - match tokio::fs::File::open(&self.file_path).await { - Ok(_) => { - let _ = self.read_file().await; - info!(self.logger, "Validated initial toml content"); - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => match tokio::fs::File::create(&self.file_path).await { - Ok(_) => { - if inputs.is_empty() { - error!(self.logger, "Vector needs at least one input."); - panic!() - } - self.write_structure(&Self::get_initial(reroute_unmatched, inputs)).await; - info!(self.logger, "Serialized initial structure") - } - Err(e) => { - error!(self.logger, "Received an error while creating the file: {:?}", e); - panic!(); - } - }, - Err(e) => { - error!(self.logger, "Received unexpected error: {:?}", e); - panic!(); - } - }; - } - - fn get_initial(reroute_unmatched: String, inputs: Vec) -> TopLevelVectorTransform { - TopLevelVectorTransform { - transforms: VectorTransform { - noise_filter: NoiseFilter { - type_: "route".to_string(), - inputs, - reroute_unmatched, - route: Route { noisy: "false".to_string() }, - }, - }, - } - } - - pub async fn write_structure(&self, structure: &TopLevelVectorTransform) { - let serialized = match toml::to_string_pretty(&structure) { - Ok(v) => v, - Err(e) => { - error!(self.logger, "Error while serializing initial structure: {:?}", e); - panic!(); - } - }; - match tokio::fs::write(&self.file_path, &serialized.as_bytes()).await { - Ok(_) => {} - Err(e) => { - error!(self.logger, "Couldn't serialize initial strucuture: {:?}", e); - panic!() - } - } - } - - pub async fn read_file(&self) -> TopLevelVectorTransform { - let _ = self.mutex.lock().await; - let content = match tokio::fs::read_to_string(&self.file_path).await { - Ok(c) => c, - Err(e) => { - error!(self.logger, "Couldn't read content to string: {:?}", e); - panic!() - } - }; - - match toml::from_str::(&content) { - Ok(v) => v, - Err(e) => { - error!(self.logger, "Validation of initial toml failed: {:?}", e); - panic!() - } - } - } -} - -#[derive(Serialize, Deserialize)] -pub struct TopLevelVectorTransform { - pub transforms: VectorTransform, -} -#[derive(Serialize, Deserialize)] - -pub struct VectorTransform { - pub noise_filter: NoiseFilter, -} -#[derive(Serialize, Deserialize)] -pub struct NoiseFilter { - pub type_: String, - pub inputs: Vec, - pub reroute_unmatched: String, - pub route: Route, -} -#[derive(Serialize, Deserialize)] -pub struct Route { - pub noisy: String, -} diff --git a/rs/log-noise-filter-manager/src/handlers/put.rs b/rs/log-noise-filter-manager/src/handlers/put.rs deleted file mode 100644 index be2e9e66c..000000000 --- a/rs/log-noise-filter-manager/src/handlers/put.rs +++ /dev/null @@ -1,15 +0,0 @@ -use axum::{extract::State, http::StatusCode, Json}; - -use super::{Server, TopLevelVectorTransform, SEPARATOR}; - -pub async fn update(State(state): State, Json(routes): Json>) -> Result, (StatusCode, String)> { - let mut content = state.read_file().await; - if routes.is_empty() { - content.transforms.noise_filter.route.noisy = "false".to_string(); - } else { - content.transforms.noise_filter.route.noisy = routes.join(SEPARATOR) - } - - state.write_structure(&content).await; - Ok(Json(content)) -} From b9975218d2266733aed40a7fc78c0c09e7bb3224 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 28 Jun 2024 11:17:01 +0200 Subject: [PATCH 07/23] initial model --- Cargo.lock | 66 +++--------- Cargo.toml | 5 +- bazel/external_crates.bzl | 3 +- .../log-noise-filter-backend/BUILD.bazel | 0 .../log-noise-filter-backend/Cargo.toml | 1 - .../src/handlers/delete.rs | 0 .../src/handlers/get.rs | 0 .../src/handlers/mod.rs | 0 .../src/handlers/put.rs | 0 .../log-noise-filter-backend/src/main.rs | 0 .../log-noise-filter-downloader/BUILD.bazel | 25 +++++ .../log-noise-filter-downloader/Cargo.toml | 20 ++++ .../src/download_loop.rs | 101 ++++++++++++++++++ .../log-noise-filter-downloader/src/main.rs | 70 ++++++++++++ 14 files changed, 238 insertions(+), 53 deletions(-) rename rs/{ => ic-observability}/log-noise-filter-backend/BUILD.bazel (100%) rename rs/{ => ic-observability}/log-noise-filter-backend/Cargo.toml (97%) rename rs/{ => ic-observability}/log-noise-filter-backend/src/handlers/delete.rs (100%) rename rs/{ => ic-observability}/log-noise-filter-backend/src/handlers/get.rs (100%) rename rs/{ => ic-observability}/log-noise-filter-backend/src/handlers/mod.rs (100%) rename rs/{ => ic-observability}/log-noise-filter-backend/src/handlers/put.rs (100%) rename rs/{ => ic-observability}/log-noise-filter-backend/src/main.rs (100%) create mode 100644 rs/ic-observability/log-noise-filter-downloader/BUILD.bazel create mode 100644 rs/ic-observability/log-noise-filter-downloader/Cargo.toml create mode 100644 rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs create mode 100644 rs/ic-observability/log-noise-filter-downloader/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 7bbab17ac..280eca59e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5871,7 +5871,21 @@ dependencies = [ "slog-async", "slog-term", "tokio", - "toml", +] + +[[package]] +name = "log-noise-filter-downloader" +version = "0.4.1" +dependencies = [ + "clap 4.5.7", + "reqwest", + "serde", + "serde_json", + "slog", + "slog-async", + "slog-term", + "tokio", + "url", ] [[package]] @@ -8072,15 +8086,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "serde_spanned" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" -dependencies = [ - "serde", -] - [[package]] name = "serde_tokenstream" version = "0.1.7" @@ -8969,26 +8974,11 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.22.12", -] - [[package]] name = "toml_datetime" version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -8998,7 +8988,7 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap 2.2.6", "toml_datetime", - "winnow 0.5.40", + "winnow", ] [[package]] @@ -9009,20 +8999,7 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ "indexmap 2.2.6", "toml_datetime", - "winnow 0.5.40", -] - -[[package]] -name = "toml_edit" -version = "0.22.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" -dependencies = [ - "indexmap 2.2.6", - "serde", - "serde_spanned", - "toml_datetime", - "winnow 0.6.6", + "winnow", ] [[package]] @@ -9711,15 +9688,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winnow" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" -dependencies = [ - "memchr", -] - [[package]] name = "winreg" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index dad12be66..3b0f72afc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ members = [ "rs/ic-management-backend", "rs/ic-management-types", "rs/ic-observability/config-writer-common", + "rs/ic-observability/log-noise-filter-backend", + "rs/ic-observability/log-noise-filter-downloader", "rs/ic-observability/multiservice-discovery", "rs/ic-observability/multiservice-discovery-downloader", "rs/ic-observability/multiservice-discovery-shared", @@ -15,11 +17,10 @@ members = [ "rs/ic-observability/service-discovery", "rs/ic-observability/sns-downloader", "rs/log-fetcher", - "rs/log-noise-filter-backend", "rs/canister-log-fetcher", "rs/np-notifications", "rs/rollout-controller", - "rs/slack-notifications", + "rs/slack-notifications", ] resolver = "2" diff --git a/bazel/external_crates.bzl b/bazel/external_crates.bzl index 3e8df8cc1..53544d77b 100644 --- a/bazel/external_crates.bzl +++ b/bazel/external_crates.bzl @@ -51,6 +51,8 @@ def external_crates_repository(): "//rs/ic-management-backend:Cargo.toml", "//rs/ic-management-types:Cargo.toml", "//rs/ic-observability/config-writer-common:Cargo.toml", + "//rs/ic-observability/log-noise-filter-backend:Cargo.toml", + "//rs/ic-observability/log-noise-filter-downloader:Cargo.toml", "//rs/ic-observability/multiservice-discovery:Cargo.toml", "//rs/ic-observability/multiservice-discovery-downloader:Cargo.toml", "//rs/ic-observability/multiservice-discovery-shared:Cargo.toml", @@ -63,7 +65,6 @@ def external_crates_repository(): "//rs/np-notifications:Cargo.toml", "//rs/slack-notifications:Cargo.toml", "//rs/rollout-controller:Cargo.toml", - "//rs/log-noise-filter-backend:Cargo.toml" ], splicing_config = splicing_config( resolver_version = "2", diff --git a/rs/log-noise-filter-backend/BUILD.bazel b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel similarity index 100% rename from rs/log-noise-filter-backend/BUILD.bazel rename to rs/ic-observability/log-noise-filter-backend/BUILD.bazel diff --git a/rs/log-noise-filter-backend/Cargo.toml b/rs/ic-observability/log-noise-filter-backend/Cargo.toml similarity index 97% rename from rs/log-noise-filter-backend/Cargo.toml rename to rs/ic-observability/log-noise-filter-backend/Cargo.toml index 1da98fb41..5f54ec263 100644 --- a/rs/log-noise-filter-backend/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-backend/Cargo.toml @@ -17,4 +17,3 @@ slog-term = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -toml = "0.8.12" diff --git a/rs/log-noise-filter-backend/src/handlers/delete.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs similarity index 100% rename from rs/log-noise-filter-backend/src/handlers/delete.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs diff --git a/rs/log-noise-filter-backend/src/handlers/get.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/get.rs similarity index 100% rename from rs/log-noise-filter-backend/src/handlers/get.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/get.rs diff --git a/rs/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs similarity index 100% rename from rs/log-noise-filter-backend/src/handlers/mod.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs diff --git a/rs/log-noise-filter-backend/src/handlers/put.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs similarity index 100% rename from rs/log-noise-filter-backend/src/handlers/put.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs diff --git a/rs/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs similarity index 100% rename from rs/log-noise-filter-backend/src/main.rs rename to rs/ic-observability/log-noise-filter-backend/src/main.rs diff --git a/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel new file mode 100644 index 000000000..b8f0aae21 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel @@ -0,0 +1,25 @@ +load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") + +DEPS = [] + +rust_binary( + name = "log-noise-filter-downloader", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + stamp = 1, + deps = all_crate_deps( + normal = True, + ) + DEPS, +) + +rust_binary_oci_image_rules( + name = "oci_image", + src = ":log-noise-filter-downloader", + base_image = "@debian-slim" +) + diff --git a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml new file mode 100644 index 000000000..88497da94 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "log-noise-filter-downloader" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +documentation.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { workspace = true } +slog = { workspace = true } +slog-async = { workspace = true } +slog-term = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } +reqwest = { workspace = true } diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs new file mode 100644 index 000000000..191972507 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -0,0 +1,101 @@ +use std::{ + collections::{hash_map::DefaultHasher, BTreeMap}, + hash::{Hash, Hasher}, + path::PathBuf, + time::Duration, +}; + +use serde::Serialize; +use slog::{info, warn, Logger}; +use tokio::{io::AsyncWriteExt, select}; +use url::Url; + +pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec, rate: u64, transform_id: String) { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(30)) + .build() + .expect("Should be able to build a client"); + + let mut interval = tokio::time::interval(Duration::from_secs(15)); + let mut current_hash = 0; + loop { + select! { + tick = interval.tick() => { + info!(logger, "Running loop @ {:?}", tick); + }, + _ = tokio::signal::ctrl_c() => { + info!(logger, "Received shutdown signal, exiting..."); + break + } + } + + let response = client.get(url.clone()).send().await; + let response = match response { + Ok(r) if r.status().is_success() => r, + Ok(r) => { + warn!( + logger, + "Received error status while downloading: {:?}\n{:?}", + r.status(), + r.text().await.expect("Should have text") + ); + continue; + } + Err(e) => { + warn!(logger, "Error while downloading: {:?}", e); + continue; + } + }; + + let response = match response.json::>().await { + Ok(r) => r, + Err(e) => { + warn!(logger, "Failed to parse response: {:?}", e); + continue; + } + }; + + let mut hasher = DefaultHasher::new(); + response.hash(&mut hasher); + let new_hash = hasher.finish(); + if new_hash == current_hash { + info!(logger, "Hash hasn't changed, skipping"); + continue; + } + + info!(logger, "Hash changed: {} -> {}", current_hash, new_hash); + current_hash = new_hash; + + let response = match response.is_empty() { + true => "r'.*'".to_string(), + false => response.values().map(|s| format!("r'{}'", s)).collect::>().join(","), + }; + + let transform = VectorSampleTransform { + _type: "sample".to_string(), + inputs: inputs.clone(), + key_field: "MESSAGE".to_string(), + rate, + exclude: format!("!match_any(.MESSAGE, [{}])", response), + }; + + let mut transforms = BTreeMap::new(); + transforms.insert(&transform_id, transform); + let mut total = BTreeMap::new(); + total.insert("transforms", transforms); + + let transform = serde_json::to_string_pretty(&total).expect("Should be able to serialize"); + let mut file = tokio::fs::File::create(path.clone()).await.expect("Should be able to create file"); + file.write_all(transform.as_bytes()).await.expect("Should be able to write"); + } +} + +#[derive(Debug, Serialize, Clone)] +struct VectorSampleTransform { + #[serde(rename = "type")] + _type: String, + inputs: Vec, + key_field: String, + rate: u64, + exclude: String, +} diff --git a/rs/ic-observability/log-noise-filter-downloader/src/main.rs b/rs/ic-observability/log-noise-filter-downloader/src/main.rs new file mode 100644 index 000000000..71cbe56ea --- /dev/null +++ b/rs/ic-observability/log-noise-filter-downloader/src/main.rs @@ -0,0 +1,70 @@ +use std::path::PathBuf; + +use clap::Parser; +use download_loop::download_loop; +use slog::{info, o, Drain, Level, Logger}; +use url::Url; + +mod download_loop; + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let logger = make_logger(from_str_to_log(&cli.log_level)); + info!(logger, "Running with following args: {:?}", cli); + + download_loop(cli.url, logger, cli.path, cli.inputs, cli.rate, cli.transform_id).await +} + +fn make_logger(level: Level) -> Logger { + let decorator = slog_term::TermDecorator::new().build(); + let full_format = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog::Filter::new(full_format, move |record: &slog::Record| record.level().is_at_least(level)).fuse(); + let drain = slog_async::Async::new(drain).chan_size(8192).build(); + Logger::root(drain.fuse(), o!()) +} + +#[derive(Parser, Debug)] +struct Cli { + #[clap( + long, + default_value = "info", + help = r#" +Log level to use for running. You can use standard log levels 'info', +'critical', 'error', 'warning', 'trace', 'debug' + +"# + )] + log_level: String, + + #[clap(long, help = "Url for the backend")] + url: Url, + + #[clap(long, help = "Path to where the output should be generated")] + path: PathBuf, + + #[clap( + long, + help = "Rate of the matched messages that should be let through. It will be 1/rate", + default_value = "100" + )] + rate: u64, + + #[clap(long, help = "Inputs that will be linked to this transform")] + inputs: Vec, + + #[clap(long, help = "Transform id", default_value = "sample-ic-logs-transform")] + transform_id: String, +} + +fn from_str_to_log(value: &str) -> Level { + match value { + "info" => Level::Info, + "critical" => Level::Critical, + "error" => Level::Error, + "warning" => Level::Warning, + "trace" => Level::Trace, + "debug" => Level::Debug, + _ => panic!("Unsupported level: {}", value), + } +} From 63bee3431ffb97293ca8ef82e55be2d900e2ae28 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 28 Jun 2024 11:19:39 +0200 Subject: [PATCH 08/23] repinning --- Cargo.Bazel.lock | 281 +++++++++------------------------------------ k8s/oci_images.bzl | 2 +- rs/oci_images.bzl | 2 +- 3 files changed, 59 insertions(+), 226 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index b87d71a93..4deecb623 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "077a6edeb56f425cff5d5be0fa78051775b205b6764662613cbb2b00b09f40c1", + "checksum": "850fa358597486f8fa325b70c5deea34b28c57fac48c40e87516719e2363d05a", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29223,10 +29223,62 @@ { "id": "tokio 1.38.0", "target": "tokio" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.4.1" + }, + "license": null + }, + "log-noise-filter-downloader 0.4.1": { + "name": "log-noise-filter-downloader", + "version": "0.4.1", + "repository": null, + "targets": [], + "library_target_name": null, + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "clap 4.5.7", + "target": "clap" }, { - "id": "toml 0.8.12", - "target": "toml" + "id": "reqwest 0.12.5", + "target": "reqwest" + }, + { + "id": "serde 1.0.203", + "target": "serde" + }, + { + "id": "serde_json 1.0.117", + "target": "serde_json" + }, + { + "id": "slog 2.7.0", + "target": "slog" + }, + { + "id": "slog-async 2.8.0", + "target": "slog_async" + }, + { + "id": "slog-term 2.9.1", + "target": "slog_term" + }, + { + "id": "tokio 1.38.0", + "target": "tokio" + }, + { + "id": "url 2.5.2", + "target": "url" } ], "selects": {} @@ -40695,51 +40747,6 @@ }, "license": "MIT OR Apache-2.0" }, - "serde_spanned 0.6.6": { - "name": "serde_spanned", - "version": "0.6.6", - "repository": { - "Http": { - "url": "https://static.crates.io/crates/serde_spanned/0.6.6/download", - "sha256": "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" - } - }, - "targets": [ - { - "Library": { - "crate_name": "serde_spanned", - "crate_root": "src/lib.rs", - "srcs": [ - "**/*.rs" - ] - } - } - ], - "library_target_name": "serde_spanned", - "common_attrs": { - "compile_data_glob": [ - "**" - ], - "crate_features": { - "common": [ - "serde" - ], - "selects": {} - }, - "deps": { - "common": [ - { - "id": "serde 1.0.203", - "target": "serde" - } - ], - "selects": {} - }, - "edition": "2021", - "version": "0.6.6" - }, - "license": "MIT OR Apache-2.0" - }, "serde_tokenstream 0.1.7": { "name": "serde_tokenstream", "version": "0.1.7", @@ -45202,65 +45209,6 @@ }, "license": "MIT" }, - "toml 0.8.12": { - "name": "toml", - "version": "0.8.12", - "repository": { - "Http": { - "url": "https://static.crates.io/crates/toml/0.8.12/download", - "sha256": "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" - } - }, - "targets": [ - { - "Library": { - "crate_name": "toml", - "crate_root": "src/lib.rs", - "srcs": [ - "**/*.rs" - ] - } - } - ], - "library_target_name": "toml", - "common_attrs": { - "compile_data_glob": [ - "**" - ], - "crate_features": { - "common": [ - "default", - "display", - "parse" - ], - "selects": {} - }, - "deps": { - "common": [ - { - "id": "serde 1.0.203", - "target": "serde" - }, - { - "id": "serde_spanned 0.6.6", - "target": "serde_spanned" - }, - { - "id": "toml_datetime 0.6.6", - "target": "toml_datetime" - }, - { - "id": "toml_edit 0.22.12", - "target": "toml_edit" - } - ], - "selects": {} - }, - "edition": "2021", - "version": "0.8.12" - }, - "license": "MIT OR Apache-2.0" - }, "toml_datetime 0.6.6": { "name": "toml_datetime", "version": "0.6.6", @@ -45286,21 +45234,6 @@ "compile_data_glob": [ "**" ], - "crate_features": { - "common": [ - "serde" - ], - "selects": {} - }, - "deps": { - "common": [ - { - "id": "serde 1.0.203", - "target": "serde" - } - ], - "selects": {} - }, "edition": "2021", "version": "0.6.6" }, @@ -45406,69 +45339,6 @@ }, "license": "MIT OR Apache-2.0" }, - "toml_edit 0.22.12": { - "name": "toml_edit", - "version": "0.22.12", - "repository": { - "Http": { - "url": "https://static.crates.io/crates/toml_edit/0.22.12/download", - "sha256": "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" - } - }, - "targets": [ - { - "Library": { - "crate_name": "toml_edit", - "crate_root": "src/lib.rs", - "srcs": [ - "**/*.rs" - ] - } - } - ], - "library_target_name": "toml_edit", - "common_attrs": { - "compile_data_glob": [ - "**" - ], - "crate_features": { - "common": [ - "display", - "parse", - "serde" - ], - "selects": {} - }, - "deps": { - "common": [ - { - "id": "indexmap 2.2.6", - "target": "indexmap" - }, - { - "id": "serde 1.0.203", - "target": "serde" - }, - { - "id": "serde_spanned 0.6.6", - "target": "serde_spanned" - }, - { - "id": "toml_datetime 0.6.6", - "target": "toml_datetime" - }, - { - "id": "winnow 0.6.6", - "target": "winnow" - } - ], - "selects": {} - }, - "edition": "2021", - "version": "0.22.12" - }, - "license": "MIT OR Apache-2.0" - }, "tonic 0.11.0": { "name": "tonic", "version": "0.11.0", @@ -49482,44 +49352,6 @@ }, "license": "MIT" }, - "winnow 0.6.6": { - "name": "winnow", - "version": "0.6.6", - "repository": { - "Http": { - "url": "https://static.crates.io/crates/winnow/0.6.6/download", - "sha256": "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" - } - }, - "targets": [ - { - "Library": { - "crate_name": "winnow", - "crate_root": "src/lib.rs", - "srcs": [ - "**/*.rs" - ] - } - } - ], - "library_target_name": "winnow", - "common_attrs": { - "compile_data_glob": [ - "**" - ], - "crate_features": { - "common": [ - "alloc", - "default", - "std" - ], - "selects": {} - }, - "edition": "2021", - "version": "0.6.6" - }, - "license": "MIT" - }, "winreg 0.52.0": { "name": "winreg", "version": "0.52.0", @@ -51058,7 +50890,8 @@ "ic-management-backend 0.4.1": "rs/ic-management-backend", "ic-management-types 0.4.1": "rs/ic-management-types", "log-fetcher 0.4.1": "rs/log-fetcher", - "log-noise-filter-backend 0.4.1": "rs/log-noise-filter-backend", + "log-noise-filter-backend 0.4.1": "rs/ic-observability/log-noise-filter-backend", + "log-noise-filter-downloader 0.4.1": "rs/ic-observability/log-noise-filter-downloader", "multiservice-discovery 0.4.1": "rs/ic-observability/multiservice-discovery", "multiservice-discovery-downloader 0.4.1": "rs/ic-observability/multiservice-discovery-downloader", "multiservice-discovery-shared 0.4.1": "rs/ic-observability/multiservice-discovery-shared", diff --git a/k8s/oci_images.bzl b/k8s/oci_images.bzl index 78746af82..0b47a85ed 100644 --- a/k8s/oci_images.bzl +++ b/k8s/oci_images.bzl @@ -39,7 +39,7 @@ def python_oci_image_rules(name, src, base_image = "@distroless_python3"): } ) - tarball_name = "{}-tarball".format(binary.name) + tarball_name = "tarball".format(binary.name) oci_tarball( name = tarball_name, image = image_rule_name, diff --git a/rs/oci_images.bzl b/rs/oci_images.bzl index 1151f3b0f..f18ffdebf 100644 --- a/rs/oci_images.bzl +++ b/rs/oci_images.bzl @@ -30,7 +30,7 @@ def rust_binary_oci_image_rules(name, src, base_image = "@distroless_cc_debian12 tars = [tar_rule_name] + other_layers, ) - tarball_name = "{}-tarball".format(binary.name) + tarball_name = "tarball".format(binary.name) oci_tarball( name = tarball_name, image = image_rule_name, From 81ea2ca77f4e52207065264e50813abf14c12cbb Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 28 Jun 2024 13:46:08 +0200 Subject: [PATCH 09/23] fixing bugs --- .../log-noise-filter-backend/src/handlers/mod.rs | 3 ++- .../log-noise-filter-downloader/src/download_loop.rs | 10 ++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index aa0a9c0b3..603ac128f 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -54,8 +54,9 @@ impl Server { }); } - pub async fn delete_criteria(&self, indexes: Vec) -> Result<(), Vec> { + pub async fn delete_criteria(&self, mut indexes: Vec) -> Result<(), Vec> { let mut server_criteria = self.criteria.lock().await; + indexes.sort_by(|a, b| b.cmp(a)); let missing = indexes .iter() .filter_map(|c| match server_criteria.get(*c as usize) { diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs index 191972507..915bd4b84 100644 --- a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -66,17 +66,16 @@ pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec< info!(logger, "Hash changed: {} -> {}", current_hash, new_hash); current_hash = new_hash; - let response = match response.is_empty() { - true => "r'.*'".to_string(), - false => response.values().map(|s| format!("r'{}'", s)).collect::>().join(","), + let (prefix, criteria) = match response.is_empty() { + true => ("", "r'.*'".to_string()), + false => ("!", response.values().map(|s| format!("r'{}'", s)).collect::>().join(",")), }; let transform = VectorSampleTransform { _type: "sample".to_string(), inputs: inputs.clone(), - key_field: "MESSAGE".to_string(), rate, - exclude: format!("!match_any(.MESSAGE, [{}])", response), + exclude: format!("{}match_any(to_string(.MESSAGE) ?? \"\", [{}])", prefix, criteria), }; let mut transforms = BTreeMap::new(); @@ -95,7 +94,6 @@ struct VectorSampleTransform { #[serde(rename = "type")] _type: String, inputs: Vec, - key_field: String, rate: u64, exclude: String, } From 8abcc8e19a83f1e925e36669dc9659f623fecf3b Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 28 Jun 2024 16:26:19 +0200 Subject: [PATCH 10/23] ensuring a buildable regex --- Cargo.Bazel.lock | 6 +++++- Cargo.lock | 1 + .../log-noise-filter-backend/Cargo.toml | 1 + .../src/handlers/mod.rs | 20 +++++++++++++++---- .../src/handlers/put.rs | 7 ++++--- 5 files changed, 27 insertions(+), 8 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 4deecb623..feb70ecb9 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "850fa358597486f8fa325b70c5deea34b28c57fac48c40e87516719e2363d05a", + "checksum": "5180c9e3a4b69b259750b2b60f1b56d3667a0545409170e914da734999599b2e", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29200,6 +29200,10 @@ "id": "clap 4.5.7", "target": "clap" }, + { + "id": "regex 1.10.5", + "target": "regex" + }, { "id": "serde 1.0.203", "target": "serde" diff --git a/Cargo.lock b/Cargo.lock index 280eca59e..e89a73845 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5865,6 +5865,7 @@ version = "0.4.1" dependencies = [ "axum 0.7.5", "clap 4.5.7", + "regex", "serde", "serde_json", "slog", diff --git a/rs/ic-observability/log-noise-filter-backend/Cargo.toml b/rs/ic-observability/log-noise-filter-backend/Cargo.toml index 5f54ec263..1a0446559 100644 --- a/rs/ic-observability/log-noise-filter-backend/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-backend/Cargo.toml @@ -17,3 +17,4 @@ slog-term = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } +regex = { workspace = true } diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index 603ac128f..2efcd59e7 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -3,6 +3,7 @@ use std::{collections::BTreeMap, net::SocketAddr, sync::Arc}; use axum::{routing::delete, Router}; use delete::delete_criteria; use get::get_criteria; +use regex::Regex; use slog::Logger; use tokio::sync::Mutex; @@ -47,11 +48,22 @@ impl Server { criteria.iter().enumerate().map(|(i, s)| (i as u32, s.clone())).collect() } - pub async fn update_criteria(&self, criteria: Vec) { + pub async fn update_criteria(&self, mut criteria: Vec) -> Result<(), Vec> { let mut server_criteria = self.criteria.lock().await; - criteria.iter().for_each(|c| { - server_criteria.push(c.to_string()); - }); + let mut errors = vec![]; + for c in criteria.iter_mut() { + *c = c.replace("\\", "\\\\"); + *c = c.replace("'", "\\'"); + if let Err(e) = Regex::new(c) { + errors.push(e.to_string()); + } + } + + if !errors.is_empty() { + return Err(errors); + } + criteria.into_iter().for_each(|c| server_criteria.push(c)); + Ok(()) } pub async fn delete_criteria(&self, mut indexes: Vec) -> Result<(), Vec> { diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs index ac5e11c22..f5bfb67be 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs @@ -5,7 +5,8 @@ use axum::{extract::State, http::StatusCode, Json}; use super::Server; pub async fn update(State(state): State, Json(criteria): Json>) -> Result>, (StatusCode, String)> { - state.update_criteria(criteria).await; - - Ok(Json(state.get_criteria_mapped().await)) + match state.update_criteria(criteria).await { + Ok(()) => Ok(Json(state.get_criteria_mapped().await)), + Err(v) => Err((StatusCode::BAD_REQUEST, format!("Invalid instances of regex: {:?}", v))), + } } From c13148757a8b2e2bed8f1ff606644c3bb87c1488 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Fri, 28 Jun 2024 17:02:17 +0200 Subject: [PATCH 11/23] clippy --- Cargo.lock | 1 + .../src/handlers/mod.rs | 4 +- .../log-noise-filter-downloader/Cargo.toml | 1 + .../src/download_loop.rs | 117 +++++++++++------- 4 files changed, 73 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e89a73845..a4064de30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5878,6 +5878,7 @@ dependencies = [ name = "log-noise-filter-downloader" version = "0.4.1" dependencies = [ + "anyhow", "clap 4.5.7", "reqwest", "serde", diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index 2efcd59e7..546f3763a 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -52,8 +52,8 @@ impl Server { let mut server_criteria = self.criteria.lock().await; let mut errors = vec![]; for c in criteria.iter_mut() { - *c = c.replace("\\", "\\\\"); - *c = c.replace("'", "\\'"); + *c = c.replace('\\', "\\\\"); + *c = c.replace('\'', "\\'"); if let Err(e) = Regex::new(c) { errors.push(e.to_string()); } diff --git a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml index 88497da94..ee9d315a4 100644 --- a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml @@ -18,3 +18,4 @@ serde_json = { workspace = true } tokio = { workspace = true } url = { workspace = true } reqwest = { workspace = true } +anyhow = { workspace = true } diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs index 915bd4b84..d0fd9ceb4 100644 --- a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -5,6 +5,7 @@ use std::{ time::Duration, }; +use reqwest::Client; use serde::Serialize; use slog::{info, warn, Logger}; use tokio::{io::AsyncWriteExt, select}; @@ -29,64 +30,84 @@ pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec< } } - let response = client.get(url.clone()).send().await; - let response = match response { - Ok(r) if r.status().is_success() => r, - Ok(r) => { - warn!( - logger, - "Received error status while downloading: {:?}\n{:?}", - r.status(), - r.text().await.expect("Should have text") - ); - continue; - } - Err(e) => { - warn!(logger, "Error while downloading: {:?}", e); - continue; - } - }; - - let response = match response.json::>().await { + let response = match fetch_criteria(&client, url.clone(), &logger).await { Ok(r) => r, - Err(e) => { - warn!(logger, "Failed to parse response: {:?}", e); - continue; - } + Err(_) => continue, }; - let mut hasher = DefaultHasher::new(); - response.hash(&mut hasher); - let new_hash = hasher.finish(); - if new_hash == current_hash { - info!(logger, "Hash hasn't changed, skipping"); + if !content_changed(&mut current_hash, &response, &logger) { continue; } - info!(logger, "Hash changed: {} -> {}", current_hash, new_hash); - current_hash = new_hash; - - let (prefix, criteria) = match response.is_empty() { - true => ("", "r'.*'".to_string()), - false => ("!", response.values().map(|s| format!("r'{}'", s)).collect::>().join(",")), - }; + write_to_file(&response, &inputs, rate, &transform_id, &path).await + } +} - let transform = VectorSampleTransform { - _type: "sample".to_string(), - inputs: inputs.clone(), - rate, - exclude: format!("{}match_any(to_string(.MESSAGE) ?? \"\", [{}])", prefix, criteria), - }; +async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::Result> { + let response = client.get(url.clone()).send().await; + let response = match response { + Ok(r) if r.status().is_success() => r, + Ok(r) => { + warn!( + logger, + "Received error status while downloading: {:?}\n{:?}", + r.status(), + r.text().await.expect("Should have text") + ); + anyhow::bail!(""); + } + Err(e) => { + warn!(logger, "Error while downloading: {:?}", e); + anyhow::bail!(""); + } + }; - let mut transforms = BTreeMap::new(); - transforms.insert(&transform_id, transform); - let mut total = BTreeMap::new(); - total.insert("transforms", transforms); + match response.json::>().await { + Ok(r) => Ok(r), + Err(e) => { + warn!(logger, "Failed to parse response: {:?}", e); + anyhow::bail!(""); + } + } +} - let transform = serde_json::to_string_pretty(&total).expect("Should be able to serialize"); - let mut file = tokio::fs::File::create(path.clone()).await.expect("Should be able to create file"); - file.write_all(transform.as_bytes()).await.expect("Should be able to write"); +fn content_changed(current_hash: &mut u64, new_criteria: &BTreeMap, logger: &Logger) -> bool { + let mut hasher = DefaultHasher::new(); + new_criteria.hash(&mut hasher); + let new_hash = hasher.finish(); + if &new_hash == current_hash { + info!(logger, "Hash hasn't changed, skipping"); + return false; } + + info!(logger, "Hash changed: {} -> {}", current_hash, new_hash); + *current_hash = new_hash; + true +} + +async fn write_to_file(criteria: &BTreeMap, inputs: &[String], rate: u64, transform_id: &String, path: &PathBuf) { + let (prefix, criteria) = match criteria.is_empty() { + // If the list is empty we should not sample anything. + // In other words we should exclude everything. + true => ("", "r'.*'".to_string()), + false => ("!", criteria.values().map(|s| format!("r'{}'", s)).collect::>().join(",")), + }; + + let transform = VectorSampleTransform { + _type: "sample".to_string(), + inputs: inputs.to_owned(), + rate, + exclude: format!("{}match_any(to_string(.MESSAGE) ?? \"\", [{}])", prefix, criteria), + }; + + let mut transforms = BTreeMap::new(); + transforms.insert(transform_id, transform); + let mut total = BTreeMap::new(); + total.insert("transforms", transforms); + + let transform = serde_json::to_string_pretty(&total).expect("Should be able to serialize"); + let mut file = tokio::fs::File::create(path).await.expect("Should be able to create file"); + file.write_all(transform.as_bytes()).await.expect("Should be able to write"); } #[derive(Debug, Serialize, Clone)] From d24aace01013a690e3cfc658f250d259d01bc4cb Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 11:03:35 +0200 Subject: [PATCH 12/23] rolling back changes for bazel --- .pre-commit-config.yaml | 2 +- k8s/oci_images.bzl | 2 +- rs/oci_images.bzl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 571141bf0..644a85eb1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -67,7 +67,7 @@ repos: - --match=.* - repo: https://github.com/PyCQA/pylint - rev: v2.17.7 + rev: v3.2.5 hooks: - id: pylint name: pylint diff --git a/k8s/oci_images.bzl b/k8s/oci_images.bzl index 0b47a85ed..78746af82 100644 --- a/k8s/oci_images.bzl +++ b/k8s/oci_images.bzl @@ -39,7 +39,7 @@ def python_oci_image_rules(name, src, base_image = "@distroless_python3"): } ) - tarball_name = "tarball".format(binary.name) + tarball_name = "{}-tarball".format(binary.name) oci_tarball( name = tarball_name, image = image_rule_name, diff --git a/rs/oci_images.bzl b/rs/oci_images.bzl index f18ffdebf..1151f3b0f 100644 --- a/rs/oci_images.bzl +++ b/rs/oci_images.bzl @@ -30,7 +30,7 @@ def rust_binary_oci_image_rules(name, src, base_image = "@distroless_cc_debian12 tars = [tar_rule_name] + other_layers, ) - tarball_name = "tarball".format(binary.name) + tarball_name = "{}-tarball".format(binary.name) oci_tarball( name = tarball_name, image = image_rule_name, From f8e3c26cf5cc9ea2d1be0749b7bcc70491a477c9 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 11:19:49 +0200 Subject: [PATCH 13/23] rolling back pre-commit fix --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 644a85eb1..571141bf0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -67,7 +67,7 @@ repos: - --match=.* - repo: https://github.com/PyCQA/pylint - rev: v3.2.5 + rev: v2.17.7 hooks: - id: pylint name: pylint From 1fbe9aed7f8e4e835298564f2da5d1c869bb54dc Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 11:44:57 +0200 Subject: [PATCH 14/23] applying suggestions --- Cargo.Bazel.lock | 2 +- Cargo.toml | 1 + .../log-noise-filter-backend/Cargo.toml | 2 +- .../log-noise-filter-backend/src/handlers/delete.rs | 13 ++++++++++--- .../multiservice-discovery/Cargo.toml | 2 +- 5 files changed, 14 insertions(+), 6 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index dcb85559d..8b7882f24 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "24114f0ea682762469e382f2de27cc6ac7c4cc2499f58e73623fb7837eea9e8b", + "checksum": "593d35dcf7fdec142a6998ac7c522505e1712c654c56e965c17926bf55375f58", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", diff --git a/Cargo.toml b/Cargo.toml index 2f0eed760..fea8c62f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ async-recursion = "1.1.1" async-timer = "0.7.4" async-trait = "0.1.80" axum-otel-metrics = "0.8.1" +axum = "0.7.5" backoff = { version = "0.4.0", features = ["tokio"] } backon = "0.4.4" candid = "0.10.9" diff --git a/rs/ic-observability/log-noise-filter-backend/Cargo.toml b/rs/ic-observability/log-noise-filter-backend/Cargo.toml index 1a0446559..cb6b63bab 100644 --- a/rs/ic-observability/log-noise-filter-backend/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-backend/Cargo.toml @@ -10,7 +10,7 @@ documentation.workspace = true [dependencies] clap = { workspace = true } -axum = "0.7.4" +axum = { workspace = true } slog = { workspace = true } slog-async = { workspace = true } slog-term = { workspace = true } diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs index 5182346cd..24a39a026 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; use axum::{extract::State, http::StatusCode, Json}; +use slog::{info, warn}; use super::Server; @@ -8,8 +9,14 @@ pub async fn delete_criteria( State(state): State, Json(criteria): Json>, ) -> Result>, (StatusCode, String)> { - match state.delete_criteria(criteria).await { - Ok(()) => Ok(Json(state.get_criteria_mapped().await)), - Err(missing) => Err((StatusCode::NOT_FOUND, format!("Missing indexes: {:?}", missing))), + match state.delete_criteria(criteria.clone()).await { + Ok(()) => { + info!(state.logger, "Deleted criteria"; "indexes" => ?criteria); + Ok(Json(state.get_criteria_mapped().await)) + } + Err(missing) => { + warn!(state.logger, "Failed to delete criteria"; "indexes" => ?missing); + Err((StatusCode::NOT_FOUND, format!("Missing indexes: {:?}", missing))) + } } } diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index c5d8e65ae..664121dea 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -27,7 +27,7 @@ slog-term = { workspace = true } tokio = { workspace = true } url = { workspace = true } futures.workspace = true -axum = "0.7.5" +axum = { workspace = true } axum-otel-metrics.workspace = true opentelemetry.workspace = true retry = { workspace = true } From 4b4c0372cc540b0e8d4ab0fccbe78857c648c82a Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 12:43:44 +0200 Subject: [PATCH 15/23] refactoring --- Cargo.Bazel.lock | 16 +++++-- Cargo.lock | 1 + .../log-noise-filter-backend/Cargo.toml | 8 ++++ .../delete_criteria.rs} | 2 +- .../{get.rs => criteria/get_criteria.rs} | 2 +- .../src/handlers/criteria/mod.rs | 3 ++ .../{put.rs => criteria/post_criteria.rs} | 2 +- .../src/handlers/get_all.rs | 28 +++++++++++++ .../src/handlers/mod.rs | 42 +++++++++++++------ .../src/handlers/rate/get_rate.rs | 7 ++++ .../src/handlers/rate/mod.rs | 2 + .../src/handlers/rate/put_rate.rs | 7 ++++ .../log-noise-filter-backend/src/lib.rs | 1 + .../log-noise-filter-backend/src/main.rs | 5 ++- .../log-noise-filter-downloader/Cargo.toml | 1 + .../src/download_loop.rs | 22 ++++++---- .../log-noise-filter-downloader/src/main.rs | 9 +--- 17 files changed, 121 insertions(+), 37 deletions(-) rename rs/ic-observability/log-noise-filter-backend/src/handlers/{delete.rs => criteria/delete_criteria.rs} (96%) rename rs/ic-observability/log-noise-filter-backend/src/handlers/{get.rs => criteria/get_criteria.rs} (90%) create mode 100644 rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs rename rs/ic-observability/log-noise-filter-backend/src/handlers/{put.rs => criteria/post_criteria.rs} (93%) create mode 100644 rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/lib.rs diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 8b7882f24..20b1940bd 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "593d35dcf7fdec142a6998ac7c522505e1712c654c56e965c17926bf55375f58", + "checksum": "373e54054840c62d3438f63f1494707a66f3be25365a3abb2d53c837a692be7b", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -29188,8 +29188,18 @@ "name": "log-noise-filter-backend", "version": "0.4.2", "repository": null, - "targets": [], - "library_target_name": null, + "targets": [ + { + "Library": { + "crate_name": "log_noise_filter_backend", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "log_noise_filter_backend", "common_attrs": { "compile_data_glob": [ "**" diff --git a/Cargo.lock b/Cargo.lock index 759f87258..f1bee657c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5881,6 +5881,7 @@ version = "0.4.2" dependencies = [ "anyhow", "clap 4.5.7", + "log-noise-filter-backend", "reqwest", "serde", "serde_json", diff --git a/rs/ic-observability/log-noise-filter-backend/Cargo.toml b/rs/ic-observability/log-noise-filter-backend/Cargo.toml index cb6b63bab..1d57a629b 100644 --- a/rs/ic-observability/log-noise-filter-backend/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-backend/Cargo.toml @@ -18,3 +18,11 @@ serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } regex = { workspace = true } + +[[bin]] +name = "log-noise-filter-backend" +path = "src/main.rs" + +[lib] +name = "log_noise_filter_backend" +path = "src/lib.rs" diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs similarity index 96% rename from rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs index 24a39a026..cca1992d6 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/delete.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/delete_criteria.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use axum::{extract::State, http::StatusCode, Json}; use slog::{info, warn}; -use super::Server; +use crate::handlers::Server; pub async fn delete_criteria( State(state): State, diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/get.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs similarity index 90% rename from rs/ic-observability/log-noise-filter-backend/src/handlers/get.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs index ee7e5c9d4..7d1bcd023 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/get.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/get_criteria.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use axum::http::StatusCode; use axum::{extract::State, Json}; -use super::Server; +use crate::handlers::Server; pub(crate) async fn get_criteria(State(state): State) -> Result>, (StatusCode, String)> { Ok(Json(state.get_criteria_mapped().await)) diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs new file mode 100644 index 000000000..d13c834d8 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/mod.rs @@ -0,0 +1,3 @@ +pub mod delete_criteria; +pub mod get_criteria; +pub mod post_criteria; diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs similarity index 93% rename from rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs rename to rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs index f5bfb67be..2eebe9683 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/put.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/criteria/post_criteria.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use axum::{extract::State, http::StatusCode, Json}; -use super::Server; +use crate::handlers::Server; pub async fn update(State(state): State, Json(criteria): Json>) -> Result>, (StatusCode, String)> { match state.update_criteria(criteria).await { diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs new file mode 100644 index 000000000..5b297b95f --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs @@ -0,0 +1,28 @@ +use std::{collections::BTreeMap, hash::Hash}; + +use axum::{extract::State, http::StatusCode, Json}; +use serde::{Deserialize, Serialize}; + +use super::Server; + +pub async fn get_all(State(server): State) -> Result, (StatusCode, String)> { + let state = WholeState { + criteria: server.get_criteria_mapped().await, + rate: server.get_rate().await, + }; + + Ok(Json(state)) +} + +#[derive(Serialize, Deserialize)] +pub struct WholeState { + pub rate: u64, + pub criteria: BTreeMap, +} + +impl Hash for WholeState { + fn hash(&self, state: &mut H) { + self.rate.hash(state); + self.criteria.hash(state); + } +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index 546f3763a..be612a9e2 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -1,38 +1,45 @@ use std::{collections::BTreeMap, net::SocketAddr, sync::Arc}; -use axum::{routing::delete, Router}; -use delete::delete_criteria; -use get::get_criteria; +use axum::{ + routing::{delete, get, post, put}, + Router, +}; +use criteria::{delete_criteria::delete_criteria, get_criteria::get_criteria, post_criteria::update}; + +use get_all::get_all; +use rate::{get_rate::get_rate, put_rate::put_rate}; use regex::Regex; use slog::Logger; use tokio::sync::Mutex; -use self::put::update; -use axum::routing::{get, put}; - -mod delete; -mod get; -mod put; +mod criteria; +pub mod get_all; +mod rate; #[derive(Clone)] pub struct Server { pub logger: Logger, criteria: Arc>>, + rate: Arc>, } impl Server { - pub fn new(logger: Logger) -> Self { + pub fn new(logger: Logger, rate: u64) -> Self { Self { logger, criteria: Arc::new(Mutex::new(vec![])), + rate: Arc::new(Mutex::new(rate)), } } pub async fn run(&self, socket: SocketAddr) { let app = Router::new() - .route("/", get(get_criteria)) - .route("/", put(update)) - .route("/", delete(delete_criteria)) + .route("/criteria", get(get_criteria)) + .route("/criteria", post(update)) + .route("/criteria", delete(delete_criteria)) + .route("/rate", get(get_rate)) + .route("/rate", put(put_rate)) + .route("/", get(get_all)) .with_state(self.clone()); let listener = tokio::net::TcpListener::bind(socket).await.unwrap(); axum::serve(listener, app) @@ -48,6 +55,10 @@ impl Server { criteria.iter().enumerate().map(|(i, s)| (i as u32, s.clone())).collect() } + pub async fn get_rate(&self) -> u64 { + *self.rate.lock().await + } + pub async fn update_criteria(&self, mut criteria: Vec) -> Result<(), Vec> { let mut server_criteria = self.criteria.lock().await; let mut errors = vec![]; @@ -66,6 +77,11 @@ impl Server { Ok(()) } + pub async fn update_rate(&self, rate: u64) -> u64 { + *self.rate.lock().await = rate; + rate + } + pub async fn delete_criteria(&self, mut indexes: Vec) -> Result<(), Vec> { let mut server_criteria = self.criteria.lock().await; indexes.sort_by(|a, b| b.cmp(a)); diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs new file mode 100644 index 000000000..44133482b --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/get_rate.rs @@ -0,0 +1,7 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use crate::handlers::Server; + +pub(crate) async fn get_rate(State(state): State) -> Result, (StatusCode, String)> { + Ok(Json(state.get_rate().await)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs new file mode 100644 index 000000000..15b230dd4 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/mod.rs @@ -0,0 +1,2 @@ +pub mod get_rate; +pub mod put_rate; diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs new file mode 100644 index 000000000..aee63f779 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/rate/put_rate.rs @@ -0,0 +1,7 @@ +use axum::{extract::State, http::StatusCode, Json}; + +use crate::handlers::Server; + +pub async fn put_rate(State(state): State, Json(rate): Json) -> Result, (StatusCode, String)> { + Ok(Json(state.update_rate(rate).await)) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/lib.rs b/rs/ic-observability/log-noise-filter-backend/src/lib.rs new file mode 100644 index 000000000..c3d449565 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/lib.rs @@ -0,0 +1 @@ +pub mod handlers; diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index 684d1fda9..815ea609c 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -16,7 +16,7 @@ async fn main() { let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); info!(logger, "Running noise filter manager {}", socket); - let server = Server::new(logger.clone()); + let server = Server::new(logger.clone(), cli.global_rate); server.run(socket).await; info!(logger, "Noise filter manager stopped"); @@ -45,6 +45,9 @@ Log level to use for running. You can use standard log levels 'info', #[clap(long, default_value = "8080", help = "Port to use for running the api")] port: u16, + + #[clap(long, default_value = "1500", help = "Global rate")] + global_rate: u64, } fn from_str_to_log(value: &str) -> Level { diff --git a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml index ee9d315a4..7dc9d961a 100644 --- a/rs/ic-observability/log-noise-filter-downloader/Cargo.toml +++ b/rs/ic-observability/log-noise-filter-downloader/Cargo.toml @@ -19,3 +19,4 @@ tokio = { workspace = true } url = { workspace = true } reqwest = { workspace = true } anyhow = { workspace = true } +log-noise-filter-backend = { path = "../log-noise-filter-backend" } diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs index d0fd9ceb4..e6998f368 100644 --- a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -5,13 +5,14 @@ use std::{ time::Duration, }; +use log_noise_filter_backend::handlers::get_all::WholeState; use reqwest::Client; use serde::Serialize; use slog::{info, warn, Logger}; use tokio::{io::AsyncWriteExt, select}; use url::Url; -pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec, rate: u64, transform_id: String) { +pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec, transform_id: String) { let client = reqwest::Client::builder() .connect_timeout(Duration::from_secs(30)) .build() @@ -39,11 +40,11 @@ pub async fn download_loop(url: Url, logger: Logger, path: PathBuf, inputs: Vec< continue; } - write_to_file(&response, &inputs, rate, &transform_id, &path).await + write_to_file(&response, &inputs, &transform_id, &path).await } } -async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::Result> { +async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::Result { let response = client.get(url.clone()).send().await; let response = match response { Ok(r) if r.status().is_success() => r, @@ -62,7 +63,7 @@ async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::R } }; - match response.json::>().await { + match response.json().await { Ok(r) => Ok(r), Err(e) => { warn!(logger, "Failed to parse response: {:?}", e); @@ -71,7 +72,7 @@ async fn fetch_criteria(client: &Client, url: Url, logger: &Logger) -> anyhow::R } } -fn content_changed(current_hash: &mut u64, new_criteria: &BTreeMap, logger: &Logger) -> bool { +fn content_changed(current_hash: &mut u64, new_criteria: &WholeState, logger: &Logger) -> bool { let mut hasher = DefaultHasher::new(); new_criteria.hash(&mut hasher); let new_hash = hasher.finish(); @@ -85,18 +86,21 @@ fn content_changed(current_hash: &mut u64, new_criteria: &BTreeMap, true } -async fn write_to_file(criteria: &BTreeMap, inputs: &[String], rate: u64, transform_id: &String, path: &PathBuf) { - let (prefix, criteria) = match criteria.is_empty() { +async fn write_to_file(state: &WholeState, inputs: &[String], transform_id: &String, path: &PathBuf) { + let (prefix, criteria) = match state.criteria.is_empty() { // If the list is empty we should not sample anything. // In other words we should exclude everything. true => ("", "r'.*'".to_string()), - false => ("!", criteria.values().map(|s| format!("r'{}'", s)).collect::>().join(",")), + false => ( + "!", + state.criteria.values().map(|s| format!("r'{}'", s)).collect::>().join(","), + ), }; let transform = VectorSampleTransform { _type: "sample".to_string(), inputs: inputs.to_owned(), - rate, + rate: state.rate, exclude: format!("{}match_any(to_string(.MESSAGE) ?? \"\", [{}])", prefix, criteria), }; diff --git a/rs/ic-observability/log-noise-filter-downloader/src/main.rs b/rs/ic-observability/log-noise-filter-downloader/src/main.rs index 71cbe56ea..f20d948d0 100644 --- a/rs/ic-observability/log-noise-filter-downloader/src/main.rs +++ b/rs/ic-observability/log-noise-filter-downloader/src/main.rs @@ -13,7 +13,7 @@ async fn main() { let logger = make_logger(from_str_to_log(&cli.log_level)); info!(logger, "Running with following args: {:?}", cli); - download_loop(cli.url, logger, cli.path, cli.inputs, cli.rate, cli.transform_id).await + download_loop(cli.url, logger, cli.path, cli.inputs, cli.transform_id).await } fn make_logger(level: Level) -> Logger { @@ -43,13 +43,6 @@ Log level to use for running. You can use standard log levels 'info', #[clap(long, help = "Path to where the output should be generated")] path: PathBuf, - #[clap( - long, - help = "Rate of the matched messages that should be let through. It will be 1/rate", - default_value = "100" - )] - rate: u64, - #[clap(long, help = "Inputs that will be linked to this transform")] inputs: Vec, From 4a6b93bc550df4d5a3ab46242c37c3bf39d226af Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 12:52:20 +0200 Subject: [PATCH 16/23] fixing bazel --- .../log-noise-filter-backend/BUILD.bazel | 23 +++++++++++++++++-- .../log-noise-filter-downloader/BUILD.bazel | 4 +++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/BUILD.bazel b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel index 45b1a02d8..23eb07d49 100644 --- a/rs/ic-observability/log-noise-filter-backend/BUILD.bazel +++ b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel @@ -1,9 +1,28 @@ load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") -load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test", "rust_library") load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") +package(default_visibility = ["//visibility:public"]) + DEPS = [] +rust_library( + name = "log-noise-filter-backend-lib", + srcs = glob(["src/**/*.rs"]), + aliases = aliases(), + crate_name = "log_noise_filter_backend", + proc_macro_deps = all_crate_deps( + proc_macro = True, + ), + deps = all_crate_deps( + normal = True, + ) + DEPS, +) + +BINARY_DEPS = [ + ":log-noise-filter-backend-lib", +] + rust_binary( name = "log-noise-filter-backend", srcs = glob(["src/**/*.rs"]), @@ -14,7 +33,7 @@ rust_binary( stamp = 1, deps = all_crate_deps( normal = True, - ) + DEPS, + ) + DEPS + BINARY_DEPS, ) rust_binary_oci_image_rules( diff --git a/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel index b8f0aae21..b3c3b92a0 100644 --- a/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel +++ b/rs/ic-observability/log-noise-filter-downloader/BUILD.bazel @@ -2,7 +2,9 @@ load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps") load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") load("@//rs:oci_images.bzl", "rust_binary_oci_image_rules") -DEPS = [] +DEPS = [ + "//rs/ic-observability/log-noise-filter-backend:log-noise-filter-backend-lib", +] rust_binary( name = "log-noise-filter-downloader", From d99b22c022034a29e4fedabc9422987986ae432b Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 13:17:05 +0200 Subject: [PATCH 17/23] started writing tests --- .../src/handlers/mod.rs | 8 ++--- .../log-noise-filter-backend/src/main.rs | 4 ++- .../src/tests/criteria_tests.rs | 32 +++++++++++++++++++ .../log-noise-filter-backend/src/tests/mod.rs | 29 +++++++++++++++++ .../src/tests/rate_tests.rs | 1 + 5 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs create mode 100644 rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index be612a9e2..4e118a073 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -12,9 +12,9 @@ use regex::Regex; use slog::Logger; use tokio::sync::Mutex; -mod criteria; +pub(crate) mod criteria; pub mod get_all; -mod rate; +pub(crate) mod rate; #[derive(Clone)] pub struct Server { @@ -24,10 +24,10 @@ pub struct Server { } impl Server { - pub fn new(logger: Logger, rate: u64) -> Self { + pub fn new(logger: Logger, rate: u64, criteria: Vec) -> Self { Self { logger, - criteria: Arc::new(Mutex::new(vec![])), + criteria: Arc::new(Mutex::new(criteria)), rate: Arc::new(Mutex::new(rate)), } } diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index 815ea609c..9ae5e14fd 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -6,6 +6,8 @@ use slog::{info, o, Drain, Level, Logger}; use crate::handlers::Server; mod handlers; +#[cfg(test)] +mod tests; #[tokio::main] async fn main() { @@ -16,7 +18,7 @@ async fn main() { let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); info!(logger, "Running noise filter manager {}", socket); - let server = Server::new(logger.clone(), cli.global_rate); + let server = Server::new(logger.clone(), cli.global_rate, vec![]); server.run(socket).await; info!(logger, "Noise filter manager stopped"); diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs new file mode 100644 index 000000000..c46ec07b7 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs @@ -0,0 +1,32 @@ +use axum::Json; + +use crate::{ + handlers::criteria::delete_criteria::delete_criteria, + tests::{server, server_with_criteria}, +}; + +#[tokio::test] +async fn delete_criteria_test() { + let server = server_with_criteria(vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect()); + let payload = Json(vec![2, 0]); + + let resp = delete_criteria(server, payload).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert_eq!(resp.is_empty(), false); + assert!(resp.first_key_value().is_some()); + let (key, value) = resp.first_key_value().unwrap(); + assert_eq!(key, &0); + assert_eq!(value, "another") +} + +#[tokio::test] +async fn delete_criteria_error_test() { + let server = server(); + let payload = Json(vec![1]); + + let resp = delete_criteria(server, payload).await; + + assert!(resp.is_err()); +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs new file mode 100644 index 000000000..38fbd26df --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs @@ -0,0 +1,29 @@ +use axum::extract::State; + +use crate::handlers::Server; + +mod criteria_tests; +mod rate_tests; + +const RATE: u64 = 42; +const CRITERIA: &[&str] = &["test.*"]; + +fn criteria() -> Vec { + CRITERIA.iter().map(|s| s.to_string()).collect() +} + +fn server() -> State { + server_with_rate_and_criteria(RATE, criteria()) +} + +fn server_with_rate_and_criteria(rate: u64, criteria: Vec) -> State { + State(Server::new(slog::Logger::root(slog::Discard, slog::o!()), rate, criteria)) +} + +fn server_with_rate(rate: u64) -> State { + server_with_rate_and_criteria(rate, vec![]) +} + +fn server_with_criteria(criteria: Vec) -> State { + server_with_rate_and_criteria(RATE, criteria) +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs @@ -0,0 +1 @@ + From 16174dd42b5eee90ad384be9d47791d69fd3ceff Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 13:31:50 +0200 Subject: [PATCH 18/23] finishing backend tests --- .../src/tests/criteria_tests.rs | 32 ++++++++++++++++++- .../src/tests/rate_tests.rs | 14 ++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs index c46ec07b7..efabbc73d 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs @@ -1,7 +1,7 @@ use axum::Json; use crate::{ - handlers::criteria::delete_criteria::delete_criteria, + handlers::criteria::{delete_criteria::delete_criteria, get_criteria::get_criteria, post_criteria::update}, tests::{server, server_with_criteria}, }; @@ -30,3 +30,33 @@ async fn delete_criteria_error_test() { assert!(resp.is_err()); } + +#[tokio::test] +async fn get_criteria_test() { + let posted_criteria: Vec = vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect(); + let server = server_with_criteria(posted_criteria.clone()); + let resp = get_criteria(server).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert_eq!(resp.len(), posted_criteria.len()); + for i in 0..posted_criteria.len() { + assert_eq!(resp[&(i as u32)], posted_criteria[i]); + } +} + +#[tokio::test] +async fn post_criteria_test() { + let server = server_with_criteria(vec![]); + let payload = Json(vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect::>()); + + let resp = update(server, payload).await; + + assert!(resp.is_ok()); + let resp = resp.unwrap(); + assert_eq!(resp.is_empty(), false); + assert!(resp.first_key_value().is_some()); + let (key, value) = resp.first_key_value().unwrap(); + assert_eq!(key, &0); + assert_eq!(value, "test") +} diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs index 8b1378917..722c1037e 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/rate_tests.rs @@ -1 +1,15 @@ +use axum::Json; +use crate::{handlers::rate::put_rate::put_rate, tests::server}; + +#[tokio::test] +async fn test_put() { + let server = server(); + let payload = Json(1200); + + let resp = put_rate(server, payload).await; + assert!(resp.is_ok()); + + let resp = resp.unwrap(); + assert_eq!(resp.0, 1200); +} From a6b6cbe0be0b69ebfea22c606e80270dd8763724 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 13:44:42 +0200 Subject: [PATCH 19/23] fixing running tests with bazel --- .../log-noise-filter-backend/BUILD.bazel | 15 +++++++++++++++ .../log-noise-filter-backend/src/lib.rs | 2 ++ .../log-noise-filter-backend/src/main.rs | 2 -- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/BUILD.bazel b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel index 23eb07d49..9d4aa2b6d 100644 --- a/rs/ic-observability/log-noise-filter-backend/BUILD.bazel +++ b/rs/ic-observability/log-noise-filter-backend/BUILD.bazel @@ -36,6 +36,21 @@ rust_binary( ) + DEPS + BINARY_DEPS, ) +rust_test( + name = "unit_test", + aliases = aliases( + normal_dev = True, + proc_macro_dev = True, + ), + crate = ":log-noise-filter-backend-lib", + proc_macro_deps = all_crate_deps( + proc_macro_dev = True, + ), + deps = all_crate_deps( + normal_dev = True, + ) + DEPS, +) + rust_binary_oci_image_rules( name = "oci_image", src = ":log-noise-filter-backend", diff --git a/rs/ic-observability/log-noise-filter-backend/src/lib.rs b/rs/ic-observability/log-noise-filter-backend/src/lib.rs index c3d449565..f8615f7fd 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/lib.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/lib.rs @@ -1 +1,3 @@ pub mod handlers; +#[cfg(test)] +mod tests; diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index 9ae5e14fd..5ab83ee0f 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -6,8 +6,6 @@ use slog::{info, o, Drain, Level, Logger}; use crate::handlers::Server; mod handlers; -#[cfg(test)] -mod tests; #[tokio::main] async fn main() { From a2e2e51c3cc52dc48dcdebcfd50dffc63239930e Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 14:30:21 +0200 Subject: [PATCH 20/23] adding state file --- .../src/handlers/get_all.rs | 18 +-------- .../src/handlers/mod.rs | 25 ++++++++++++- .../log-noise-filter-backend/src/main.rs | 37 ++++++++++++++++--- .../src/download_loop.rs | 2 +- 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs index 5b297b95f..9f3e42d93 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/get_all.rs @@ -1,9 +1,6 @@ -use std::{collections::BTreeMap, hash::Hash}; - use axum::{extract::State, http::StatusCode, Json}; -use serde::{Deserialize, Serialize}; -use super::Server; +use super::{Server, WholeState}; pub async fn get_all(State(server): State) -> Result, (StatusCode, String)> { let state = WholeState { @@ -13,16 +10,3 @@ pub async fn get_all(State(server): State) -> Result, ( Ok(Json(state)) } - -#[derive(Serialize, Deserialize)] -pub struct WholeState { - pub rate: u64, - pub criteria: BTreeMap, -} - -impl Hash for WholeState { - fn hash(&self, state: &mut H) { - self.rate.hash(state); - self.criteria.hash(state); - } -} diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index 4e118a073..81121bffe 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, net::SocketAddr, sync::Arc}; +use std::{collections::BTreeMap, hash::Hash, net::SocketAddr, sync::Arc}; use axum::{ routing::{delete, get, post, put}, @@ -9,6 +9,7 @@ use criteria::{delete_criteria::delete_criteria, get_criteria::get_criteria, pos use get_all::get_all; use rate::{get_rate::get_rate, put_rate::put_rate}; use regex::Regex; +use serde::{Deserialize, Serialize}; use slog::Logger; use tokio::sync::Mutex; @@ -16,6 +17,28 @@ pub(crate) mod criteria; pub mod get_all; pub(crate) mod rate; +#[derive(Serialize, Deserialize)] +pub struct WholeState { + pub rate: u64, + pub criteria: BTreeMap, +} + +impl Hash for WholeState { + fn hash(&self, state: &mut H) { + self.rate.hash(state); + self.criteria.hash(state); + } +} + +impl Default for WholeState { + fn default() -> Self { + Self { + rate: 1500, + criteria: Default::default(), + } + } +} + #[derive(Clone)] pub struct Server { pub logger: Logger, diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index 5ab83ee0f..cf175ea0b 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -1,7 +1,12 @@ -use std::net::{Ipv4Addr, SocketAddr}; +use std::{ + collections::BTreeMap, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, +}; use clap::Parser; -use slog::{info, o, Drain, Level, Logger}; +use handlers::WholeState; +use slog::{info, o, warn, Drain, Level, Logger}; use crate::handlers::Server; @@ -16,12 +21,34 @@ async fn main() { let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED), cli.port); info!(logger, "Running noise filter manager {}", socket); - let server = Server::new(logger.clone(), cli.global_rate, vec![]); + let (global_rate, criteria) = load_state(&cli.state_file, &logger).await; + + let server = Server::new(logger.clone(), global_rate, criteria.into_iter().map(|(_, c)| c).collect()); server.run(socket).await; info!(logger, "Noise filter manager stopped"); } +async fn load_state(path: &PathBuf, logger: &Logger) -> (u64, BTreeMap) { + if path.exists() { + let content = tokio::fs::read_to_string(path).await.unwrap(); + let maybe_state = serde_json::from_str::(&content); + match maybe_state { + Ok(state) => return (state.rate, state.criteria), + Err(e) => warn!( + logger, + "Failed to deserialize state file {}, will remove it and recreate. The error was: {:?}", + path.display(), + e + ), + } + } + + let default = WholeState::default(); + tokio::fs::write(path, serde_json::to_string_pretty(&default).unwrap()).await.unwrap(); + (default.rate, default.criteria) +} + fn make_logger(level: Level) -> Logger { let decorator = slog_term::TermDecorator::new().build(); let full_format = slog_term::FullFormat::new(decorator).build().fuse(); @@ -46,8 +73,8 @@ Log level to use for running. You can use standard log levels 'info', #[clap(long, default_value = "8080", help = "Port to use for running the api")] port: u16, - #[clap(long, default_value = "1500", help = "Global rate")] - global_rate: u64, + #[clap(long, help = "State file used to sync across restarts")] + state_file: PathBuf, } fn from_str_to_log(value: &str) -> Level { diff --git a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs index e6998f368..0af2f07d4 100644 --- a/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs +++ b/rs/ic-observability/log-noise-filter-downloader/src/download_loop.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use log_noise_filter_backend::handlers::get_all::WholeState; +use log_noise_filter_backend::handlers::WholeState; use reqwest::Client; use serde::Serialize; use slog::{info, warn, Logger}; From 80ff3a094e4297d789c52aaf9b2388fda491f0bf Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 14:47:35 +0200 Subject: [PATCH 21/23] fixing clippy --- .../src/handlers/mod.rs | 27 ++++++++++++++++--- .../log-noise-filter-backend/src/main.rs | 7 ++++- .../src/tests/criteria_tests.rs | 8 +++--- .../log-noise-filter-backend/src/tests/mod.rs | 6 +---- 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs index 81121bffe..4704985fd 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/handlers/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, hash::Hash, net::SocketAddr, sync::Arc}; +use std::{collections::BTreeMap, hash::Hash, net::SocketAddr, path::PathBuf, sync::Arc}; use axum::{ routing::{delete, get, post, put}, @@ -10,7 +10,7 @@ use get_all::get_all; use rate::{get_rate::get_rate, put_rate::put_rate}; use regex::Regex; use serde::{Deserialize, Serialize}; -use slog::Logger; +use slog::{warn, Logger}; use tokio::sync::Mutex; pub(crate) mod criteria; @@ -44,14 +44,16 @@ pub struct Server { pub logger: Logger, criteria: Arc>>, rate: Arc>, + path: Option, } impl Server { - pub fn new(logger: Logger, rate: u64, criteria: Vec) -> Self { + pub fn new(logger: Logger, rate: u64, criteria: Vec, path: Option) -> Self { Self { logger, criteria: Arc::new(Mutex::new(criteria)), rate: Arc::new(Mutex::new(rate)), + path, } } @@ -97,6 +99,8 @@ impl Server { return Err(errors); } criteria.into_iter().for_each(|c| server_criteria.push(c)); + drop(server_criteria); + self.save_whole_state().await; Ok(()) } @@ -123,7 +127,22 @@ impl Server { indexes.iter().for_each(|c| { server_criteria.remove(*c as usize); }); - + drop(server_criteria); + self.save_whole_state().await; Ok(()) } + + async fn save_whole_state(&self) { + let state = WholeState { + rate: *self.rate.lock().await, + criteria: self.get_criteria_mapped().await, + }; + + if let Some(path) = &self.path { + match tokio::fs::write(path, serde_json::to_string_pretty(&state).unwrap()).await { + Ok(_) => (), + Err(e) => warn!(self.logger, "Failed to serialize state file {}, the error was: {:?}", path.display(), e), + }; + } + } } diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index cf175ea0b..5717b49e7 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -23,7 +23,12 @@ async fn main() { let (global_rate, criteria) = load_state(&cli.state_file, &logger).await; - let server = Server::new(logger.clone(), global_rate, criteria.into_iter().map(|(_, c)| c).collect()); + let server = Server::new( + logger.clone(), + global_rate, + criteria.into_iter().map(|(_, c)| c).collect(), + Some(cli.state_file), + ); server.run(socket).await; info!(logger, "Noise filter manager stopped"); diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs index efabbc73d..1657890bd 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs @@ -7,7 +7,7 @@ use crate::{ #[tokio::test] async fn delete_criteria_test() { - let server = server_with_criteria(vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect()); + let server = server_with_criteria(["test", "another", "one more"].iter().map(|f| f.to_string()).collect()); let payload = Json(vec![2, 0]); let resp = delete_criteria(server, payload).await; @@ -33,7 +33,7 @@ async fn delete_criteria_error_test() { #[tokio::test] async fn get_criteria_test() { - let posted_criteria: Vec = vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect(); + let posted_criteria: Vec = ["test", "another", "one more"].iter().map(|f| f.to_string()).collect(); let server = server_with_criteria(posted_criteria.clone()); let resp = get_criteria(server).await; @@ -48,13 +48,13 @@ async fn get_criteria_test() { #[tokio::test] async fn post_criteria_test() { let server = server_with_criteria(vec![]); - let payload = Json(vec!["test", "another", "one more"].iter().map(|f| f.to_string()).collect::>()); + let payload = Json(["test", "another", "one more"].iter().map(|f| f.to_string()).collect::>()); let resp = update(server, payload).await; assert!(resp.is_ok()); let resp = resp.unwrap(); - assert_eq!(resp.is_empty(), false); + assert!(!resp.is_empty()); assert!(resp.first_key_value().is_some()); let (key, value) = resp.first_key_value().unwrap(); assert_eq!(key, &0); diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs index 38fbd26df..8852d1064 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/mod.rs @@ -17,11 +17,7 @@ fn server() -> State { } fn server_with_rate_and_criteria(rate: u64, criteria: Vec) -> State { - State(Server::new(slog::Logger::root(slog::Discard, slog::o!()), rate, criteria)) -} - -fn server_with_rate(rate: u64) -> State { - server_with_rate_and_criteria(rate, vec![]) + State(Server::new(slog::Logger::root(slog::Discard, slog::o!()), rate, criteria, None)) } fn server_with_criteria(criteria: Vec) -> State { From 8d3520d6c899dc4b331766b938320fa859608496 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 14:54:20 +0200 Subject: [PATCH 22/23] adding fix --- .../log-noise-filter-backend/src/tests/criteria_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs index 1657890bd..17a1eeae4 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/tests/criteria_tests.rs @@ -14,7 +14,7 @@ async fn delete_criteria_test() { assert!(resp.is_ok()); let resp = resp.unwrap(); - assert_eq!(resp.is_empty(), false); + assert!(!resp.is_empty()); assert!(resp.first_key_value().is_some()); let (key, value) = resp.first_key_value().unwrap(); assert_eq!(key, &0); From a791891ee7dc3da07922e6aa207655cb34f7baa7 Mon Sep 17 00:00:00 2001 From: nikolamilosa Date: Mon, 1 Jul 2024 14:59:19 +0200 Subject: [PATCH 23/23] clippy --- rs/ic-observability/log-noise-filter-backend/src/main.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/rs/ic-observability/log-noise-filter-backend/src/main.rs b/rs/ic-observability/log-noise-filter-backend/src/main.rs index 5717b49e7..d8f617f4e 100644 --- a/rs/ic-observability/log-noise-filter-backend/src/main.rs +++ b/rs/ic-observability/log-noise-filter-backend/src/main.rs @@ -23,12 +23,7 @@ async fn main() { let (global_rate, criteria) = load_state(&cli.state_file, &logger).await; - let server = Server::new( - logger.clone(), - global_rate, - criteria.into_iter().map(|(_, c)| c).collect(), - Some(cli.state_file), - ); + let server = Server::new(logger.clone(), global_rate, criteria.into_values().collect(), Some(cli.state_file)); server.run(socket).await; info!(logger, "Noise filter manager stopped");