Skip to content

Commit

Permalink
Fixes #15497: Add inventory forwarding on relays in relayd
Browse files Browse the repository at this point in the history
  • Loading branch information
amousset committed Aug 19, 2019
1 parent 5e1a529 commit 3eba4ec
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 73 deletions.
175 changes: 125 additions & 50 deletions relay/sources/relayd/Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions relay/sources/relayd/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,10 @@ pub fn api(
parse_parameter_from_raw(ttl),
job_config.clone(),
buf,
)
{
) {
Ok(x) => x,
Err(_x) => StatusCode::from_u16(500).unwrap(),
}
,
},
)
},
);
Expand Down
8 changes: 7 additions & 1 deletion relay/sources/relayd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::{
data::node::NodesList,
error::Error,
output::database::{pg_pool, PgPool},
processing::reporting,
processing::{inventory, reporting},
stats::Stats,
};
use futures::{
Expand Down Expand Up @@ -180,6 +180,12 @@ pub fn start(
info!("Skipping reporting as it is disabled");
}

if job_config.cfg.processing.inventory.output.is_enabled() {
inventory::start(&job_config, &tx_stats);
} else {
info!("Skipping inventory as it is disabled");
}

Ok(())
}));
runtime
Expand Down
4 changes: 3 additions & 1 deletion relay/sources/relayd/src/output/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
// along with Rudder. If not, see <http://www.gnu.org/licenses/>.

