Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15209: Control runlog consistency before inserting it #2305

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions relay/sources/relayd/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use crate::{
use futures::Future;
use std::{
collections::HashMap,
fs,
net::SocketAddr,
str::FromStr,
sync::{Arc, RwLock},
};
use tracing::info;
Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/src/data/runinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::{
};
use tracing::debug;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RunInfo {
pub node_id: NodeId,
pub timestamp: DateTime<FixedOffset>,
Expand Down
58 changes: 38 additions & 20 deletions relay/sources/relayd/src/data/runlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use serde::{Deserialize, Serialize};
use std::{
convert::TryFrom,
fmt::{self, Display},
fs::read_to_string,
path::Path,
str::FromStr,
};
use tracing::{debug, error, warn};
Expand All @@ -59,46 +61,55 @@ impl Display for RunLog {
}
}

impl FromStr for RunLog {
type Err = Error;
impl RunLog {
/// Mainly used for testing
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let info = RunInfo::from_str(
path.as_ref()
.file_name()
.and_then(|r| r.to_str())
.ok_or_else(|| {
Error::InvalidRunInfo(path.as_ref().to_str().unwrap_or("").to_string())
})?,
)?;
RunLog::try_from((info, read_to_string(path)?.as_ref()))
}
}

fn from_str(s: &str) -> Result<Self, Self::Err> {
match runlog(s) {
impl TryFrom<(RunInfo, &str)> for RunLog {
type Error = Error;

fn try_from(raw_reports: (RunInfo, &str)) -> Result<Self, Self::Error> {
match runlog(raw_reports.1) {
Ok(raw_runlog) => {
debug!("Parsed runlog {:#?}", raw_runlog.1);
let (reports, failed): (Vec<_>, Vec<_>) =
raw_runlog.1.into_iter().partition(Result::is_ok);
for invalid_report in failed.into_iter().map(Result::unwrap_err) {
warn!("Invalid report: {}", invalid_report);
}
// TODO: avoid collecting?

let reports: Vec<RawReport> = reports.into_iter().map(Result::unwrap).collect();
RunLog::try_from(reports)
RunLog::try_from((raw_reports.0, reports))
}
Err(e) => {
warn!("{:?}: could not parse '{}'", e, s);
warn!("{:?}: could not parse '{}'", e, raw_reports.0);
Err(Error::InvalidRunLog)
}
}
}
}

impl TryFrom<Vec<RawReport>> for RunLog {
impl TryFrom<(RunInfo, Vec<RawReport>)> for RunLog {
type Error = Error;

fn try_from(raw_reports: Vec<RawReport>) -> Result<Self, Self::Error> {
fn try_from(raw_reports: (RunInfo, Vec<RawReport>)) -> Result<Self, Self::Error> {
let reports: Vec<Report> = raw_reports
.1
.into_iter()
.flat_map(RawReport::into_reports)
.collect();

let info = match reports.first() {
None => return Err(Error::EmptyRunlog),
Some(report) => RunInfo {
node_id: report.node_id.clone(),
timestamp: report.start_datetime,
},
};
let info = raw_reports.0;

for report in &reports {
if info.node_id != report.node_id {
Expand All @@ -123,7 +134,7 @@ impl TryFrom<Vec<RawReport>> for RunLog {
#[cfg(test)]
mod tests {
use super::*;
use std::fs::{read_dir, read_to_string};
use std::fs::read_dir;

#[test]
fn it_parses_runlog() {
Expand All @@ -132,8 +143,7 @@ mod tests {
for entry in read_dir("tests/runlogs/").unwrap() {
let path = entry.unwrap().path();
if path.extension().unwrap() == "json" {
let runlog =
RunLog::from_str(&read_to_string(path.with_extension("log")).unwrap()).unwrap();
let runlog = RunLog::new(&path.with_extension("log")).unwrap();
//println!("{}", serde_json::to_string_pretty(&runlog).unwrap());
let reference: RunLog =
serde_json::from_str(&read_to_string(path).unwrap()).unwrap();
Expand All @@ -144,4 +154,12 @@ mod tests {
// check we did at least one test
assert!(test_done > 0);
}

#[test]
fn it_detect_invalid_node_in_runlog() {
assert!(
RunLog::new("2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906c.log")
.is_err()
);
}
}
8 changes: 4 additions & 4 deletions relay/sources/relayd/src/output/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ mod tests {
use super::*;
use crate::{data::report::QueryableReport, output::database::schema::ruddersysevents::dsl::*};
use diesel;
use std::fs::read_to_string;
use std::str::FromStr;

pub fn db() -> PgPool {
let db_config = DatabaseConfig {
Expand All @@ -177,8 +175,10 @@ mod tests {
.unwrap();
assert_eq!(results.len(), 0);

let runlog =
RunLog::from_str(&read_to_string("tests/runlogs/normal.log").unwrap()).unwrap();
let runlog = RunLog::new(
"tests/runlogs/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log",
)
.unwrap();

// Test inserting the runlog

Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ fn output_report_database_inner(
.ok_or_else(|| Error::MissingCertificateForNode(run_info.node_id.clone()))?,
)?;

let parsed_runlog = signed_runlog.parse::<RunLog>()?;
let parsed_runlog = RunLog::try_from((run_info.clone(), signed_runlog.as_ref()))?;

let _inserted = insert_runlog(
&job_config
Expand Down
16 changes: 12 additions & 4 deletions relay/sources/relayd/tests/reports_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ fn it_reads_and_inserts_a_runlog() {
create_dir_all("target/tmp/test_simple/incoming").unwrap();
let cli_cfg = CliConfiguration::new("tests/test_simple/config/", false);

let file_old = "target/tmp/test_simple/incoming/2017-01-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_new = "target/tmp/test_simple/incoming/2018-01-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_old = "target/tmp/test_simple/incoming/2017-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_new = "target/tmp/test_simple/incoming/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_broken = "target/tmp/test_simple/incoming/2018-02-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_failed = "target/tmp/test_simple/failed/2018-02-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";

copy("tests/runlogs/normal_old.signed", file_old).unwrap();
copy(
"tests/runlogs/2017-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.signed",
file_old,
)
.unwrap();
// We need to file to be old
set_file_times(file_old, FileTime::zero(), FileTime::zero()).unwrap();

Expand All @@ -61,7 +65,11 @@ fn it_reads_and_inserts_a_runlog() {

assert!(start_number(&db, 1).is_ok());

copy("tests/runlogs/normal.signed", file_new).unwrap();
copy(
"tests/runlogs/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.signed",
file_new,
)
.unwrap();
copy("tests/files/config/main.conf", file_broken).unwrap();

assert!(start_number(&db, 2).is_ok());
Expand Down