Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #18124: Add ReportsExecution insertion in relayd #3182

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1,137 changes: 620 additions & 517 deletions relay/sources/relayd/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions relay/sources/relayd/Cargo.toml
Expand Up @@ -27,14 +27,14 @@ hex = "0.4"
hyper = { version = "0.12", default-features = false }
inotify = "0.7"
log = "0.4"
md-5 = "0.8"
md-5 = "0.9"
nom = "5"
openssl = "0.10"
regex = "1"
reqwest = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.8"
sha2 = "0.9"
structopt = { version = "0.3", default-features = false }
thiserror = "1"
tokio = { version = "0.1", default-features = false, features = ["experimental-tracing"] }
Expand All @@ -55,4 +55,4 @@ zip = "0.5"
criterion = "0.3"
filetime = "0.2"
tempfile = "3"
proptest = "0.9"
proptest = "0.10"
3 changes: 1 addition & 2 deletions relay/sources/relayd/benches/benches.rs
Expand Up @@ -5,10 +5,9 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use diesel::{self, prelude::*};
use flate2::read::GzDecoder;
use openssl::{stack::Stack, x509::X509};
use relayd::data::node::NodesList;
use relayd::{
configuration::{main::DatabaseConfig, Secret},
data::{report::QueryableReport, RunInfo, RunLog},
data::{node::NodesList, report::QueryableReport, RunInfo, RunLog},
input::signature,
output::database::{schema::ruddersysevents::dsl::*, *},
};
Expand Down
124 changes: 101 additions & 23 deletions relay/sources/relayd/src/data/runlog.rs
Expand Up @@ -7,7 +7,9 @@ use crate::{
Report, RunInfo,
},
error::Error,
output::database::schema::reportsexecution,
};
use chrono::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
Expand All @@ -17,11 +19,48 @@ use std::{
path::Path,
str::FromStr,
};

use tracing::{debug, error, warn};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Insertable)]
#[table_name = "reportsexecution"]
/// Represents a runlog in the database
pub struct InsertedRunlog {
#[column_name = "nodeid"]
pub node_id: String,
#[column_name = "date"]
pub date: DateTime<FixedOffset>,
#[column_name = "complete"]
pub complete: bool,
#[column_name = "nodeconfigid"]
pub node_config_id: String,
#[column_name = "insertionid"]
pub insertion_id: i64,
#[column_name = "insertiondate"]
pub insertion_date: Option<DateTime<FixedOffset>>,
#[column_name = "compliancecomputationdate"]
pub compliance_computation_date: Option<DateTime<FixedOffset>>,
}