use crate::{
configuration::main::DatabaseConfig, data::report::QueryableReport, data::RunLog, error::Error,
configuration::main::DatabaseConfig,
data::{report::QueryableReport, RunLog},
Error,
};
use diesel::{
insert_into,
Expand Down
23 changes: 19 additions & 4 deletions relay/sources/relayd/src/output/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
// You should have received a copy of the GNU General Public License
// along with Rudder. If not, see <http://www.gnu.org/licenses/>.

use crate::Error;
use crate::JobConfig;
use crate::{processing::inventory::InventoryType, Error, JobConfig};
use futures::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use tracing::{debug, span, Level};

pub fn send_report(
Expand All @@ -44,6 +42,23 @@ pub fn send_report(
Box::new(forward_file(job_config, "reports", path))
}

pub fn send_inventory(
job_config: Arc<JobConfig>,
path: PathBuf,
inventory_type: InventoryType,
) -> Box<dyn Future<Item = (), Error = Error> + Send> {
let report_span = span!(Level::TRACE, "upstream");
let _report_enter = report_span.enter();
Box::new(forward_file(
job_config,
match inventory_type {
InventoryType::New => "inventories",
InventoryType::Update => "inventory-updates",
},
path,
))
}

fn forward_file(
job_config: Arc<JobConfig>,
endpoint: &str,
Expand Down
4 changes: 2 additions & 2 deletions relay/sources/relayd/src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ use tokio::{
};
use tracing::{debug, error};

//pub mod inventory;
pub mod inventory;
pub mod reporting;

pub type ReceivedFile = PathBuf;
pub type RootDirectory = PathBuf;

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
enum OutputError {
Transient,
Permanent,
Expand Down
192 changes: 192 additions & 0 deletions relay/sources/relayd/src/processing/inventory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2019 Normation SAS
//
// This file is part of Rudder.
//
// Rudder is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// In accordance with the terms of section 7 (7. Additional Terms.) of
// the GNU General Public License version 3, the copyright holders add
// the following Additional permissions:
// Notwithstanding to the terms of section 5 (5. Conveying Modified Source
// Versions) and 6 (6. Conveying Non-Source Forms.) of the GNU General
// Public License version 3, when you create a Related Module, this
// Related Module is not considered as a part of the work and may be
// distributed under the license agreement of your choice.
// A "Related Module" means a set of sources files including their
// documentation that, without modification of the Source Code, enables
// supplementary functions or services in addition to those offered by
// the Software.
//
// Rudder is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Rudder. If not, see <http://www.gnu.org/licenses/>.

use crate::{
configuration::main::InventoryOutputSelect,
input::watch::*,
output::upstream::send_inventory,
processing::{failure, success, OutputError, ReceivedFile},
stats::Event,
JobConfig,
};
use futures::{future::Future, lazy, sync::mpsc, Stream};
use md5::{Digest, Md5};
use std::{os::unix::ffi::OsStrExt, sync::Arc};
use tokio::prelude::*;
use tracing::{debug, error, info, span, Level};

static INVENTORY_EXTENSIONS: &[&str] = &["gz", "xml", "sign"];

#[derive(Debug, Copy, Clone)]
pub enum InventoryType {
New,
Update,
}

pub fn start(job_config: &Arc<JobConfig>, stats: &mpsc::Sender<Event>) {
let span = span!(Level::TRACE, "inventory");
let _enter = span.enter();

let (sender, receiver) = mpsc::channel(1_024);
tokio::spawn(serve(
job_config.clone(),
receiver,
InventoryType::New,
stats.clone(),
));
watch(
&job_config
.cfg
.processing
.inventory
.directory
.join("incoming"),
&job_config,
&sender,
);
let (sender, receiver) = mpsc::channel(1_024);
tokio::spawn(serve(
job_config.clone(),
receiver,
InventoryType::Update,
stats.clone(),
));
watch(
&job_config
.cfg
.processing
.inventory
.directory
.join("accepted-nodes-updates"),
&job_config,
&sender,
);
}

fn serve(
job_config: Arc<JobConfig>,
rx: mpsc::Receiver<ReceivedFile>,
inventory_type: InventoryType,
stats: mpsc::Sender<Event>,
) -> impl Future<Item = (), Error = ()> {
rx.for_each(move |file| {
// allows skipping temporary .dav files
if !file
.extension()
.map(|f| INVENTORY_EXTENSIONS.contains(&f.to_string_lossy().as_ref()))
.unwrap_or(false)
{
debug!(
"skipping {:#?} as it does not have a known inventory extension",
file
);
return Ok(());
}

let queue_id = format!(
"{:X}",
Md5::digest(
file.file_name()
.unwrap_or_else(|| file.as_os_str())
.as_bytes()
)
);
let span = span!(
Level::INFO,
"inventory",
queue_id = %queue_id,
);
let _enter = span.enter();

let stat_event = stats
.clone()
.send(Event::InventoryReceived)
.map_err(|e| error!("receive error: {}", e))
.map(|_| ());
// FIXME: no need for a spawn
tokio::spawn(lazy(|| stat_event));

debug!("received: {:?}", file);

let treat_file: Box<dyn Future<Item = (), Error = ()> + Send> =
match job_config.cfg.processing.inventory.output {
InventoryOutputSelect::Upstream => output_inventory_upstream(
file.clone(),
inventory_type,
job_config.clone(),
stats.clone(),
),
// The job should not be started in this case
InventoryOutputSelect::Disabled => {
unreachable!("Inventory server should be disabled")
}
};

tokio::spawn(lazy(|| treat_file));
Ok(())
})
}

fn output_inventory_upstream(
path: ReceivedFile,
inventory_type: InventoryType,
job_config: Arc<JobConfig>,
stats: mpsc::Sender<Event>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let job_config_clone = job_config.clone();
let path_clone2 = path.clone();
let stats_clone = stats.clone();
Box::new(
send_inventory(job_config.clone(), path.clone(), inventory_type)
.map_err(|e| {
error!("output error: {}", e);
OutputError::from(e)
})
.or_else(move |e| match e {
OutputError::Permanent => failure(
path_clone2.clone(),
job_config_clone
.clone()
.cfg
.processing
.inventory
.directory
.clone(),
Event::InventoryRefused,
stats.clone(),
),
OutputError::Transient => {
info!("transient error, skipping");
Box::new(futures::future::err::<(), ()>(()))
}
})
.and_then(move |_| success(path.clone(), Event::InventorySent, stats_clone.clone())),
)
}
13 changes: 6 additions & 7 deletions relay/sources/relayd/src/processing/reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,16 @@ use futures::{
Stream,
};
use md5::{Digest, Md5};
use std::os::unix::ffi::OsStrExt;
use std::{convert::TryFrom, sync::Arc};
use std::{convert::TryFrom, os::unix::ffi::OsStrExt, sync::Arc};
use tokio::prelude::*;
use tokio_threadpool::blocking;
use tracing::{debug, error, info, span, warn, Level};

static REPORT_EXTENSIONS: &[&str] = &["gz", "log"];

pub fn start(job_config: &Arc<JobConfig>, stats: &mpsc::Sender<Event>) {
let report_span = span!(Level::TRACE, "reporting");
let _report_enter = report_span.enter();
let span = span!(Level::TRACE, "reporting");
let _enter = span.enter();

let (sender, receiver) = mpsc::channel(1_024);
tokio::spawn(serve(job_config.clone(), receiver, stats.clone()));
Expand Down Expand Up @@ -101,12 +100,12 @@ fn serve(
.as_bytes()
)
);
let report_span = span!(
let span = span!(
Level::INFO,
"report",
queue_id = %queue_id,
);
let _report_enter = report_span.enter();
let _enter = span.enter();

let stat_event = stats
.clone()
Expand Down Expand Up @@ -250,7 +249,7 @@ fn output_report_upstream(
Box::new(futures::future::err::<(), ()>(()))
}
})
.and_then(move |_| success(path.clone(), Event::ReportInserted, stats_clone.clone())),
.and_then(move |_| success(path.clone(), Event::ReportSent, stats_clone.clone())),
)
}

Expand Down
6 changes: 2 additions & 4 deletions relay/sources/relayd/src/shared_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ pub fn put_handler(
get_pubkey(meta_pubkey.to_string()).unwrap(),
HashType::from_str(hash_type).unwrap(),
&meta.digest,
)?
{
)? {
return Ok(StatusCode::from_u16(404).unwrap());
}

Expand All @@ -280,8 +279,7 @@ pub fn put_handler(
get_pubkey(meta_pubkey.to_string()).unwrap(),
HashType::from_str(hash_type).unwrap(),
&hex::decode(digest).unwrap(),
)?
{
)? {
return Ok(StatusCode::from_u16(500).unwrap());
}

Expand Down

0 comments on commit 3eba4ec

Please sign in to comment.