Skip to content

Commit

Permalink
Fixes #15209: Control runlog consistency before inserting it
Browse files Browse the repository at this point in the history
  • Loading branch information
amousset committed Jul 12, 2019
1 parent e966115 commit 99a50a7
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 32 deletions.
2 changes: 0 additions & 2 deletions relay/sources/relayd/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use crate::{
use futures::Future;
use std::{
collections::HashMap,
fs,
net::SocketAddr,
str::FromStr,
sync::{Arc, RwLock},
};
use tracing::info;
Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/src/data/runinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::{
};
use tracing::debug;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RunInfo {
pub node_id: NodeId,
pub timestamp: DateTime<FixedOffset>,
Expand Down
58 changes: 38 additions & 20 deletions relay/sources/relayd/src/data/runlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use serde::{Deserialize, Serialize};
use std::{
convert::TryFrom,
fmt::{self, Display},
fs::read_to_string,
path::Path,
str::FromStr,
};
use tracing::{debug, error, warn};
Expand All @@ -59,46 +61,55 @@ impl Display for RunLog {
}
}

impl FromStr for RunLog {
type Err = Error;
impl RunLog {
/// Mainly used for testing
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let info = RunInfo::from_str(
path.as_ref()
.file_name()
.and_then(|r| r.to_str())
.ok_or_else(|| {
Error::InvalidRunInfo(path.as_ref().to_str().unwrap_or("").to_string())
})?,
)?;
RunLog::try_from((info, read_to_string(path)?.as_ref()))
}
}

fn from_str(s: &str) -> Result<Self, Self::Err> {
match runlog(s) {
impl TryFrom<(RunInfo, &str)> for RunLog {
type Error = Error;

fn try_from(raw_reports: (RunInfo, &str)) -> Result<Self, Self::Error> {
match runlog(raw_reports.1) {
Ok(raw_runlog) => {
debug!("Parsed runlog {:#?}", raw_runlog.1);
let (reports, failed): (Vec<_>, Vec<_>) =
raw_runlog.1.into_iter().partition(Result::is_ok);
for invalid_report in failed.into_iter().map(Result::unwrap_err) {
warn!("Invalid report: {}", invalid_report);
}
// TODO: avoid collecting?

let reports: Vec<RawReport> = reports.into_iter().map(Result::unwrap).collect();
RunLog::try_from(reports)
RunLog::try_from((raw_reports.0, reports))
}
Err(e) => {
warn!("{:?}: could not parse '{}'", e, s);
warn!("{:?}: could not parse '{}'", e, raw_reports.0);
Err(Error::InvalidRunLog)
}
}
}
}

impl TryFrom<Vec<RawReport>> for RunLog {
impl TryFrom<(RunInfo, Vec<RawReport>)> for RunLog {
type Error = Error;

fn try_from(raw_reports: Vec<RawReport>) -> Result<Self, Self::Error> {
fn try_from(raw_reports: (RunInfo, Vec<RawReport>)) -> Result<Self, Self::Error> {
let reports: Vec<Report> = raw_reports
.1
.into_iter()
.flat_map(RawReport::into_reports)
.collect();

let info = match reports.first() {
None => return Err(Error::EmptyRunlog),
Some(report) => RunInfo {
node_id: report.node_id.clone(),
timestamp: report.start_datetime,
},
};
let info = raw_reports.0;

for report in &reports {
if info.node_id != report.node_id {
Expand All @@ -123,7 +134,7 @@ impl TryFrom<Vec<RawReport>> for RunLog {
#[cfg(test)]
mod tests {
use super::*;
use std::fs::{read_dir, read_to_string};
use std::fs::read_dir;

#[test]
fn it_parses_runlog() {
Expand All @@ -132,8 +143,7 @@ mod tests {
for entry in read_dir("tests/runlogs/").unwrap() {
let path = entry.unwrap().path();
if path.extension().unwrap() == "json" {
let runlog =
RunLog::from_str(&read_to_string(path.with_extension("log")).unwrap()).unwrap();
let runlog = RunLog::new(&path.with_extension("log")).unwrap();
//println!("{}", serde_json::to_string_pretty(&runlog).unwrap());
let reference: RunLog =
serde_json::from_str(&read_to_string(path).unwrap()).unwrap();
Expand All @@ -144,4 +154,12 @@ mod tests {
// check we did at least one test
assert!(test_done > 0);
}

#[test]
fn it_detect_invalid_node_in_runlog() {
assert!(
RunLog::new("2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906c.log")
.is_err()
);
}
}
8 changes: 4 additions & 4 deletions relay/sources/relayd/src/output/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ mod tests {
use super::*;
use crate::{data::report::QueryableReport, output::database::schema::ruddersysevents::dsl::*};
use diesel;
use std::fs::read_to_string;
use std::str::FromStr;

pub fn db() -> PgPool {
let db_config = DatabaseConfig {
Expand All @@ -177,8 +175,10 @@ mod tests {
.unwrap();
assert_eq!(results.len(), 0);

let runlog =
RunLog::from_str(&read_to_string("tests/runlogs/normal.log").unwrap()).unwrap();
let runlog = RunLog::new(
"tests/runlogs/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log",
)
.unwrap();

// Test inserting the runlog

Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ fn output_report_database_inner(
.ok_or_else(|| Error::MissingCertificateForNode(run_info.node_id.clone()))?,
)?;

let parsed_runlog = signed_runlog.parse::<RunLog>()?;
let parsed_runlog = RunLog::try_from((run_info.clone(), signed_runlog.as_ref()))?;

let _inserted = insert_runlog(
&job_config
Expand Down
File renamed without changes.
16 changes: 12 additions & 4 deletions relay/sources/relayd/tests/reports_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ fn it_reads_and_inserts_a_runlog() {
create_dir_all("target/tmp/test_simple/incoming").unwrap();
let cli_cfg = CliConfiguration::new("tests/test_simple/config/", false);

let file_old = "target/tmp/test_simple/incoming/2017-01-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_new = "target/tmp/test_simple/incoming/2018-01-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_old = "target/tmp/test_simple/incoming/2017-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_new = "target/tmp/test_simple/incoming/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_broken = "target/tmp/test_simple/incoming/2018-02-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";
let file_failed = "target/tmp/test_simple/failed/2018-02-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log";

copy("tests/runlogs/normal_old.signed", file_old).unwrap();
copy(
"tests/runlogs/2017-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.signed",
file_old,
)
.unwrap();
// We need to file to be old
set_file_times(file_old, FileTime::zero(), FileTime::zero()).unwrap();

Expand All @@ -61,7 +65,11 @@ fn it_reads_and_inserts_a_runlog() {

assert!(start_number(&db, 1).is_ok());

copy("tests/runlogs/normal.signed", file_new).unwrap();
copy(
"tests/runlogs/2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.signed",
file_new,
)
.unwrap();
copy("tests/files/config/main.conf", file_broken).unwrap();

assert!(start_number(&db, 2).is_ok());
Expand Down

0 comments on commit 99a50a7

Please sign in to comment.