impl InsertedRunlog {
pub fn new(runlog: &RunLog, insertion_id: i64) -> Self {
Self {
node_id: runlog.info.node_id.clone(),
date: runlog.info.timestamp,
complete: true,
node_config_id: runlog.config_id.clone(),
insertion_id,
// None means default value will be inserted, here current_timestamp
insertion_date: None,
compliance_computation_date: None,
}
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RunLog {
pub info: RunInfo,
pub config_id: String,
// Never empty vec
pub reports: Vec<Report>,
}
Expand Down Expand Up @@ -52,6 +91,7 @@ impl RunLog {
pub fn without_types(&self, types: &HashSet<String>) -> Self {
Self {
info: self.info.clone(),
config_id: self.config_id.clone(),
reports: self
.reports
.as_slice()
Expand Down Expand Up @@ -102,6 +142,13 @@ impl TryFrom<(RunInfo, Vec<RawReport>)> for RunLog {
.ok_or(Error::InconsistentRunlog)?
.start_datetime;

let config_id = reports
.iter()
.find(|r| r.event_type == "control" && r.component == "end")
.ok_or(Error::MissingEndRun)?
.key_value
.clone();

for report in &reports {
if info.node_id != report.node_id {
error!(
Expand All @@ -117,7 +164,11 @@ impl TryFrom<(RunInfo, Vec<RawReport>)> for RunLog {
);
}
}
Ok(Self { info, reports })
Ok(Self {
info,
reports,
config_id,
})
}
}

Expand Down Expand Up @@ -158,12 +209,34 @@ mod tests {
fn it_removes_logs_in_runlog() {
let mut filter = HashSet::new();
let _ = filter.insert("log_info".to_string());
let end_run = Report {
start_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z",
)
.unwrap(),
rule_id: "rudder".into(),
directive_id: "run".into(),
component: "CRON Daemon".into(),
key_value: "20180824-130007-3ad37587".into(),
event_type: "control".into(),
msg: "End execution".into(),
policy: "Common".into(),
node_id: "root".into(),
serial: 0,
execution_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z",
)
.unwrap(),
};
assert_eq!(
RunLog {
info: RunInfo::from_str(
"2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log"
)
.unwrap(),
config_id: "20180824-130007-3ad37587".to_string(),
reports: vec![
Report {
start_datetime: DateTime::parse_from_str(
Expand Down Expand Up @@ -206,7 +279,8 @@ mod tests {
"%Y-%m-%d %H:%M:%S%z"
)
.unwrap(),
}
},
end_run.clone()
]
}
.without_types(&filter),
Expand All @@ -215,27 +289,31 @@ mod tests {
"2018-08-24T15:55:01+00:00@e745a140-40bc-4b86-b6dc-084488fc906b.log"
)
.unwrap(),
reports: vec![Report {
start_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z"
)
.unwrap(),
rule_id: "hasPolicyServer-root".into(),
directive_id: "common-root".into(),
component: "CRON Daemon".into(),
key_value: "None".into(),
event_type: "result_repaired".into(),
msg: "Cron daemon status was repaired".into(),
policy: "Common".into(),
node_id: "root".into(),
serial: 0,
execution_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z"
)
.unwrap(),
}]
config_id: "20180824-130007-3ad37587".to_string(),
reports: vec![
Report {
start_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z"
)
.unwrap(),
rule_id: "hasPolicyServer-root".into(),
directive_id: "common-root".into(),
component: "CRON Daemon".into(),
key_value: "None".into(),
event_type: "result_repaired".into(),
msg: "Cron daemon status was repaired".into(),
policy: "Common".into(),
node_id: "root".into(),
serial: 0,
execution_datetime: DateTime::parse_from_str(
"2018-08-24 15:55:01+00:00",
"%Y-%m-%d %H:%M:%S%z"
)
.unwrap(),
},
end_run
]
}
);
}
Expand Down
2 changes: 2 additions & 0 deletions relay/sources/relayd/src/error.rs
Expand Up @@ -21,6 +21,8 @@ pub enum Error {
InvalidFile(PathBuf),
#[error("inconsistent run log")]
InconsistentRunlog,
#[error("missing endRun report")]
MissingEndRun,
#[error("empty run log")]
EmptyRunlog,
#[error("missing id in certificate")]
Expand Down
8 changes: 4 additions & 4 deletions relay/sources/relayd/src/hashing.rs
Expand Up @@ -100,13 +100,13 @@ impl HashType {
let value = match self {
HashType::Sha256 => {
let mut hasher = Sha256::new();
hasher.input(bytes);
format!("{:x}", hasher.result())
hasher.update(bytes);
format!("{:x}", hasher.finalize())
}
HashType::Sha512 => {
let mut hasher = Sha512::new();
hasher.input(bytes);
format!("{:x}", hasher.result())
hasher.update(bytes);
format!("{:x}", hasher.finalize())
}
};
Hash {
Expand Down
3 changes: 1 addition & 2 deletions relay/sources/relayd/src/lib.rs
Expand Up @@ -47,10 +47,9 @@ use tracing::{debug, error, info};
use tracing_log::LogTracer;
use tracing_subscriber::{
filter::EnvFilter,
fmt::Subscriber,
fmt::{
format::{Format, Full, NewRecorder},
Formatter,
Formatter, Subscriber,
},
reload::Handle,
};
Expand Down
62 changes: 51 additions & 11 deletions relay/sources/relayd/src/output/database.rs
Expand Up @@ -3,7 +3,7 @@

use crate::{
configuration::main::DatabaseConfig,
data::{report::QueryableReport, RunLog},
data::{report::QueryableReport, runlog::InsertedRunlog, RunLog},
Error,
};
use diesel::{
Expand Down Expand Up @@ -34,6 +34,19 @@ pub mod schema {
serial -> Integer,
}
}

table! {
// (nodeid, date) is the primary key
reportsexecution(nodeid, date) {
nodeid -> Text,
date -> Timestamptz,
complete -> Bool,
nodeconfigid -> Text,
insertionid -> Nullable<BigInt>,
insertiondate -> Nullable<Timestamptz>,
compliancecomputationdate -> Nullable<Timestamptz>,
}
}
}

pub type PgPool = Pool<ConnectionManager<PgConnection>>;
Expand Down Expand Up @@ -76,7 +89,10 @@ pub fn insert_runlog(
runlog: &RunLog,
behavior: InsertionBehavior,
) -> Result<RunlogInsertion, Error> {
use self::schema::ruddersysevents::dsl::*;
use self::schema::{
reportsexecution::dsl::*,
ruddersysevents::dsl::{nodeid, *},
};
let report_span = span!(Level::TRACE, "database");
let _report_enter = report_span.enter();

Expand Down Expand Up @@ -107,15 +123,24 @@ pub fn insert_runlog(
.and(ruleid.eq(&first_report.rule_id))
.and(directiveid.eq(&first_report.directive_id)),
)
.limit(1)
.load::<QueryableReport>(connection)?
.is_empty();
.first::<QueryableReport>(connection)
.optional()?
.is_none();

if behavior == InsertionBehavior::AllowDuplicate || new_runlog {
trace!("Inserting runlog {:#?}", runlog);
insert_into(ruddersysevents)
let report_id = insert_into(ruddersysevents)
.values(&runlog.reports)
.get_results::<QueryableReport>(connection)?
.get(0)
.expect("inserted runlog cannot be empty")
.id;

let runlog_info = InsertedRunlog::new(&runlog, report_id);
insert_into(reportsexecution)
.values(runlog_info)
.execute(connection)?;

Ok(RunlogInsertion::Inserted)
} else {
error!(
Expand All @@ -135,10 +160,11 @@ pub fn insert_runlog(
mod tests {
use super::*;
use crate::{
configuration::Secret, data::report::QueryableReport,
output::database::schema::ruddersysevents::dsl::*,
configuration::Secret,
data::report::QueryableReport,
output::database::schema::{reportsexecution::dsl::*, ruddersysevents::dsl::*},
};
use diesel;
use diesel::dsl::count;

pub fn db() -> PgPool {
let db_config = DatabaseConfig {
Expand All @@ -155,6 +181,8 @@ mod tests {
let db = &*pool.get().unwrap();

diesel::delete(ruddersysevents).execute(db).unwrap();
diesel::delete(reportsexecution).execute(db).unwrap();

let results = ruddersysevents
.limit(1)
.load::<QueryableReport>(db)
Expand All @@ -177,7 +205,13 @@ mod tests {
.limit(100)
.load::<QueryableReport>(db)
.unwrap();
assert_eq!(results.len(), 71);
assert_eq!(results.len(), 72);

let results: i64 = reportsexecution
.select(count(insertionid))
.first(db)
.unwrap();
assert_eq!(results, 1);

// Test inserting twice the same runlog

Expand All @@ -190,6 +224,12 @@ mod tests {
.limit(100)
.load::<QueryableReport>(db)
.unwrap();
assert_eq!(results.len(), 71);
assert_eq!(results.len(), 72);

let results: i64 = reportsexecution
.select(count(insertionid))
.first(db)
.unwrap();
assert_eq!(results, 1);
}
}