Skip to content

Commit

Permalink
fix(log-fetcher): binary fields that end with \n are not parsed correctly (#213)

Browse files Browse the repository at this point in the history

* fixed bug where the end was the new line

* refactoring

---------

Co-authored-by: pietrodimarco-dfinity <124565147+pietrodimarco-dfinity@users.noreply.github.com>
  • Loading branch information
NikolaMilosa and pietrodimarco-dfinity committed Feb 27, 2024
1 parent 31e9076 commit 2d96e38
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 9 deletions.
146 changes: 141 additions & 5 deletions rs/log-fetcher/src/journald_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,81 @@ pub struct JournalEntry {
pub fields: Vec<(String, JournalField)>,
}

pub fn parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> {
/// Scanner state for `parse_journal_entries_new`: what the bytes read so far
/// on the current line have been identified as.
///
/// Fieldless and trivially copyable, so `Copy` is derived — the state can be
/// passed around without a per-iteration `.clone()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LineStatus {
    /// Between lines: no byte of the current line has been read yet.
    NotStarted,
    /// At least one field-name byte read; the line could still turn out to be
    /// a UTF-8 field (`NAME=value`) or a binary field (`NAME\n<size><data>`).
    Started,
    /// A '=' was seen, so the remainder of the line is a UTF-8 value.
    Utf8,
    /// Binary-payload scanning state.
    /// NOTE(review): not currently produced by `parse_journal_entries_new`
    /// (the binary payload is consumed inline) — confirm before removing.
    Binary,
}

/// Parses a raw journald "export format" body into journal entries.
///
/// An entry is a run of lines terminated by an empty line. Each line is either
/// a UTF-8 field (`NAME=value\n`) or a binary field: the field name terminated
/// by `\n`, then a little-endian 64-bit payload size, then `size` payload
/// bytes (which may themselves contain `\n`), then a trailing `\n`.
///
/// For a binary field, two raw lines are pushed for the downstream
/// `parse_journal_entry` call: the field name, then the 8 size bytes followed
/// by the payload — mirroring the wire layout.
///
/// Truncated input (a body ending mid-size-prefix or mid-payload) does not
/// panic; the entries fully parsed so far are returned. A final entry that is
/// missing its trailing newline is still flushed.
pub fn parse_journal_entries_new(body: &[u8]) -> Vec<JournalEntry> {
    let mut entries = Vec::new();
    // Raw lines of the entry currently being assembled.
    let mut current_entry: Vec<Vec<u8>> = Vec::new();
    // Bytes of the line currently being scanned.
    let mut current_line: Vec<u8> = Vec::new();
    let mut state = LineStatus::NotStarted;

    let mut iter = body.iter();
    while let Some(&byte) = iter.next() {
        // Unit-variant patterns bind nothing, so matching `state` by value
        // does not move it — no per-iteration clone is needed.
        match state {
            LineStatus::NotStarted => {
                if byte == b'\n' {
                    // Empty line: the current entry is complete.
                    if let Some(entry) = parse_journal_entry(current_entry.as_slice()) {
                        entries.push(entry);
                    }
                    current_entry.clear();
                    current_line.clear();
                } else {
                    current_line.push(byte);
                    state = LineStatus::Started;
                }
            }
            LineStatus::Started => match byte {
                b'=' => {
                    // `NAME=` seen: the rest of the line is a UTF-8 value.
                    current_line.push(byte);
                    state = LineStatus::Utf8;
                }
                b'\n' => {
                    // Field name terminated by '\n': binary field. Store the
                    // name as its own raw line, as parse_journal_entry expects.
                    current_entry.push(std::mem::take(&mut current_line));

                    // The next 8 bytes are the payload size. journald's export
                    // format uses an *unsigned* little-endian 64-bit size, so
                    // decode with u64 (the old i64 could go negative and skip
                    // the payload entirely).
                    let mut size_bytes = [0u8; 8];
                    for slot in size_bytes.iter_mut() {
                        match iter.next() {
                            Some(&b) => *slot = b,
                            // Truncated size prefix: keep what we have so far
                            // instead of panicking.
                            None => return entries,
                        }
                    }
                    current_line.extend_from_slice(&size_bytes);

                    let payload_len = u64::from_le_bytes(size_bytes);
                    for _ in 0..payload_len {
                        match iter.next() {
                            Some(&b) => current_line.push(b),
                            // Truncated payload: keep what we have so far.
                            None => return entries,
                        }
                    }
                    // Consume the '\n' the serializer appends after the payload.
                    iter.next();
                    current_entry.push(std::mem::take(&mut current_line));
                    state = LineStatus::NotStarted;
                }
                _ => current_line.push(byte),
            },
            LineStatus::Utf8 => {
                if byte == b'\n' {
                    // End of a `NAME=value` line.
                    current_entry.push(std::mem::take(&mut current_line));
                    state = LineStatus::NotStarted;
                } else {
                    current_line.push(byte);
                }
            }
            // Binary is never assigned by this state machine; reaching it
            // would be a logic error.
            LineStatus::Binary => unreachable!("unexpected scanner state: {:?}", state),
        }
    }

    // Flush a trailing line and/or entry not terminated by a newline, so the
    // last field of an unterminated body is not silently dropped.
    if !current_line.is_empty() {
        current_entry.push(current_line);
    }
    if !current_entry.is_empty() {
        if let Some(entry) = parse_journal_entry(&current_entry) {
            entries.push(entry);
        }
    }

    entries
}

pub fn _parse_journal_entries(body: &[u8]) -> Vec<JournalEntry> {
let mut entries = Vec::new();
let mut current_entry = Vec::new();
let lines: Vec<_> = body.split(|&c| c == b'\n').collect();
Expand Down Expand Up @@ -140,7 +214,7 @@ mod tests {
fn test_parse_journal_entries() {
// Test case with two entries
let body = b"field1=value1\nfield2=value2\n\nfield3=value3\nfield4=\n";
let entries = parse_journal_entries(body);
let entries = parse_journal_entries_new(body);
assert_eq!(entries.len(), 2);

// Verify the first entry
Expand Down Expand Up @@ -178,7 +252,7 @@ mod tests {
serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap();
body.extend(serialized_data.clone());

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);
assert_eq!(entries.len(), 1);

// Verify the entry with binary data
Expand Down Expand Up @@ -211,7 +285,7 @@ mod tests {
serialize_string_field("MESSAGE", "foo\nbar", &mut serialized_data).unwrap();
body.extend(serialized_data);

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);
assert_eq!(entries.len(), 1);

// Verify the entry with binary data
Expand Down Expand Up @@ -256,11 +330,73 @@ mod tests {
);
}

#[test]
fn test_parse_journal_entries_binary_field_with_newline_end() {
    // One entry mixing plain UTF-8 fields with binary fields, where one
    // binary payload ends in '\n' — the regression this parser fixes.
    let mut body = vec![];
    body.extend(b"__CURSOR=s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202\n");
    body.extend(b"__REALTIME_TIMESTAMP=1423944916375353\n");
    body.extend(b"_SYSTEMD_OWNER_UID=1001\n");
    let mut serialized = Vec::new();
    serialize_string_field("OTHER_BIN", "some random data\nbar\n", &mut serialized).unwrap();
    body.extend(serialized.iter().copied());
    body.extend(b"_AUDIT_LOGINUID=1001\n");
    body.extend(b"SYSLOG_IDENTIFIER=python3\n");
    serialized.clear();
    serialize_string_field("MESSAGE", "foo\nbar", &mut serialized).unwrap();
    body.extend(serialized);

    let entries = parse_journal_entries_new(&body);
    assert_eq!(entries.len(), 1);

    // Expected (name, value) pairs, in wire order.
    let expected = [
        ("__CURSOR", JournalField::Utf8("s=bcce4fb8ffcb40e9a6e05eee8b7831bf;i=5ef603;b=ec25d6795f0645619ddac9afdef453ee;m=545242e7049;t=50f1202".to_string())),
        ("__REALTIME_TIMESTAMP", JournalField::Utf8("1423944916375353".to_string())),
        ("_SYSTEMD_OWNER_UID", JournalField::Utf8("1001".to_string())),
        ("OTHER_BIN", JournalField::Binary("some random data\nbar\n".to_string())),
        ("_AUDIT_LOGINUID", JournalField::Utf8("1001".to_string())),
        ("SYSLOG_IDENTIFIER", JournalField::Utf8("python3".to_string())),
        ("MESSAGE", JournalField::Binary("foo\nbar".to_string())),
    ];

    let entry = &entries[0];
    assert_eq!(entry.fields.len(), expected.len());
    for (actual, (name, value)) in entry.fields.iter().zip(expected) {
        assert_eq!(actual, &(name.to_string(), value));
    }
}

#[test]
fn test_parse_journal_entries_empty() {
    // An empty body must produce no entries at all.
    let body: &[u8] = b"";
    assert!(parse_journal_entries_new(body).is_empty());
}
}
8 changes: 4 additions & 4 deletions rs/log-fetcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::BTreeMap,
fs::{self, File},
io::{Read, Write},
path::PathBuf,
Expand All @@ -15,7 +15,7 @@ use tokio::select;
use tokio::sync::mpsc;
use url::Url;

use crate::journald_parser::{parse_journal_entries, JournalField};
use crate::journald_parser::{parse_journal_entries_new, JournalField};

mod journald_parser;

Expand Down Expand Up @@ -95,10 +95,10 @@ async fn main() -> Result<(), anyhow::Error> {
}
};

let entries = parse_journal_entries(&body);
let entries = parse_journal_entries_new(&body);

for entry in &entries {
let map: HashMap<String, String> = entry
let map: BTreeMap<String, String> = entry
.fields
.iter()
.map(|(name, val)| match val {
Expand Down

0 comments on commit 2d96e38

Please sign in to comment.