diff --git a/Cargo.toml b/Cargo.toml index 354d183..c2846a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] edition = "2018" name = "flowgger" -version = "0.3.1" +version = "0.3.2" authors = ["Frank Denis ", "Matteo Bigoi ", "Vivien Chene ", "Francesco Berni "] build = "build.rs" repository = "https://github.com/awslabs/flowgger" @@ -56,6 +56,7 @@ time-tz = "0.3" [dev-dependencies] tempdir = "0.3" +quickcheck = "1" [profile.release] opt-level = 3 diff --git a/src/flowgger/config.rs b/src/flowgger/config.rs index 0f37eea..9920400 100644 --- a/src/flowgger/config.rs +++ b/src/flowgger/config.rs @@ -11,7 +11,10 @@ use toml::Value; /// [`Configuration`]: https://github.com/jedisct1/flowgger/wiki/Configuration #[derive(Clone)] pub struct Config { + #[cfg(not(test))] config: Value, + #[cfg(test)] + pub config: Value, } impl Config { diff --git a/src/flowgger/encoder/ltsv_encoder.rs b/src/flowgger/encoder/ltsv_encoder.rs index 42cba04..8435ec3 100644 --- a/src/flowgger/encoder/ltsv_encoder.rs +++ b/src/flowgger/encoder/ltsv_encoder.rs @@ -134,9 +134,9 @@ use time::Month; #[test] fn test_ltsv_full_encode_no_sd() { let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message"; - let expected_msg = "host:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42"; - let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24); + let expected_msg = format!("host:testhostname\ttime:{}\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 - some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42", ts); + let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let record = Record { ts, @@ -159,9 +159,9 @@ fn 
test_ltsv_full_encode_no_sd() { #[test] fn test_ltsv_full_encode_multiple_sd() { let full_msg = "<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message"; - let expected_msg = "a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:1659784524\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42"; - let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let ts = ts_from_partial_date_time(Month::August, 6, 11, 15, 24); + let expected_msg = format!("a:b\tc:123456\ta2:b2\tc2:123456\thost:testhostname\ttime:{}\tmessage:some test message\tfull_message:<23>Aug 6 11:15:24 testhostname appname[69]: 42 [someid a=\"b\" c=\"123456\"][someid2 a2=\"b2\" c2=\"123456\"] some test message\tlevel:7\tfacility:2\tappname:appname\tprocid:69\tmsgid:42", ts); + let cfg = Config::from_string("[input]\n[input.ltsv_schema]\nformat = \"ltsv\"\n").unwrap(); let record = Record { ts, diff --git a/src/flowgger/input/mod.rs b/src/flowgger/input/mod.rs index 250a9e9..e4d9e7c 100644 --- a/src/flowgger/input/mod.rs +++ b/src/flowgger/input/mod.rs @@ -6,8 +6,10 @@ mod stdin_input; mod tcp; #[cfg(feature = "tls")] mod tls; -#[cfg(feature = "syslog")] +#[cfg(all(feature = "syslog", not(test)))] mod udp_input; +#[cfg(test)] +pub mod udp_input; #[cfg(feature = "file")] pub use self::file::FileInput; diff --git a/src/flowgger/input/udp_input.rs b/src/flowgger/input/udp_input.rs index ebbe61b..6033a7e 100644 --- a/src/flowgger/input/udp_input.rs +++ b/src/flowgger/input/udp_input.rs @@ -97,7 +97,7 @@ impl Input for UdpInput { /// but could not be handled /// `Invalid UTF-8 input`: Bubble up from handle_record, the record is not in a valid utf-8 format, it could be a non /// supported compression format -fn 
handle_record_maybe_compressed( +pub fn handle_record_maybe_compressed( line: &[u8], tx: &SyncSender>, decoder: &Box, diff --git a/src/flowgger/mod.rs b/src/flowgger/mod.rs index 6aa408b..0dc06f8 100644 --- a/src/flowgger/mod.rs +++ b/src/flowgger/mod.rs @@ -1,13 +1,36 @@ +#[cfg(not(test))] mod config; +#[cfg(not(test))] mod decoder; +#[cfg(not(test))] mod encoder; +#[cfg(not(test))] mod input; +#[cfg(not(test))] mod merger; +#[cfg(not(test))] mod output; + +#[cfg(test)] +pub mod config; +#[cfg(test)] +pub mod decoder; +#[cfg(test)] +pub mod encoder; +#[cfg(test)] +pub mod input; +#[cfg(test)] +pub mod merger; +#[cfg(test)] +pub mod output; + mod record; mod splitter; mod utils; +#[cfg(test)] +mod test_fuzzer; + use std::io::{stderr, Write}; #[cfg(feature = "capnp-recompile")] @@ -179,16 +202,26 @@ fn get_output_kafka(_config: &Config) -> ! { panic!("Support for Kafka hasn't been compiled in") } -#[cfg(feature = "file")] +#[cfg(all(feature = "file", not(test)))] fn get_output_file(config: &Config) -> Box { Box::new(FileOutput::new(config)) as Box } -#[cfg(not(feature = "file"))] +#[cfg(all(not(feature = "file"), not(test)))] fn get_output_file(_config: &Config) -> ! { panic!("Support for file hasn't been compiled in") } +#[cfg(all(feature = "file", test))] +pub fn get_output_file(config: &Config) -> Box { + Box::new(FileOutput::new(config)) as Box +} + +#[cfg(all(not(feature = "file"), test))] +pub fn get_output_file(_config: &Config) -> ! 
{ + panic!("Support for file hasn't been compiled in") +} + #[cfg(feature = "tls")] fn get_output_tls(config: &Config) -> Box { Box::new(TlsOutput::new(config)) as Box @@ -274,12 +307,20 @@ fn get_encoder_passthrough(config: &Config) -> Box { Box::new(PassthroughEncoder::new(config)) as Box } -#[cfg(feature = "rfc3164")] -fn get_decoder_rfc3164(config: &Config) -> Box { +#[cfg(all(feature = "rfc3164", test))] +pub fn get_decoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Decoder::new(config)) as Box } +#[cfg(all(feature = "rfc3164", test))] +pub fn get_encoder_rfc3164(config: &Config) -> Box { + Box::new(RFC3164Encoder::new(config)) as Box +} -#[cfg(feature = "rfc3164")] +#[cfg(all(feature = "rfc3164", not(test)))] +fn get_decoder_rfc3164(config: &Config) -> Box { + Box::new(RFC3164Decoder::new(config)) as Box +} +#[cfg(all(feature = "rfc3164", not(test)))] fn get_encoder_rfc3164(config: &Config) -> Box { Box::new(RFC3164Encoder::new(config)) as Box } diff --git a/src/flowgger/test_fuzzer.rs b/src/flowgger/test_fuzzer.rs new file mode 100644 index 0000000..90092d9 --- /dev/null +++ b/src/flowgger/test_fuzzer.rs @@ -0,0 +1,244 @@ +/// Fuzz testing logic defined in this module +/// This module depends on the default configuration file `flowgger.toml` +/// +/// The test in this module hits flowgger with random input. 
/// The expected state is a failure for invalid inputs (no logs sent to output)
/// and successfully parsed logs written to the output stream for valid inputs.
///
/// # Dependencies
/// It depends on the external crate [`QuickCheck`](https://docs.rs/quickcheck/latest/quickcheck/).
///
/// `QuickCheck` is used to generate random string input, while a global structure is used to
/// initialize shared resources at runtime.
///
/// # Panics
///
/// The test panics if the default config file cannot be loaded (for example because it does not
/// exist, is unreadable, or is not valid toml), or if a line of the output file does not contain
/// a hostname and an appname.
#[cfg(test)]
mod tests {
    use quickcheck::QuickCheck;

    use std::fs;
    use std::io::{self, BufRead, BufReader};
    use std::ptr::addr_of_mut;
    use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
    use std::sync::{Arc, Mutex, MutexGuard, Once};

    use toml::Value;

    use crate::flowgger::config::Config;
    use crate::flowgger::decoder::Decoder;
    use crate::flowgger::encoder::Encoder;
    use crate::flowgger::input::udp_input::handle_record_maybe_compressed;
    use crate::flowgger::merger::LineMerger;
    use crate::flowgger::{get_decoder_rfc3164, get_encoder_rfc3164, get_output_file};

    const DEFAULT_CONFIG_FILE: &str = "flowgger.toml";
    const DEFAULT_OUTPUT_FILEPATH: &str = "output.log";
    const DEFAULT_QUEUE_SIZE: usize = 10_000_000;

    const DEFAULT_FUZZED_MESSAGE_COUNT: u64 = 500;

    static INIT_CONTEXT: Once = Once::new();
    // NOTE(review): this stays a `static mut` (rather than a plain `static`) presumably because
    // the boxed trait objects in `Context` are not `Send`, which a plain `static Mutex<..>` would
    // require. Confirm against the `Decoder`/`Encoder` trait definitions before changing it.
    static mut GLOBAL_CONTEXT: Mutex<Option<Context>> = Mutex::new(None);

    /// Resources shared between the test driver and every fuzzed invocation.
    ///
    /// NOTE(review): the generic arguments below were reconstructed; the trait-object types are
    /// assumed to match the parameters of `handle_record_maybe_compressed` - confirm.
    struct Context {
        encoder: Box<dyn Encoder>,
        decoder: Box<dyn Decoder>,
        sync_sender: SyncSender<Vec<u8>>,
    }

    /// Fuzzes the RFC3164 pipeline with random strings and validates the output file.
    ///
    /// The output file is removed first so that only lines produced by this run are checked.
    #[test]
    fn test_fuzzer() {
        let config = get_config();
        let file_output_path = config
            .lookup("output.file_path")
            .map_or(DEFAULT_OUTPUT_FILEPATH, |path| {
                path.as_str().expect("output.file_path must be a string")
            });
        remove_output_file(file_output_path);

        let (tx, rx): (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) = sync_channel(DEFAULT_QUEUE_SIZE);
        start_file_output(&config, rx);
        set_global_context(&config, tx);

        // QuickCheck only accepts plain function pointers here, which is why the shared
        // resources travel through `GLOBAL_CONTEXT` instead of a capturing closure.
        QuickCheck::new()
            .tests(DEFAULT_FUZZED_MESSAGE_COUNT)
            .quickcheck(fuzz_target_rfc3164 as fn(String));

        // An I/O failure while reading the output back must fail the test, not be discarded.
        check_result(file_output_path).expect("Unable to read the output file");
    }

    /// Returns a shared reference to the global context slot.
    fn get_global_context() -> &'static Mutex<Option<Context>> {
        // SAFETY: only shared references to the static are ever created, and all access to its
        // contents is serialized by the `Mutex`. The static is private to this test module.
        // NOTE(review): soundness also relies on the context being used from the test thread
        // only (QuickCheck is assumed to run properties on the calling thread) - confirm.
        unsafe { &*addr_of_mut!(GLOBAL_CONTEXT) }
    }

    /// Locks the global context, recovering the guard if a previous holder panicked.
    ///
    /// A fuzzed input that panics while the lock is held poisons the mutex; the stored
    /// context is still usable, so the poison flag is deliberately ignored.
    fn lock_global_context() -> MutexGuard<'static, Option<Context>> {
        get_global_context()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Sets the global context for the fuzzer.
    ///
    /// The global context is used to share resources across all fuzzed invocations.
    /// `Once` ensures the context is only ever initialized a single time; on later calls
    /// the closure does not run and `sync_sender` is simply dropped.
    fn set_global_context(config: &Config, sync_sender: SyncSender<Vec<u8>>) {
        INIT_CONTEXT.call_once(|| {
            let decoder = get_decoder_rfc3164(config);
            let encoder = get_encoder_rfc3164(config);

            // `clone_boxed` is kept from the original code: its result is coerced to the
            // trait-object types stored in `Context`.
            let context = Context {
                encoder: encoder.clone_boxed(),
                decoder: decoder.clone_boxed(),
                sync_sender,
            };

            let mut guard = lock_global_context();
            if guard.is_none() {
                *guard = Some(context);
            }
        });
    }

    /// Loads the default configuration file and disables file rotation in it.
    ///
    /// # Panics
    /// Panics if `DEFAULT_CONFIG_FILE` cannot be loaded.
    fn get_config() -> Config {
        let mut config = Config::from_path(DEFAULT_CONFIG_FILE).unwrap_or_else(|e| {
            panic!(
                "Unable to read the config file [{}]: {}",
                DEFAULT_CONFIG_FILE, e
            )
        });

        update_file_rotation_defaults_in_config(&mut config);
        config
    }

    /// Resets the file rotation size and time in the loaded configuration to `0`.
    ///
    /// Per the original author this ensures output is sent to a single non-rotated file.
    /// Keys that are absent are left absent, and a configuration without an `[output]`
    /// table is left untouched instead of panicking.
    pub fn update_file_rotation_defaults_in_config(config: &mut Config) {
        if let Some(output) = config.config.get_mut("output") {
            for key in &["file_rotation_size", "file_rotation_time"] {
                if let Some(entry) = output.get_mut(*key) {
                    *entry = Value::Integer(0);
                }
            }
        }
    }

    /// Removes a previous output file, if any.
    pub fn remove_output_file(file_output_path: &str) {
        // Best effort: the file legitimately does not exist on a first run.
        let _ = fs::remove_file(file_output_path);
    }

    /// Starts the file output, which consumes the records received on `rx`.
    ///
    /// A `LineMerger` is always used, regardless of any framing configured in the file;
    /// `check_result` reads the output back line by line.
    pub fn start_file_output(config: &Config, rx: Receiver<Vec<u8>>) {
        let output = get_output_file(config);
        let arx = Arc::new(Mutex::new(rx));
        output.start(arx, Some(Box::new(LineMerger::new(config))));
    }

    /// QuickCheck property: pushes one random string through the RFC3164 pipeline.
    ///
    /// # Panics
    /// Panics if the global context has not been initialized by `set_global_context`.
    pub fn fuzz_target_rfc3164(data: String) {
        let guard = lock_global_context();
        let context = guard
            .as_ref()
            .expect("global fuzzing context must be initialized before fuzzing");

        // Rejections are expected for random input, so the result is intentionally ignored;
        // only panics and malformed output count as failures.
        let _ = handle_record_maybe_compressed(
            data.as_bytes(),
            &context.sync_sender,
            &context.decoder,
            &context.encoder,
        );
    }

    /// Checks the content of the output file.
    ///
    /// On invalid input, no logs are expected to be written to the output file.
    /// For valid inputs, each non-blank line is split on spaces and the fourth and fifth
    /// non-empty fields (hostname and appname) must be present and non-empty.
    ///
    /// # Errors
    /// Returns the I/O error raised while reading a line of the output file.
    ///
    /// # Panics
    /// Panics if the global context was never initialized, if the output file cannot be
    /// opened, or if a line lacks a hostname or an appname.
    fn check_result(file_output_path: &str) -> Result<(), io::Error> {
        // Take the context out of the global slot and drop it, which drops the sender
        // stored in it.
        // NOTE(review): this presumably signals the output side that no more records will
        // arrive, but nothing here waits for the writer to finish flushing before the file is
        // read back - confirm this cannot race.
        let context = lock_global_context()
            .take()
            .expect("global fuzzing context must be initialized before checking results");
        drop(context);

        let file = fs::File::open(file_output_path).expect("Unable to open output file");
        for line in BufReader::new(file).lines() {
            let line = line?;
            if line.trim().is_empty() {
                continue;
            }

            let mut fields = line.split(' ').filter(|field| !field.is_empty());
            let hostname = fields.nth(3).map(str::trim);
            let appname = fields.next().map(str::trim);

            match (hostname, appname) {
                (Some(host), Some(app)) if !host.is_empty() && !app.is_empty() => {}
                // Covers both empty fields and lines with fewer than five fields.
                _ => panic!("Log output invalid: {:?}", line),
            }
        }
        Ok(())
    }
}