From d3c7d0a3884d048d58ab782c653fd86f2920cd7f Mon Sep 17 00:00:00 2001 From: Chukwuemeka Ewurum Date: Thu, 9 May 2024 17:04:54 +0000 Subject: [PATCH 1/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- Cargo.toml | 1 + flowgger.toml | 8 ++ src/flowgger/config.rs | 2 +- src/flowgger/encoder/ltsv_encoder.rs | 9 +- src/flowgger/input/mod.rs | 2 +- src/flowgger/input/udp_input.rs | 2 +- src/flowgger/mod.rs | 16 +-- tests/fuzzer.rs | 156 +++++++++++++++++++++++++++ 8 files changed, 181 insertions(+), 15 deletions(-) create mode 100644 tests/fuzzer.rs diff --git a/Cargo.toml b/Cargo.toml index 354d183..f66e5fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ time-tz = "0.3" [dev-dependencies] tempdir = "0.3" +quickcheck = "1" [profile.release] opt-level = 3 diff --git a/flowgger.toml b/flowgger.toml index e2af12d..bb6e642 100644 --- a/flowgger.toml +++ b/flowgger.toml @@ -160,3 +160,11 @@ framing = "line" format = "rfc3164" # Format of the optional timestamp to be prepended to each event syslog_prepend_timestamp="[[[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6]Z]" + + +[test] + +################### +# Fuzzing Test # +################### +fuzzed_message_count = 500 \ No newline at end of file diff --git a/src/flowgger/config.rs b/src/flowgger/config.rs index 0f37eea..fb3a75a 100644 --- a/src/flowgger/config.rs +++ b/src/flowgger/config.rs @@ -11,7 +11,7 @@ use toml::Value; /// [`Configuration`]: https://github.com/jedisct1/flowgger/wiki/Configuration #[derive(Clone)] pub struct Config { - config: Value, + pub config: Value, } impl Config { diff --git a/src/flowgger/encoder/ltsv_encoder.rs b/src/flowgger/encoder/ltsv_encoder.rs index 42cba04..a2859e7 100644 --- a/src/flowgger/encoder/ltsv_encoder.rs +++ b/src/flowgger/encoder/ltsv_encoder.rs @@ -134,9 +134,9 @@ use time::Month; #[test] fn test_ltsv_full_encode_no_sd() { let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message"; - let expected_msg = "host:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42"; - let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24); + let expected_msg = format!("host:testhostname\ttime:{}\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42", ts); + let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let record = Record { ts, @@ -159,9 +159,10 @@ fn test_ltsv_full_encode_no_sd() { #[test] fn test_ltsv_full_encode_multiple_sd() { let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message"; - let expected_msg = "a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42"; - let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24); + let expected_msg = format!("a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:{}\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42", ts); + let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); + let record = Record { ts, diff --git a/src/flowgger/input/mod.rs b/src/flowgger/input/mod.rs index 250a9e9..0363f04 100644 --- a/src/flowgger/input/mod.rs +++ b/src/flowgger/input/mod.rs @@ -7,7 +7,7 @@ mod tcp; #[cfg(feature = "tls")] mod tls; #[cfg(feature = "syslog")] -mod udp_input; +pub mod udp_input; #[cfg(feature = "file")] pub use self::file::FileInput; diff --git a/src/flowgger/input/udp_input.rs b/src/flowgger/input/udp_input.rs index ebbe61b..6033a7e 100644 --- a/src/flowgger/input/udp_input.rs +++ b/src/flowgger/input/udp_input.rs @@ -97,7 +97,7 @@ impl Input for UdpInput { /// but could not be handled /// `Invalid UTF-8 input`: Bubble up from handle_record, the record is not in a valid utf-8 format, it could be a non /// supported compression format -fn handle_record_maybe_compressed( +pub fn handle_record_maybe_compressed( line: &[u8], tx: &SyncSender>, decoder: &Box, diff --git a/src/flowgger/mod.rs b/src/flowgger/mod.rs index 6aa408b..84678ad 100644 --- a/src/flowgger/mod.rs +++ b/src/flowgger/mod.rs @@ -1,9 +1,9 @@ -mod config; -mod decoder; -mod encoder; -mod input; -mod merger; -mod output; +pub mod config; +pub mod decoder; +pub mod encoder; +pub mod input; +pub mod merger; +pub mod output; mod record; mod splitter; mod utils; @@ -275,12 +275,12 @@ fn get_encoder_passthrough(config: &Config) -> Box { } #[cfg(feature = "rfc3164")] -fn get_decoder_rfc3164(config: &Config) -> Box { +pub fn get_decoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Decoder::new(config)) as Box } #[cfg(feature = "rfc3164")] -fn get_encoder_rfc3164(config: &Config) -> Box { +pub fn get_encoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Encoder::new(config)) as Box } diff --git a/tests/fuzzer.rs b/tests/fuzzer.rs new file mode 100644 index 0000000..5a717ed --- /dev/null +++ b/tests/fuzzer.rs @@ -0,0 +1,156 @@ +use flowgger; +use quickcheck; + +use quickcheck::QuickCheck; + +use flowgger::flowgger::config::Config; +use flowgger::flowgger::encoder::Encoder; +use flowgger::flowgger::decoder::Decoder; +use flowgger::flowgger::merger; +use flowgger::flowgger::output; + +use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; + +use flowgger::flowgger::get_decoder_rfc3164; +use flowgger::flowgger::get_encoder_rfc3164; +use flowgger::flowgger::input::udp_input::handle_record_maybe_compressed; + +use self::merger::{LineMerger, Merger}; +use self::output::FileOutput; +use self::output::Output; + +use std::sync::{Arc, Mutex}; +use toml::Value; +use std::fs; +use std::{thread, time}; + +const DEFAULT_CONFIG_FILE: &str = "flowgger.toml"; +const DEFAULT_OUTPUT_FILEPATH: &str = "output.log"; +const DEFAULT_QUEUE_SIZE: usize = 10_000_000; + +const DEFAULT_OUTPUT_FORMAT: &str = "gelf"; +const DEFAULT_OUTPUT_FRAMING: &str = "noop"; +const DEFAULT_OUTPUT_TYPE: &str = "file"; + +const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; + +fn get_file_output(config: &Config) -> Box { + Box::new(FileOutput::new(config)) as Box +} + +pub fn start_file_output(config: &Config, rx: Receiver>){ + + let output_format = config + .lookup("output.format") + .map_or(DEFAULT_OUTPUT_FORMAT, |x| { + x.as_str().expect("output.format must be a string") + }); + + let output = get_file_output(&config); + let output_type = config + .lookup("output.type") + .map_or(DEFAULT_OUTPUT_TYPE, |x| { + x.as_str().expect("output.type must be a string") + }); + + let _output_framing = match config.lookup("output.framing") { + Some(framing) => framing.as_str().expect("output.framing must be a string"), + None => match (output_format, output_type) { + ("capnp", _) | (_, "kafka") => "noop", + (_, "debug") | ("ltsv", _) => "line", + ("gelf", _) => "nul", + _ => DEFAULT_OUTPUT_FRAMING, + }, + }; + let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); + + let arx = Arc::new(Mutex::new(rx)); + output.start(arx, merger); + +} + +pub fn get_config() -> Config { + let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { + Ok(config) => config, + Err(e) => panic!( + "Unable to read the config file [{}]: {}", + "flowgger.toml", + e.to_string() + ), + }; + + if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ + *entry = Value::Integer(0); + }else{ + panic!("Failed to find config entry"); + } + + return config; +} + +pub fn remove_output_file(file_output_path: &str){ + fs::remove_file(file_output_path); +} + +pub fn fuzz_target_rfc3164(data: &[u8]) { + let config = get_config(); + let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { + x.as_str().expect("File output path missing in config") + }); + remove_output_file(&file_output_path); + + if let Ok(s) = std::str::from_utf8(data) { + let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); + start_file_output(&config, rx); + + let encoder = get_encoder_rfc3164(&config); + let decoder = get_decoder_rfc3164(&config); + let (decoder, encoder): (Box, Box) = + (decoder.clone_boxed(), encoder.clone_boxed()); + let result = handle_record_maybe_compressed(s.as_bytes(), &tx, &decoder, &encoder); + + match result { + Ok(_) => { + drop(tx); + thread::sleep(time::Duration::from_millis(100)); + + let file_contents = match fs::read_to_string(file_output_path){ + Ok(contents) => contents, + Err(_) => { + println!("Failed to read file"); + "".to_string() + } + }; + + let split_file_content: Vec<&str> = file_contents.split(" ").filter(|s| !s.is_empty()).collect(); + let split_input: Vec<&str> = s.split(" ").filter(|s| !s.is_empty()).collect(); + + let hostnames_match = split_file_content[3].trim() == split_input[3].trim(); + let appnames_match = split_file_content[4].trim() == split_input[4].trim(); + + if !(hostnames_match && appnames_match){ + panic!("Log output invalid"); + } + } + Err(_) => { + } + } + + + } +} + + +#[test] +fn test_fuzzer(){ + let config = get_config(); + let fuzzed_message_count = match config.lookup("test.fuzzed_message_count"){ + Some(count) => count.as_integer().unwrap() as u64, + None => DEFAULT_FUZZED_MESSAGE_COUNT, + }; + + fn fuzz(data: String){ + fuzz_target_rfc3164(data.as_bytes()); + } + QuickCheck::new().max_tests(fuzzed_message_count).quickcheck(fuzz as fn(String)); +} \ No newline at end of file From 7627f6ff36f98ee6ea6e9b7becc9538bf80b342d Mon Sep 17 00:00:00 2001 From: Chukwuemeka Ewurum Date: Tue, 14 May 2024 18:20:19 +0000 Subject: [PATCH 2/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- Cargo.toml | 1 + flowgger.toml | 8 -- src/flowgger/config.rs | 3 + src/flowgger/fuzzer.rs | 210 ++++++++++++++++++++++++++++++++++++++ src/flowgger/input/mod.rs | 4 +- src/flowgger/mod.rs | 37 ++++++- tests/fuzzer.rs | 156 ---------------------------- 7 files changed, 251 insertions(+), 168 deletions(-) create mode 100644 src/flowgger/fuzzer.rs delete mode 100644 tests/fuzzer.rs diff --git a/Cargo.toml b/Cargo.toml index f66e5fd..7ae92ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ time-tz = "0.3" [dev-dependencies] tempdir = "0.3" quickcheck = "1" +lazy_static = "1.4.0" [profile.release] opt-level = 3 diff --git a/flowgger.toml b/flowgger.toml index bb6e642..e2af12d 100644 --- a/flowgger.toml +++ b/flowgger.toml @@ -160,11 +160,3 @@ framing = "line" format = "rfc3164" # Format of the optional timestamp to be prepended to each event syslog_prepend_timestamp="[[[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:6]Z]" - - -[test] - -################### -# Fuzzing Test # -################### -fuzzed_message_count = 500 \ No newline at end of file diff --git a/src/flowgger/config.rs b/src/flowgger/config.rs index fb3a75a..9920400 100644 --- a/src/flowgger/config.rs +++ b/src/flowgger/config.rs @@ -11,6 +11,9 @@ use toml::Value; /// [`Configuration`]: https://github.com/jedisct1/flowgger/wiki/Configuration #[derive(Clone)] pub struct Config { + #[cfg(not(test))] + config: Value, + #[cfg(test)] pub config: Value, } diff --git a/src/flowgger/fuzzer.rs b/src/flowgger/fuzzer.rs new file mode 100644 index 0000000..b62faad --- /dev/null +++ b/src/flowgger/fuzzer.rs @@ -0,0 +1,210 @@ +extern crate quickcheck; + +use crate::flowgger; + +use quickcheck::QuickCheck; + +use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; +use std::sync::{Arc, Mutex}; +use std::{fs, thread, time}; +use std::io::{BufReader, BufRead}; + +use flowgger::config::Config; +use flowgger::encoder::Encoder; +use flowgger::decoder::Decoder; +use flowgger::merger; +use flowgger::output; + +use flowgger::get_decoder_rfc3164; +use flowgger::get_encoder_rfc3164; +use flowgger::input::udp_input::handle_record_maybe_compressed; + +use self::merger::{LineMerger, Merger}; +use self::output::FileOutput; +use self::output::Output; +use toml::Value; +use lazy_static::lazy_static; + + +/// Fuzz testing logic defined in this module +/// This module depends on the default configuration file `flowgger.toml` +/// +/// The test in this module hits flowgger with random input. +/// The expected state is a failure for invalid inputs (no logs sent to output) +/// and successfully parsed logs written to output stream for valid inputs +/// +/// # Dependencies +/// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and +/// [`LazyStatic`][https://docs.rs/lazy_static/latest/lazy_static/] +/// +/// `QuickCheck`is used to generate random string input, while LazyStatic is used to lazily intialize share resources at runtime +/// +/// # Errors +/// +/// This function will return an error if the default config does not exists,is unreadbale, or is not valid +/// toml format +#[cfg(test)] +mod tests { + use super::*; + + const DEFAULT_CONFIG_FILE: &str = "flowgger.toml"; + const DEFAULT_OUTPUT_FILEPATH: &str = "output.log"; + const DEFAULT_QUEUE_SIZE: usize = 10_000_000; + + const DEFAULT_OUTPUT_FORMAT: &str = "gelf"; + const DEFAULT_OUTPUT_FRAMING: &str = "noop"; + const DEFAULT_OUTPUT_TYPE: &str = "file"; + + const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 100; + + lazy_static! { + static ref STATIC_CONFIG: Config = get_config(); + static ref SYNC_SENDER:Mutex>>> = Mutex::new(None); + } + + #[test] + fn test_fuzzer(){ + let config = STATIC_CONFIG.clone(); + let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { + x.as_str().expect("File output path missing in config") + }); + remove_output_file(&file_output_path); + + let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); + initialize_sender(tx); + start_file_output(&config, rx); + + QuickCheck::new().max_tests(DEFAULT_FUZZED_MESSAGE_COUNT).quickcheck(fuzz_target_rfc3164 as fn(String)); + let _ = check_result(&file_output_path); + } + + pub fn get_config() -> Config { + let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { + Ok(config) => config, + Err(e) => panic!( + "Unable to read the config file [{}]: {}", + "flowgger.toml", + e.to_string() + ), + }; + + update_file_rotation_defaults_in_config(&mut config); + + return config; + } + + /// Update the default file rotation size and time in the config file + /// This ensures output is sent to a single non-rotated file + pub fn update_file_rotation_defaults_in_config(config: &mut Config){ + if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_size"){ + *entry = Value::Integer(0); + } + + if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ + *entry = Value::Integer(0); + } + } + + pub fn remove_output_file(file_output_path: &str){ + let _ = fs::remove_file(file_output_path); + } + + // Initialize the SyncSender which will be used to send data to the output file + fn initialize_sender(tx: SyncSender>){ + let mut sender_guard = SYNC_SENDER.lock().unwrap(); + if sender_guard.is_none(){ + *sender_guard = Some(tx); + } + + drop(sender_guard); + } + + /// Start an input listener which writes data to the output file once received. + pub fn start_file_output(config: &Config, rx: Receiver>){ + + let output_format = config + .lookup("output.format") + .map_or(DEFAULT_OUTPUT_FORMAT, |x| { + x.as_str().expect("output.format must be a string") + }); + + let output = get_file_output(&config); + let output_type = config + .lookup("output.type") + .map_or(DEFAULT_OUTPUT_TYPE, |x| { + x.as_str().expect("output.type must be a string") + }); + + let _output_framing = match config.lookup("output.framing") { + Some(framing) => framing.as_str().expect("output.framing must be a string"), + None => match (output_format, output_type) { + ("capnp", _) | (_, "kafka") => "noop", + (_, "debug") | ("ltsv", _) => "line", + ("gelf", _) => "nul", + _ => DEFAULT_OUTPUT_FRAMING, + }, + }; + let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); + + let arx = Arc::new(Mutex::new(rx)); + output.start(arx, merger); + } + + fn get_file_output(config: &Config) -> Box { + Box::new(FileOutput::new(config)) as Box + } + + pub fn fuzz_target_rfc3164(data: String) { + let config = STATIC_CONFIG.clone(); + + let mut sender_guard = match SYNC_SENDER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + // Handle the poisoned Mutex + let guard = _poisoned_error.into_inner(); + guard + } + }; + let tx: &mut SyncSender> = sender_guard.as_mut().unwrap(); + + let encoder = get_encoder_rfc3164(&config); + let decoder = get_decoder_rfc3164(&config); + let (decoder, encoder): (Box, Box) = + (decoder.clone_boxed(), encoder.clone_boxed()); + let _result = handle_record_maybe_compressed(data.as_bytes(), &tx, &decoder, &encoder); + } + + // Check for the result + // On invalid input, no logs are expected to be written to the output file + // For valid inputs, analyze each log entry, check that the hostname and appnames are defined + fn check_result(file_output_path: &str)-> Result<(), std::io::Error> { + + let mut sender_guard = match SYNC_SENDER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + // Handle the poisoned Mutex + let guard = _poisoned_error.into_inner(); + guard + } + }; + let tx: SyncSender> = sender_guard.take().unwrap(); + drop(tx); + + thread::sleep(time::Duration::from_millis(1000)); + + let file = fs::File::open(file_output_path).expect("Unable to open output file"); + let reader = BufReader::new(file); + + for line in reader.lines() { + let line_item: String = line?; + let split_line_content: Vec<&str> = line_item.split(" ").filter(|data| !data.is_empty()).collect(); + let hostname = split_line_content[3].trim(); + let appname = split_line_content[4].trim(); + + if hostname.is_empty() || appname.is_empty() { + panic!("Log output invalid"); + } + } + Ok(()) + } +} \ No newline at end of file diff --git a/src/flowgger/input/mod.rs b/src/flowgger/input/mod.rs index 0363f04..e4d9e7c 100644 --- a/src/flowgger/input/mod.rs +++ b/src/flowgger/input/mod.rs @@ -6,7 +6,9 @@ mod stdin_input; mod tcp; #[cfg(feature = "tls")] mod tls; -#[cfg(feature = "syslog")] +#[cfg(all(feature = "syslog", not(test)))] +mod udp_input; +#[cfg(test)] pub mod udp_input; #[cfg(feature = "file")] diff --git a/src/flowgger/mod.rs b/src/flowgger/mod.rs index 84678ad..7a13040 100644 --- a/src/flowgger/mod.rs +++ b/src/flowgger/mod.rs @@ -1,13 +1,36 @@ +#[cfg(not(test))] +mod config; +#[cfg(not(test))] +mod decoder; +#[cfg(not(test))] +mod encoder; +#[cfg(not(test))] +mod input; +#[cfg(not(test))] +mod merger; +#[cfg(not(test))] +mod output; + +#[cfg(test)] pub mod config; +#[cfg(test)] pub mod decoder; +#[cfg(test)] pub mod encoder; +#[cfg(test)] pub mod input; +#[cfg(test)] pub mod merger; +#[cfg(test)] pub mod output; + mod record; mod splitter; mod utils; +#[cfg(test)] +mod fuzzer; + use std::io::{stderr, Write}; #[cfg(feature = "capnp-recompile")] @@ -274,16 +297,24 @@ fn get_encoder_passthrough(config: &Config) -> Box { Box::new(PassthroughEncoder::new(config)) as Box } -#[cfg(feature = "rfc3164")] +#[cfg(all(feature = "rfc3164", test))] pub fn get_decoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Decoder::new(config)) as Box } - -#[cfg(feature = "rfc3164")] +#[cfg(all(feature = "rfc3164", test))] pub fn get_encoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Encoder::new(config)) as Box } +#[cfg(all(feature = "rfc3164", not(test)))] +fn get_decoder_rfc3164(config: &Config) -> Box { + Box::new(RFC3164Decoder::new(config)) as Box +} +#[cfg(all(feature = "rfc3164", not(test)))] +fn get_encoder_rfc3164(config: &Config) -> Box { + Box::new(RFC3164Encoder::new(config)) as Box +} + #[cfg(not(feature = "rfc5424"))] fn get_decoder_rfc5424(_config: &Config) -> ! { panic!("Support for rfc5424 hasn't been compiled in") diff --git a/tests/fuzzer.rs b/tests/fuzzer.rs deleted file mode 100644 index 5a717ed..0000000 --- a/tests/fuzzer.rs +++ /dev/null @@ -1,156 +0,0 @@ -use flowgger; -use quickcheck; - -use quickcheck::QuickCheck; - -use flowgger::flowgger::config::Config; -use flowgger::flowgger::encoder::Encoder; -use flowgger::flowgger::decoder::Decoder; -use flowgger::flowgger::merger; -use flowgger::flowgger::output; - -use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; - -use flowgger::flowgger::get_decoder_rfc3164; -use flowgger::flowgger::get_encoder_rfc3164; -use flowgger::flowgger::input::udp_input::handle_record_maybe_compressed; - -use self::merger::{LineMerger, Merger}; -use self::output::FileOutput; -use self::output::Output; - -use std::sync::{Arc, Mutex}; -use toml::Value; -use std::fs; -use std::{thread, time}; - -const DEFAULT_CONFIG_FILE: &str = "flowgger.toml"; -const DEFAULT_OUTPUT_FILEPATH: &str = "output.log"; -const DEFAULT_QUEUE_SIZE: usize = 10_000_000; - -const DEFAULT_OUTPUT_FORMAT: &str = "gelf"; -const DEFAULT_OUTPUT_FRAMING: &str = "noop"; -const DEFAULT_OUTPUT_TYPE: &str = "file"; - -const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; - -fn get_file_output(config: &Config) -> Box { - Box::new(FileOutput::new(config)) as Box -} - -pub fn start_file_output(config: &Config, rx: Receiver>){ - - let output_format = config - .lookup("output.format") - .map_or(DEFAULT_OUTPUT_FORMAT, |x| { - x.as_str().expect("output.format must be a string") - }); - - let output = get_file_output(&config); - let output_type = config - .lookup("output.type") - .map_or(DEFAULT_OUTPUT_TYPE, |x| { - x.as_str().expect("output.type must be a string") - }); - - let _output_framing = match config.lookup("output.framing") { - Some(framing) => framing.as_str().expect("output.framing must be a string"), - None => match (output_format, output_type) { - ("capnp", _) | (_, "kafka") => "noop", - (_, "debug") | ("ltsv", _) => "line", - ("gelf", _) => "nul", - _ => DEFAULT_OUTPUT_FRAMING, - }, - }; - let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); - - let arx = Arc::new(Mutex::new(rx)); - output.start(arx, merger); - -} - -pub fn get_config() -> Config { - let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { - Ok(config) => config, - Err(e) => panic!( - "Unable to read the config file [{}]: {}", - "flowgger.toml", - e.to_string() - ), - }; - - if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ - *entry = Value::Integer(0); - }else{ - panic!("Failed to find config entry"); - } - - return config; -} - -pub fn remove_output_file(file_output_path: &str){ - fs::remove_file(file_output_path); -} - -pub fn fuzz_target_rfc3164(data: &[u8]) { - let config = get_config(); - let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { - x.as_str().expect("File output path missing in config") - }); - remove_output_file(&file_output_path); - - if let Ok(s) = std::str::from_utf8(data) { - let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); - start_file_output(&config, rx); - - let encoder = get_encoder_rfc3164(&config); - let decoder = get_decoder_rfc3164(&config); - let (decoder, encoder): (Box, Box) = - (decoder.clone_boxed(), encoder.clone_boxed()); - let result = handle_record_maybe_compressed(s.as_bytes(), &tx, &decoder, &encoder); - - match result { - Ok(_) => { - drop(tx); - thread::sleep(time::Duration::from_millis(100)); - - let file_contents = match fs::read_to_string(file_output_path){ - Ok(contents) => contents, - Err(_) => { - println!("Failed to read file"); - "".to_string() - } - }; - - let split_file_content: Vec<&str> = file_contents.split(" ").filter(|s| !s.is_empty()).collect(); - let split_input: Vec<&str> = s.split(" ").filter(|s| !s.is_empty()).collect(); - - let hostnames_match = split_file_content[3].trim() == split_input[3].trim(); - let appnames_match = split_file_content[4].trim() == split_input[4].trim(); - - if !(hostnames_match && appnames_match){ - panic!("Log output invalid"); - } - } - Err(_) => { - } - } - - - } -} - - -#[test] -fn test_fuzzer(){ - let config = get_config(); - let fuzzed_message_count = match config.lookup("test.fuzzed_message_count"){ - Some(count) => count.as_integer().unwrap() as u64, - None => DEFAULT_FUZZED_MESSAGE_COUNT, - }; - - fn fuzz(data: String){ - fuzz_target_rfc3164(data.as_bytes()); - } - QuickCheck::new().max_tests(fuzzed_message_count).quickcheck(fuzz as fn(String)); -} \ No newline at end of file From 0d59908ad3e5d0b189e1a6ae2c975dd3927877ff Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Wed, 15 May 2024 09:50:22 +0000 Subject: [PATCH 3/8] Add fuzzing test to flowgger --- src/flowgger/fuzzer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flowgger/fuzzer.rs b/src/flowgger/fuzzer.rs index b62faad..a488245 100644 --- a/src/flowgger/fuzzer.rs +++ b/src/flowgger/fuzzer.rs @@ -55,7 +55,7 @@ mod tests { const DEFAULT_OUTPUT_FRAMING: &str = "noop"; const DEFAULT_OUTPUT_TYPE: &str = "file"; - const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 100; + const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; lazy_static! { static ref STATIC_CONFIG: Config = get_config(); From fe72d173be8384fa12f0366638c2a8e4b5c729cb Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Thu, 16 May 2024 14:57:02 +0000 Subject: [PATCH 4/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- src/flowgger/fuzzer.rs | 210 -------------------------- src/flowgger/mod.rs | 16 +- src/flowgger/test_fuzzer.rs | 283 ++++++++++++++++++++++++++++++++++++ 3 files changed, 296 insertions(+), 213 deletions(-) delete mode 100644 src/flowgger/fuzzer.rs create mode 100644 src/flowgger/test_fuzzer.rs diff --git a/src/flowgger/fuzzer.rs b/src/flowgger/fuzzer.rs deleted file mode 100644 index a488245..0000000 --- a/src/flowgger/fuzzer.rs +++ /dev/null @@ -1,210 +0,0 @@ -extern crate quickcheck; - -use crate::flowgger; - -use quickcheck::QuickCheck; - -use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; -use std::sync::{Arc, Mutex}; -use std::{fs, thread, time}; -use std::io::{BufReader, BufRead}; - -use flowgger::config::Config; -use flowgger::encoder::Encoder; -use flowgger::decoder::Decoder; -use flowgger::merger; -use flowgger::output; - -use flowgger::get_decoder_rfc3164; -use flowgger::get_encoder_rfc3164; -use flowgger::input::udp_input::handle_record_maybe_compressed; - -use self::merger::{LineMerger, Merger}; -use self::output::FileOutput; -use self::output::Output; -use toml::Value; -use lazy_static::lazy_static; - - -/// Fuzz testing logic defined in this module -/// This module depends on the default configuration file `flowgger.toml` -/// -/// The test in this module hits flowgger with random input. -/// The expected state is a failure for invalid inputs (no logs sent to output) -/// and successfully parsed logs written to output stream for valid inputs -/// -/// # Dependencies -/// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and -/// [`LazyStatic`][https://docs.rs/lazy_static/latest/lazy_static/] -/// -/// `QuickCheck`is used to generate random string input, while LazyStatic is used to lazily intialize share resources at runtime -/// -/// # Errors -/// -/// This function will return an error if the default config does not exists,is unreadbale, or is not valid -/// toml format -#[cfg(test)] -mod tests { - use super::*; - - const DEFAULT_CONFIG_FILE: &str = "flowgger.toml"; - const DEFAULT_OUTPUT_FILEPATH: &str = "output.log"; - const DEFAULT_QUEUE_SIZE: usize = 10_000_000; - - const DEFAULT_OUTPUT_FORMAT: &str = "gelf"; - const DEFAULT_OUTPUT_FRAMING: &str = "noop"; - const DEFAULT_OUTPUT_TYPE: &str = "file"; - - const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; - - lazy_static! { - static ref STATIC_CONFIG: Config = get_config(); - static ref SYNC_SENDER:Mutex>>> = Mutex::new(None); - } - - #[test] - fn test_fuzzer(){ - let config = STATIC_CONFIG.clone(); - let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { - x.as_str().expect("File output path missing in config") - }); - remove_output_file(&file_output_path); - - let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); - initialize_sender(tx); - start_file_output(&config, rx); - - QuickCheck::new().max_tests(DEFAULT_FUZZED_MESSAGE_COUNT).quickcheck(fuzz_target_rfc3164 as fn(String)); - let _ = check_result(&file_output_path); - } - - pub fn get_config() -> Config { - let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { - Ok(config) => config, - Err(e) => panic!( - "Unable to read the config file [{}]: {}", - "flowgger.toml", - e.to_string() - ), - }; - - update_file_rotation_defaults_in_config(&mut config); - - return config; - } - - /// Update the default file rotation size and time in the config file - /// This ensures output is sent to a single non-rotated file - pub fn update_file_rotation_defaults_in_config(config: &mut Config){ - if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_size"){ - *entry = Value::Integer(0); - } - - if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ - *entry = Value::Integer(0); - } - } - - pub fn remove_output_file(file_output_path: &str){ - let _ = fs::remove_file(file_output_path); - } - - // Initialize the SyncSender which will be used to send data to the output file - fn initialize_sender(tx: SyncSender>){ - let mut sender_guard = SYNC_SENDER.lock().unwrap(); - if sender_guard.is_none(){ - *sender_guard = Some(tx); - } - - drop(sender_guard); - } - - /// Start an input listener which writes data to the output file once received. - pub fn start_file_output(config: &Config, rx: Receiver>){ - - let output_format = config - .lookup("output.format") - .map_or(DEFAULT_OUTPUT_FORMAT, |x| { - x.as_str().expect("output.format must be a string") - }); - - let output = get_file_output(&config); - let output_type = config - .lookup("output.type") - .map_or(DEFAULT_OUTPUT_TYPE, |x| { - x.as_str().expect("output.type must be a string") - }); - - let _output_framing = match config.lookup("output.framing") { - Some(framing) => framing.as_str().expect("output.framing must be a string"), - None => match (output_format, output_type) { - ("capnp", _) | (_, "kafka") => "noop", - (_, "debug") | ("ltsv", _) => "line", - ("gelf", _) => "nul", - _ => DEFAULT_OUTPUT_FRAMING, - }, - }; - let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); - - let arx = Arc::new(Mutex::new(rx)); - output.start(arx, merger); - } - - fn get_file_output(config: &Config) -> Box { - Box::new(FileOutput::new(config)) as Box - } - - pub fn fuzz_target_rfc3164(data: String) { - let config = STATIC_CONFIG.clone(); - - let mut sender_guard = match SYNC_SENDER.lock() { - Ok(guard) => guard, - Err(_poisoned_error) => { - // Handle the poisoned Mutex - let guard = _poisoned_error.into_inner(); - guard - } - }; - let tx: &mut SyncSender> = sender_guard.as_mut().unwrap(); - - let encoder = get_encoder_rfc3164(&config); - let decoder = get_decoder_rfc3164(&config); - let (decoder, encoder): (Box, Box) = - (decoder.clone_boxed(), encoder.clone_boxed()); - let _result = handle_record_maybe_compressed(data.as_bytes(), &tx, &decoder, &encoder); - } - - // Check for the result - // On invalid input, no logs are expected to be written to the output file - // For valid inputs, analyze each log entry, check that the hostname and appnames are defined - fn check_result(file_output_path: &str)-> Result<(), std::io::Error> { - - let mut sender_guard = match SYNC_SENDER.lock() { - Ok(guard) => guard, - Err(_poisoned_error) => { - // Handle the poisoned Mutex - let guard = _poisoned_error.into_inner(); - guard - } - }; - let tx: SyncSender> = sender_guard.take().unwrap(); - drop(tx); - - thread::sleep(time::Duration::from_millis(1000)); - - let file = fs::File::open(file_output_path).expect("Unable to open output file"); - let reader = BufReader::new(file); - - for line in reader.lines() { - let line_item: String = line?; - let split_line_content: Vec<&str> = line_item.split(" ").filter(|data| !data.is_empty()).collect(); - let hostname = split_line_content[3].trim(); - let appname = split_line_content[4].trim(); - - if hostname.is_empty() || appname.is_empty() { - panic!("Log output invalid"); - } - } - Ok(()) - } -} \ No newline at end of file diff --git a/src/flowgger/mod.rs b/src/flowgger/mod.rs index 7a13040..0dc06f8 100644 --- a/src/flowgger/mod.rs +++ b/src/flowgger/mod.rs @@ -29,7 +29,7 @@ mod splitter; mod utils; #[cfg(test)] -mod fuzzer; +mod test_fuzzer; use std::io::{stderr, Write}; @@ -202,16 +202,26 @@ fn get_output_kafka(_config: &Config) -> ! { panic!("Support for Kafka hasn't been compiled in") } -#[cfg(feature = "file")] +#[cfg(all(feature = "file", not(test)))] fn get_output_file(config: &Config) -> Box { Box::new(FileOutput::new(config)) as Box } -#[cfg(not(feature = "file"))] +#[cfg(all(not(feature = "file"), not(test)))] fn get_output_file(_config: &Config) -> ! { panic!("Support for file hasn't been compiled in") } +#[cfg(all(feature = "file", test))] +pub fn get_output_file(config: &Config) -> Box { + Box::new(FileOutput::new(config)) as Box +} + +#[cfg(all(not(feature = "file"), test))] +pub fn get_output_file(_config: &Config) -> ! { + panic!("Support for file hasn't been compiled in") +} + #[cfg(feature = "tls")] fn get_output_tls(config: &Config) -> Box { Box::new(TlsOutput::new(config)) as Box diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs new file mode 100644 index 0000000..0051c9b --- /dev/null +++ b/src/flowgger/test_fuzzer.rs @@ -0,0 +1,283 @@ +/// Fuzz testing logic defined in this module +/// This module depends on the default configuration file `flowgger.toml` +/// +/// The test in this module hits flowgger with random input. +/// The expected state is a failure for invalid inputs (no logs sent to output) +/// and successfully parsed logs written to output stream for valid inputs +/// +/// # Dependencies +/// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and +/// [`LazyStatic`][https://docs.rs/lazy_static/latest/lazy_static/] +/// +/// `QuickCheck`is used to generate random string input, while LazyStatic is used to lazily intialize share resources at runtime +/// +/// # Errors +/// +/// This function will return an error if the default config does not exists,is unreadbale, or is not valid +/// toml format +#[cfg(test)] +mod tests { + extern crate quickcheck; + + use crate::flowgger; + + use quickcheck::QuickCheck; + + use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; + use std::sync::{Arc, Mutex}; + use std::{fs}; + use std::io::{BufReader, BufRead}; + use std::sync::Once; + + use flowgger::config::Config; + use flowgger::encoder::Encoder; + use flowgger::decoder::Decoder; + use flowgger::merger; + use flowgger::output; + use flowgger::get_output_file; + use flowgger::get_decoder_rfc3164; + use flowgger::get_encoder_rfc3164; + use flowgger::input::udp_input::handle_record_maybe_compressed; + + use self::merger::{LineMerger, Merger}; + use toml::Value; + + const DEFAULT_CONFIG_FILE: &str = "flowgger.toml"; + const DEFAULT_OUTPUT_FILEPATH: &str = "output.log"; + const DEFAULT_QUEUE_SIZE: usize = 10_000_000; + + const DEFAULT_OUTPUT_FORMAT: &str = "gelf"; + const DEFAULT_OUTPUT_FRAMING: &str = "noop"; + const DEFAULT_OUTPUT_TYPE: &str = "file"; + + const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; + + + static INIT_CONFIG: Once = Once::new(); + static mut GLOB_CONFIG: Option = None; + + static INIT_FILEPATH: Once = Once::new(); + static mut GLOB_FILEPATH: String = String::new(); + + static INIT_DECODER: Once = Once::new(); + static mut GLOB_DECODER: Mutex>> = Mutex::new(None); + + static INIT_ENCODER: Once = Once::new(); + static mut GLOB_ENCODER: Mutex>> = Mutex::new(None); + + static INIT_SYNC_SENDER: Once = Once::new(); + static mut GLOB_SYNC_SENDER: Mutex>>> = Mutex::new(None); + + #[test] + fn test_fuzzer(){ + let config = get_config(); + let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { + x.as_str().expect("File output path missing in config") + }); + remove_output_file(&file_output_path); + + let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); + start_file_output(&config, rx); + set_globals(&config, &file_output_path, tx); + + QuickCheck::new().tests(DEFAULT_FUZZED_MESSAGE_COUNT).quickcheck(fuzz_target_rfc3164 as fn(String)); + let _ = check_result(&file_output_path); + } + + fn set_globals(config: &Config, file_output_path: &str, sync_sender: SyncSender>){ + set_global_config(config.clone()); + set_global_output_filepath(file_output_path.to_string()); + set_global_rfc3164_decoder(config); + set_global_rfc3164_encoder(config); + set_global_sync_sender(sync_sender); + } + + fn set_global_rfc3164_decoder(config: &Config) { + INIT_DECODER.call_once(|| { + unsafe{ + let decoder = get_decoder_rfc3164(config); + let mut guard = GLOB_DECODER.lock().unwrap(); + if guard.is_none(){ + *guard = Some(decoder.clone_boxed()); + } + drop(guard); + } + }); + } + + fn set_global_rfc3164_encoder(config: &Config) { + INIT_ENCODER.call_once(|| { + unsafe{ + let encoder = get_encoder_rfc3164(config); + let mut guard = GLOB_ENCODER.lock().unwrap(); + if guard.is_none(){ + *guard = Some(encoder.clone_boxed()); + } + drop(guard); + } + }); + } + + fn set_global_sync_sender(tx: SyncSender>){ + INIT_SYNC_SENDER.call_once( || { + unsafe{ + let mut guard = GLOB_SYNC_SENDER.lock().unwrap(); + if guard.is_none(){ + *guard = Some(tx); + } + drop(guard); + } + }); + } + + fn set_global_output_filepath(filepath: String) { + INIT_FILEPATH.call_once(|| { + unsafe{ + GLOB_FILEPATH = filepath; + } + }); + } + + fn set_global_config(config: Config){ + INIT_CONFIG.call_once(|| { + unsafe{ + GLOB_CONFIG = Some(config); + } + }); + } + + fn get_config() -> Config { + let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { + Ok(config) => config, + Err(e) => panic!( + "Unable to read the config file [{}]: {}", + "flowgger.toml", + e.to_string() + ), + }; + + update_file_rotation_defaults_in_config(&mut config); + return config; + } + + /// Update the default file rotation size and time in the config file + /// This ensures output is sent to a single non-rotated file + pub fn update_file_rotation_defaults_in_config(config: &mut Config){ + if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_size"){ + *entry = Value::Integer(0); + } + + if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ + *entry = Value::Integer(0); + } + } + + pub fn remove_output_file(file_output_path: &str){ + let _ = fs::remove_file(file_output_path); + } + + /// Start an input listener which writes data to the output file once received. + pub fn start_file_output(config: &Config, rx: Receiver>){ + + let output_format = config + .lookup("output.format") + .map_or(DEFAULT_OUTPUT_FORMAT, |x| { + x.as_str().expect("output.format must be a string") + }); + + let output = get_output_file(&config); + let output_type = config + .lookup("output.type") + .map_or(DEFAULT_OUTPUT_TYPE, |x| { + x.as_str().expect("output.type must be a string") + }); + + let _output_framing = match config.lookup("output.framing") { + Some(framing) => framing.as_str().expect("output.framing must be a string"), + None => match (output_format, output_type) { + ("capnp", _) | (_, "kafka") => "noop", + (_, "debug") | ("ltsv", _) => "line", + ("gelf", _) => "nul", + _ => DEFAULT_OUTPUT_FRAMING, + }, + }; + let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); + + let arx = Arc::new(Mutex::new(rx)); + output.start(arx, merger); + } + + pub fn fuzz_target_rfc3164(data: String) { + unsafe{ + let mut sender_guard = match GLOB_SYNC_SENDER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + // Handle the poisoned Mutex + let guard = _poisoned_error.into_inner(); + guard + } + }; + let sync_sender: &mut SyncSender> = sender_guard.as_mut().unwrap(); + + let mut encoder_guard = match GLOB_ENCODER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + let guard = _poisoned_error.into_inner(); + guard + } + }; + let encoder: &mut Box = encoder_guard.as_mut().unwrap(); + + let mut decoder_guard = match GLOB_DECODER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + let guard = _poisoned_error.into_inner(); + guard + } + }; + let decoder: &mut Box = decoder_guard.as_mut().unwrap(); + + let _result = handle_record_maybe_compressed(data.as_bytes(), &sync_sender, &decoder, &encoder); + } + } + + // Check for the result + // On invalid input, no logs are expected to be written to the output file + // For valid inputs, analyze each log entry, and check that the hostnames and appnames are in place + fn check_result(file_output_path: &str)-> Result<(), std::io::Error> { + unsafe{ + + let mut sender_guard = match GLOB_SYNC_SENDER.lock() { + Ok(guard) => guard, + Err(_poisoned_error) => { + let guard = _poisoned_error.into_inner(); + guard + } + }; + let tx: SyncSender> = sender_guard.take().unwrap(); + drop(tx); + + let file = fs::File::open(file_output_path).expect("Unable to open output file"); + let reader = BufReader::new(file); + + for line in reader.lines() { + let line_item: String = line?; + if !line_item.trim().is_empty() { + let split_line_content: Vec<&str> = line_item.split(" ").filter(|data| !data.is_empty()).collect(); + let hostname = split_line_content[3].trim(); + let appname = split_line_content[4].trim(); + + if hostname.is_empty() || appname.is_empty() { + panic!("Log output invalid"); + } + } + + } + Ok(()) + } + } +} + + + + From 5d0c2081fc923e9c34579f31ac68a74a16cfb9d7 Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Thu, 16 May 2024 18:45:08 +0000 Subject: [PATCH 5/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- Cargo.toml | 3 +- src/flowgger/test_fuzzer.rs | 129 ++++++++++-------------------------- 2 files changed, 36 insertions(+), 96 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ae92ac..c2846a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] edition = "2018" name = "flowgger" -version = "0.3.1" +version = "0.3.2" authors = ["Frank Denis ", "Matteo Bigoi ", "Vivien Chene ", "Francesco Berni "] build = "build.rs" repository = "https://github.com/awslabs/flowgger" @@ -57,7 +57,6 @@ time-tz = "0.3" [dev-dependencies] tempdir = "0.3" quickcheck = "1" -lazy_static = "1.4.0" [profile.release] opt-level = 3 diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs index 0051c9b..2fcf430 100644 --- a/src/flowgger/test_fuzzer.rs +++ b/src/flowgger/test_fuzzer.rs @@ -7,13 +7,12 @@ /// /// # Dependencies /// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and -/// [`LazyStatic`][https://docs.rs/lazy_static/latest/lazy_static/] /// -/// `QuickCheck`is used to generate random string input, while LazyStatic is used to lazily intialize share resources at runtime +/// `QuickCheck`is used to generate random string input, while a global structure is used to intialize shared resources at runtime /// /// # Errors /// -/// This function will return an error if the default config does not exists,is unreadbale, or is not valid +/// This function will return an error if the default config does not exists, is unreadbale, or is not valid /// toml format #[cfg(test)] mod tests { @@ -33,7 +32,6 @@ mod tests { use flowgger::encoder::Encoder; use flowgger::decoder::Decoder; use flowgger::merger; - use flowgger::output; use flowgger::get_output_file; use flowgger::get_decoder_rfc3164; use flowgger::get_encoder_rfc3164; @@ -50,23 +48,16 @@ mod tests { const DEFAULT_OUTPUT_FRAMING: &str = "noop"; const DEFAULT_OUTPUT_TYPE: &str = "file"; - const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; + const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 40; + static INIT_CONTEXT: Once = Once::new(); + static mut GLOBAL_CONTEXT: Mutex> = Mutex::new(None); - static INIT_CONFIG: Once = Once::new(); - static mut GLOB_CONFIG: Option = None; - - static INIT_FILEPATH: Once = Once::new(); - static mut GLOB_FILEPATH: String = String::new(); - - static INIT_DECODER: Once = Once::new(); - static mut GLOB_DECODER: Mutex>> = Mutex::new(None); - - static INIT_ENCODER: Once = Once::new(); - static mut GLOB_ENCODER: Mutex>> = Mutex::new(None); - - static INIT_SYNC_SENDER: Once = Once::new(); - static mut GLOB_SYNC_SENDER: Mutex>>> = Mutex::new(None); + struct Context { + encoder: Box, + decoder: Box, + sync_sender: SyncSender>, + } #[test] fn test_fuzzer(){ @@ -78,74 +69,37 @@ mod tests { let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); start_file_output(&config, rx); - set_globals(&config, &file_output_path, tx); + set_global_context(&config, tx); QuickCheck::new().tests(DEFAULT_FUZZED_MESSAGE_COUNT).quickcheck(fuzz_target_rfc3164 as fn(String)); let _ = check_result(&file_output_path); } - fn set_globals(config: &Config, file_output_path: &str, sync_sender: SyncSender>){ - set_global_config(config.clone()); - set_global_output_filepath(file_output_path.to_string()); - set_global_rfc3164_decoder(config); - set_global_rfc3164_encoder(config); - set_global_sync_sender(sync_sender); - } - - fn set_global_rfc3164_decoder(config: &Config) { - INIT_DECODER.call_once(|| { + // Set the global context for the fuzzer + // The global context is used to share resources across all test runs + // The once call ensures the static vairable referencing the struct is only ever set once + fn set_global_context(config: &Config, sync_sender: SyncSender>){ + INIT_CONTEXT.call_once(|| { unsafe{ let decoder = get_decoder_rfc3164(config); - let mut guard = GLOB_DECODER.lock().unwrap(); - if guard.is_none(){ - *guard = Some(decoder.clone_boxed()); - } - drop(guard); - } - }); - } - - fn set_global_rfc3164_encoder(config: &Config) { - INIT_ENCODER.call_once(|| { - unsafe{ let encoder = get_encoder_rfc3164(config); - let mut guard = GLOB_ENCODER.lock().unwrap(); - if guard.is_none(){ - *guard = Some(encoder.clone_boxed()); - } - drop(guard); - } - }); - } + let (decoder, encoder): (Box, Box) = (decoder.clone_boxed(), encoder.clone_boxed()); - fn set_global_sync_sender(tx: SyncSender>){ - INIT_SYNC_SENDER.call_once( || { - unsafe{ - let mut guard = GLOB_SYNC_SENDER.lock().unwrap(); + let context = Context{ + encoder: encoder, + decoder: decoder, + sync_sender: sync_sender, + }; + + let mut guard = GLOBAL_CONTEXT.lock().unwrap(); if guard.is_none(){ - *guard = Some(tx); + *guard = Some(context); } drop(guard); } }); } - fn set_global_output_filepath(filepath: String) { - INIT_FILEPATH.call_once(|| { - unsafe{ - GLOB_FILEPATH = filepath; - } - }); - } - - fn set_global_config(config: Config){ - INIT_CONFIG.call_once(|| { - unsafe{ - GLOB_CONFIG = Some(config); - } - }); - } - fn get_config() -> Config { let mut config = match Config::from_path(DEFAULT_CONFIG_FILE) { Ok(config) => config, @@ -209,7 +163,8 @@ mod tests { pub fn fuzz_target_rfc3164(data: String) { unsafe{ - let mut sender_guard = match GLOB_SYNC_SENDER.lock() { + // Extract the required fields from the global context structure, which is wrapped around by a Mutex + let mut guard = match GLOBAL_CONTEXT.lock() { Ok(guard) => guard, Err(_poisoned_error) => { // Handle the poisoned Mutex @@ -217,27 +172,13 @@ mod tests { guard } }; - let sync_sender: &mut SyncSender> = sender_guard.as_mut().unwrap(); - - let mut encoder_guard = match GLOB_ENCODER.lock() { - Ok(guard) => guard, - Err(_poisoned_error) => { - let guard = _poisoned_error.into_inner(); - guard - } - }; - let encoder: &mut Box = encoder_guard.as_mut().unwrap(); - - let mut decoder_guard = match GLOB_DECODER.lock() { - Ok(guard) => guard, - Err(_poisoned_error) => { - let guard = _poisoned_error.into_inner(); - guard - } - }; - let decoder: &mut Box = decoder_guard.as_mut().unwrap(); - + let context: &mut Context = guard.as_mut().unwrap(); + let sync_sender: &mut SyncSender> = &mut context.sync_sender; + let encoder: &mut Box = &mut context.encoder; + let decoder: &mut Box = &mut context.decoder; let _result = handle_record_maybe_compressed(data.as_bytes(), &sync_sender, &decoder, &encoder); + + drop(guard); } } @@ -247,14 +188,14 @@ mod tests { fn check_result(file_output_path: &str)-> Result<(), std::io::Error> { unsafe{ - let mut sender_guard = match GLOB_SYNC_SENDER.lock() { + let mut guard = match GLOBAL_CONTEXT.lock() { Ok(guard) => guard, Err(_poisoned_error) => { let guard = _poisoned_error.into_inner(); guard } }; - let tx: SyncSender> = sender_guard.take().unwrap(); + let tx: SyncSender> = guard.take().unwrap().sync_sender; drop(tx); let file = fs::File::open(file_output_path).expect("Unable to open output file"); From f4c98f61a3d7ca56b676b1eed22b8fab1a45531d Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Thu, 16 May 2024 18:59:18 +0000 Subject: [PATCH 6/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- src/flowgger/test_fuzzer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs index 2fcf430..4667fb2 100644 --- a/src/flowgger/test_fuzzer.rs +++ b/src/flowgger/test_fuzzer.rs @@ -8,7 +8,7 @@ /// # Dependencies /// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and /// -/// `QuickCheck`is used to generate random string input, while a global structure is used to intialize shared resources at runtime +/// `QuickCheck`is used to generate random string input, while a global structure is used to initialize shared resources at runtime /// /// # Errors /// @@ -48,7 +48,7 @@ mod tests { const DEFAULT_OUTPUT_FRAMING: &str = "noop"; const DEFAULT_OUTPUT_TYPE: &str = "file"; - const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 40; + const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500; static INIT_CONTEXT: Once = Once::new(); static mut GLOBAL_CONTEXT: Mutex> = Mutex::new(None); From eae05b525f055ac984e3b1d2ca83a36c01579640 Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Thu, 16 May 2024 19:04:54 +0000 Subject: [PATCH 7/8] Add fuzzing test to flowgger Co-authored-by: Chukwuemeka Ewurum --- src/flowgger/test_fuzzer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs index 4667fb2..d0c5e1e 100644 --- a/src/flowgger/test_fuzzer.rs +++ b/src/flowgger/test_fuzzer.rs @@ -77,7 +77,7 @@ mod tests { // Set the global context for the fuzzer // The global context is used to share resources across all test runs - // The once call ensures the static vairable referencing the struct is only ever set once + // Call Once ensures the static variable referencing the struct is only ever set once fn set_global_context(config: &Config, sync_sender: SyncSender>){ INIT_CONTEXT.call_once(|| { unsafe{ From 1248350a4dd11d02f67979ac9aa2e8857961ff61 Mon Sep 17 00:00:00 2001 From: Chukwuemeka George Ewurum Date: Thu, 16 May 2024 20:32:30 +0000 Subject: [PATCH 8/8] Updated code formatting Co-authored-by: Chukwuemeka Ewurum --- src/flowgger/encoder/ltsv_encoder.rs | 1 - src/flowgger/test_fuzzer.rs | 140 +++++++++++++++------------ 2 files changed, 80 insertions(+), 61 deletions(-) diff --git a/src/flowgger/encoder/ltsv_encoder.rs b/src/flowgger/encoder/ltsv_encoder.rs index a2859e7..8435ec3 100644 --- a/src/flowgger/encoder/ltsv_encoder.rs +++ b/src/flowgger/encoder/ltsv_encoder.rs @@ -162,7 +162,6 @@ fn test_ltsv_full_encode_multiple_sd() { let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24); let expected_msg = format!("a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:{}\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42", ts); let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); - let record = Record { ts, diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs index d0c5e1e..90092d9 100644 --- a/src/flowgger/test_fuzzer.rs +++ b/src/flowgger/test_fuzzer.rs @@ -1,18 +1,18 @@ /// Fuzz testing logic defined in this module /// This module depends on the default configuration file `flowgger.toml` -/// -/// The test in this module hits flowgger with random input. -/// The expected state is a failure for invalid inputs (no logs sent to output) +/// +/// The test in this module hits flowgger with random input. +/// The expected state is a failure for invalid inputs (no logs sent to output) /// and successfully parsed logs written to output stream for valid inputs -/// +/// /// # Dependencies -/// It depend on the external crates [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] and -/// +/// It depends on the external crate [`QuickCheck`][https://docs.rs/quickcheck/latest/quickcheck/] +/// /// `QuickCheck`is used to generate random string input, while a global structure is used to initialize shared resources at runtime -/// +/// /// # Errors /// -/// This function will return an error if the default config does not exists, is unreadbale, or is not valid +/// This function will return an error if the default config does not exists, is unreadable, or is not valid /// toml format #[cfg(test)] mod tests { @@ -22,20 +22,21 @@ mod tests { use quickcheck::QuickCheck; - use std::sync::mpsc::{Receiver, sync_channel, SyncSender}; - use std::sync::{Arc, Mutex}; - use std::{fs}; - use std::io::{BufReader, BufRead}; + use std::fs; + use std::io::{BufRead, BufReader}; + use std::ptr::addr_of_mut; + use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Once; + use std::sync::{Arc, Mutex}; use flowgger::config::Config; - use flowgger::encoder::Encoder; use flowgger::decoder::Decoder; - use flowgger::merger; - use flowgger::get_output_file; + use flowgger::encoder::Encoder; use flowgger::get_decoder_rfc3164; use flowgger::get_encoder_rfc3164; + use flowgger::get_output_file; use flowgger::input::udp_input::handle_record_maybe_compressed; + use flowgger::merger; use self::merger::{LineMerger, Merger}; use toml::Value; @@ -60,43 +61,50 @@ mod tests { } #[test] - fn test_fuzzer(){ + fn test_fuzzer() { let config = get_config(); - let file_output_path = config.lookup("output.file_path").map_or(DEFAULT_OUTPUT_FILEPATH, |x| { - x.as_str().expect("File output path missing in config") - }); + let file_output_path = config + .lookup("output.file_path") + .map_or(DEFAULT_OUTPUT_FILEPATH, |x| { + x.as_str().expect("File output path missing in config") + }); remove_output_file(&file_output_path); let (tx, rx): (SyncSender>, Receiver>) = sync_channel(DEFAULT_QUEUE_SIZE); start_file_output(&config, rx); set_global_context(&config, tx); - QuickCheck::new().tests(DEFAULT_FUZZED_MESSAGE_COUNT).quickcheck(fuzz_target_rfc3164 as fn(String)); + QuickCheck::new() + .tests(DEFAULT_FUZZED_MESSAGE_COUNT) + .quickcheck(fuzz_target_rfc3164 as fn(String)); let _ = check_result(&file_output_path); } + fn get_global_context() -> *mut Mutex> { + unsafe { addr_of_mut!(GLOBAL_CONTEXT) } + } + // Set the global context for the fuzzer // The global context is used to share resources across all test runs - // Call Once ensures the static variable referencing the struct is only ever set once - fn set_global_context(config: &Config, sync_sender: SyncSender>){ - INIT_CONTEXT.call_once(|| { - unsafe{ - let decoder = get_decoder_rfc3164(config); - let encoder = get_encoder_rfc3164(config); - let (decoder, encoder): (Box, Box) = (decoder.clone_boxed(), encoder.clone_boxed()); - - let context = Context{ - encoder: encoder, - decoder: decoder, - sync_sender: sync_sender, - }; - - let mut guard = GLOBAL_CONTEXT.lock().unwrap(); - if guard.is_none(){ - *guard = Some(context); - } - drop(guard); + // CallOnce routine ensures the static variable referencing the struct is only ever set once + fn set_global_context(config: &Config, sync_sender: SyncSender>) { + INIT_CONTEXT.call_once(|| unsafe { + let decoder = get_decoder_rfc3164(config); + let encoder = get_encoder_rfc3164(config); + let (decoder, encoder): (Box, Box) = + (decoder.clone_boxed(), encoder.clone_boxed()); + + let context = Context { + encoder: encoder, + decoder: decoder, + sync_sender: sync_sender, + }; + + let mut guard = GLOBAL_CONTEXT.lock().unwrap(); + if guard.is_none() { + *guard = Some(context); } + drop(guard); }); } @@ -116,23 +124,32 @@ mod tests { /// Update the default file rotation size and time in the config file /// This ensures output is sent to a single non-rotated file - pub fn update_file_rotation_defaults_in_config(config: &mut Config){ - if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_size"){ + pub fn update_file_rotation_defaults_in_config(config: &mut Config) { + if let Some(entry) = config + .config + .get_mut("output") + .unwrap() + .get_mut("file_rotation_size") + { *entry = Value::Integer(0); } - if let Some(entry) = config.config.get_mut("output").unwrap().get_mut("file_rotation_time"){ + if let Some(entry) = config + .config + .get_mut("output") + .unwrap() + .get_mut("file_rotation_time") + { *entry = Value::Integer(0); } } - pub fn remove_output_file(file_output_path: &str){ + pub fn remove_output_file(file_output_path: &str) { let _ = fs::remove_file(file_output_path); } /// Start an input listener which writes data to the output file once received. - pub fn start_file_output(config: &Config, rx: Receiver>){ - + pub fn start_file_output(config: &Config, rx: Receiver>) { let output_format = config .lookup("output.format") .map_or(DEFAULT_OUTPUT_FORMAT, |x| { @@ -155,16 +172,19 @@ mod tests { _ => DEFAULT_OUTPUT_FRAMING, }, }; - let merger: Option> = Some(Box::new(LineMerger::new(&config)) as Box); + let merger: Option> = + Some(Box::new(LineMerger::new(&config)) as Box); let arx = Arc::new(Mutex::new(rx)); output.start(arx, merger); } pub fn fuzz_target_rfc3164(data: String) { - unsafe{ - // Extract the required fields from the global context structure, which is wrapped around by a Mutex - let mut guard = match GLOBAL_CONTEXT.lock() { + unsafe { + let global_context = get_global_context().as_ref().unwrap(); + + // Extract the required fields from the global context structure, which is wrapped around by a Mutex + let mut guard = match global_context.lock() { Ok(guard) => guard, Err(_poisoned_error) => { // Handle the poisoned Mutex @@ -176,7 +196,8 @@ mod tests { let sync_sender: &mut SyncSender> = &mut context.sync_sender; let encoder: &mut Box = &mut context.encoder; let decoder: &mut Box = &mut context.decoder; - let _result = handle_record_maybe_compressed(data.as_bytes(), &sync_sender, &decoder, &encoder); + let _result = + handle_record_maybe_compressed(data.as_bytes(), &sync_sender, &decoder, &encoder); drop(guard); } @@ -185,10 +206,10 @@ mod tests { // Check for the result // On invalid input, no logs are expected to be written to the output file // For valid inputs, analyze each log entry, and check that the hostnames and appnames are in place - fn check_result(file_output_path: &str)-> Result<(), std::io::Error> { - unsafe{ - - let mut guard = match GLOBAL_CONTEXT.lock() { + fn check_result(file_output_path: &str) -> Result<(), std::io::Error> { + unsafe { + let global_context = get_global_context().as_ref().unwrap(); + let mut guard = match global_context.lock() { Ok(guard) => guard, Err(_poisoned_error) => { let guard = _poisoned_error.into_inner(); @@ -197,6 +218,7 @@ mod tests { }; let tx: SyncSender> = guard.take().unwrap().sync_sender; drop(tx); + drop(guard); let file = fs::File::open(file_output_path).expect("Unable to open output file"); let reader = BufReader::new(file); @@ -204,7 +226,10 @@ mod tests { for line in reader.lines() { let line_item: String = line?; if !line_item.trim().is_empty() { - let split_line_content: Vec<&str> = line_item.split(" ").filter(|data| !data.is_empty()).collect(); + let split_line_content: Vec<&str> = line_item + .split(" ") + .filter(|data| !data.is_empty()) + .collect(); let hostname = split_line_content[3].trim(); let appname = split_line_content[4].trim(); @@ -212,13 +237,8 @@ mod tests { panic!("Log output invalid"); } } - } Ok(()) } } } - - - -