diff --git a/rs/log-fetcher/src/journald_parser.rs b/rs/log-fetcher/src/journald_parser.rs index a1dd474e..34968761 100644 --- a/rs/log-fetcher/src/journald_parser.rs +++ b/rs/log-fetcher/src/journald_parser.rs @@ -12,7 +12,81 @@ pub struct JournalEntry { pub fields: Vec<(String, JournalField)>, } -pub fn parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> { +#[derive(Debug, Clone)] +enum LineStatus { + NotStarted, + Started, + Utf8, + Binary, +} + +pub fn parse_journal_entries_new(body: &[u8]) -> Vec<JournalEntry> { + let mut entries = Vec::new(); + let mut current_entry = Vec::new(); + let mut current_line = Vec::new(); + + let mut first_found = LineStatus::NotStarted; + + let mut iter = body.iter(); + while let Some(byte) = iter.next() { + match (byte, first_found.clone()) { + (b'=', LineStatus::Started) => { + current_line.push(*byte); + first_found = LineStatus::Utf8; + } + (b'\n', LineStatus::Started) => { + current_entry.push(current_line.clone()); + current_line.clear(); + let mut next = vec![]; + for _ in 0..8 { + let current = iter.next().unwrap(); + next.push(*current); + current_line.push(*current) + } + + let to_take = + i64::from_le_bytes([next[0], next[1], next[2], next[3], next[4], next[5], next[6], next[7]]); + for _ in 0..to_take { + current_line.push(*iter.next().unwrap()) + } + // To remove the added '\n' by format + iter.next(); + current_entry.push(current_line.clone()); + current_line.clear(); + first_found = LineStatus::NotStarted; + } + (b'\n', LineStatus::Utf8) => { + current_entry.push(current_line.clone()); + current_line.clear(); + first_found = LineStatus::NotStarted; + } + (b'\n', LineStatus::NotStarted) => { + if let Some(entry) = parse_journal_entry(current_entry.as_slice()) { + entries.push(entry); + } + current_entry.clear(); + current_line.clear(); + first_found = LineStatus::NotStarted; + } + (_, LineStatus::Started) | (_, LineStatus::Utf8) => current_line.push(*byte), + (_, LineStatus::NotStarted) => { + current_line.push(*byte); + first_found = 
LineStatus::Started; + } + (a, b) => unreachable!("Shouldn't happen: {}, {:?}", a, b), + } + } + // Check if there's an entry at the end of the body + if !current_entry.is_empty() { + if let Some(entry) = parse_journal_entry(&current_entry) { + entries.push(entry); + } + } + + entries +} + +pub fn _parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> { let mut entries = Vec::new(); let mut current_entry = Vec::new(); let lines: Vec<_> = body.split(|&c| c == b'\n').collect(); @@ -140,7 +214,7 @@ mod tests { fn test_parse_journal_entries() { // Test case with two entries let body = b"field1=value1\nfield2=value2\n\nfield3=value3\nfield4=\n"; - let entries = parse_journal_entries(body); + let entries = parse_journal_entries_new(body); assert_eq!(entries.len(), 2); // Verify the first entry @@ -178,7 +252,7 @@ mod tests { serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap(); body.extend(serialized_data.clone()); - let entries = parse_journal_entries(&body); + let entries = parse_journal_entries_new(&body); assert_eq!(entries.len(), 1); // Verify the entry with binary data @@ -211,7 +285,7 @@ mod tests { serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap(); body.extend(serialized_data); - let entries = parse_journal_entries(&body); + let entries = parse_journal_entries_new(&body); assert_eq!(entries.len(), 1); // Verify the entry with binary data @@ -256,11 +330,73 @@ mod tests { ); } + #[test] + fn test_parse_journal_entries_binary_field_with_newline_end() { + // Test case with binary data + + let mut body = vec![]; + let mut serialized_data = Vec::new(); + body.extend(b"__CURSOR=s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202\n"); + body.extend(b"__REALTIME_TIMESTAMP=1423944916375353\n"); + body.extend(b"_SYSTEMD_OWNER_UID=1001\n"); + serialize_string_field("OTHER_BIN", "some random data\nbar\n", &mut serialized_data).unwrap(); + body.extend(serialized_data.clone()); + 
body.extend(b"_AUDIT_LOGINUID=1001\n"); + body.extend(b"SYSLOG_IDENTIFIER=python3\n"); + serialized_data.clear(); + serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap(); + body.extend(serialized_data); + + let entries = parse_journal_entries_new(&body); + assert_eq!(entries.len(), 1); + + // Verify the entry with binary data + let entry = &entries[0]; + assert_eq!(entry.fields.len(), 7); + assert_eq!( + entry.fields[0], + ("__CURSOR".to_string(), JournalField::Utf8("s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202".to_string())) + ); + assert_eq!( + entry.fields[1], + ( + "__REALTIME_TIMESTAMP".to_string(), + JournalField::Utf8("1423944916375353".to_string()) + ) + ); + assert_eq!( + entry.fields[2], + ("_SYSTEMD_OWNER_UID".to_string(), JournalField::Utf8("1001".to_string())) + ); + assert_eq!( + entry.fields[3], + ( + "OTHER_BIN".to_string(), + JournalField::Binary("some random data\nbar\n".to_string()) + ) + ); + assert_eq!( + entry.fields[4], + ("_AUDIT_LOGINUID".to_string(), JournalField::Utf8("1001".to_string())) + ); + assert_eq!( + entry.fields[5], + ( + "SYSLOG_IDENTIFIER".to_string(), + JournalField::Utf8("python3".to_string()) + ) + ); + assert_eq!( + entry.fields[6], + ("MESSAGE".to_string(), JournalField::Binary("foo\nbar".to_string())) + ); + } + #[test] fn test_parse_journal_entries_empty() { // Test case with empty body let body = b""; - let entries = parse_journal_entries(body); + let entries = parse_journal_entries_new(body); assert_eq!(entries.len(), 0); } } diff --git a/rs/log-fetcher/src/main.rs b/rs/log-fetcher/src/main.rs index 49e0feda..d77fe7df 100644 --- a/rs/log-fetcher/src/main.rs +++ b/rs/log-fetcher/src/main.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::BTreeMap, fs::{self, File}, io::{Read, Write}, path::PathBuf, @@ -15,7 +15,7 @@ use tokio::select; use tokio::sync::mpsc; use url::Url; -use 
crate::journald_parser::{parse_journal_entries, JournalField}; +use crate::journald_parser::{parse_journal_entries_new, JournalField}; mod journald_parser; @@ -95,10 +95,10 @@ async fn main() -> Result<(), anyhow::Error> { } }; - let entries = parse_journal_entries(&body); + let entries = parse_journal_entries_new(&body); for entry in &entries { - let map: HashMap<String, String> = entry + let map: BTreeMap<String, String> = entry .fields .iter() .map(|(name, val)| match val {