Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15883: Refactor api code in relayd #2509

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions relay/sources/relayd/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions relay/sources/relayd/Cargo.toml
Expand Up @@ -14,39 +14,39 @@ name = "rudder-relayd"
path = "src/relayd.rs"

[[bench]]
name = "runlog"
harness = false
name = "runlog"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.5"
nom = "5.0"
flate2 = { version = "1.0", default-features = false, features = ["zlib"] }
bytes = "0.4"
chrono = { version = "0.4", features = ["serde"] }
diesel = { version = "1.4", default-features = false, features = ["postgres", "chrono", "r2d2"] }
regex = "1.3"
log = "0.4"
structopt = { version = "0.3", default-features = false }
flate2 = { version = "1.0", default-features = false, features = ["zlib"] }
futures = "0.1"
hex = "0.4"
hyper = { version = "0.12", default-features = false }
inotify = "0.7"
log = "0.4"
md-5 = "0.8"
nom = "5.0"
openssl = "0.10"
regex = "1.3"
reqwest = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.8"
structopt = { version = "0.3", default-features = false }
tokio = { version = "0.1", default-features = false, features = ["experimental-tracing"] }
tokio-io = "0.1"
tokio-process = "0.2"
tokio-signal = "0.2"
tokio-threadpool = "0.1"
tokio-process = "0.2"
tokio-io = "0.1"
toml = "0.5"
# Compile dev and release with trace logs enabled
tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_trace"] }
tracing-subscriber = { version = "0.1", default-features = false, features = ["env-filter", "fmt", "tracing-log"] }
tracing-log = { version = "0.1", default-features = false, features = ["log-tracer"] }
futures = "0.1"
hyper = { version = "0.12", default-features = false }
tracing-subscriber = { version = "0.1", default-features = false, features = ["env-filter", "fmt", "tracing-log"] }
warp = { version = "0.1", default-features = false }
reqwest = "0.9"
chrono = { version = "0.4", features = ["serde"] }
md-5 = "0.8"
sha2 = "0.8"
hex = "0.4"
bytes = "0.4"

[dev-dependencies]
criterion = "0.3"
Expand Down
61 changes: 25 additions & 36 deletions relay/sources/relayd/src/api.rs
Expand Up @@ -28,13 +28,20 @@
// You should have received a copy of the GNU General Public License
// along with Rudder. If not, see <http://www.gnu.org/licenses/>.

mod remote_run;
mod shared_files;
mod shared_folder;
mod system;

use crate::{
api::{
remote_run::{RemoteRun, RemoteRunTarget},
shared_files::{SharedFilesHeadParams, SharedFilesPutParams},
shared_folder::SharedFolderParams,
system::{Info, Status},
},
error::Error,
remote_run::{RemoteRun, RemoteRunTarget},
shared_files::{self, SharedFilesHeadParams, SharedFilesPutParams},
shared_folder::{self, SharedFolderParams},
stats::Stats,
status::Status,
JobConfig,
};
use futures::Future;
Expand All @@ -46,7 +53,6 @@ use std::{
path::Path,
sync::{Arc, RwLock},
};
use structopt::clap::crate_version;
use tracing::info;
use warp::{
body::{self, FullBody},
Expand All @@ -57,26 +63,6 @@ use warp::{
reply, Filter, Reply,
};

#[derive(Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct Info {
pub major_version: String,
pub full_version: String,
}

impl Info {
fn new() -> Self {
Info {
major_version: format!(
"{}.{}",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR")
),
full_version: crate_version!().to_string(),
}
}
}

#[derive(Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ApiResult {
Expand All @@ -97,16 +83,19 @@ struct ApiResponse<T: Serialize> {

impl<T: Serialize> ApiResponse<T> {
fn new<E: Display>(action: &'static str, data: Result<T, E>) -> Self {
let (data, result, error_details) = match data {
Ok(d) => (Some(d), ApiResult::Success, None),
Err(e) => (None, ApiResult::Error, Some(e.to_string())),
};

Self {
data,
result,
action,
error_details,
match data {
Ok(d) => ApiResponse {
data: Some(d),
result: ApiResult::Success,
action,
error_details: None,
},
Err(e) => ApiResponse {
data: None,
result: ApiResult::Error,
action,
error_details: Some(e.to_string()),
},
}
}

Expand All @@ -121,7 +110,7 @@ impl<T: Serialize> ApiResponse<T> {
}
}

pub fn api(
pub fn run(
listen: SocketAddr,
shutdown: impl Future<Item = ()> + Send + 'static,
job_config: Arc<JobConfig>,
Expand Down
Expand Up @@ -47,7 +47,7 @@ use tracing::{debug, error, span, trace, Level};

// From futures_stream_select_all crate (https://github.com/swizard0/futures-stream-select-all)
// Will be in future versions of futures
pub fn select_all<I, T, E>(streams: I) -> Box<dyn Stream<Item = T, Error = E> + Send>
fn select_all<I, T, E>(streams: I) -> Box<dyn Stream<Item = T, Error = E> + Send>
where
I: IntoIterator + Send,
I::Item: Stream<Item = T, Error = E> + 'static + Send,
Expand Down Expand Up @@ -92,8 +92,8 @@ where

#[derive(Debug)]
pub struct RemoteRun {
pub target: RemoteRunTarget,
pub run_parameters: RunParameters,
target: RemoteRunTarget,
run_parameters: RunParameters,
}

impl RemoteRun {
Expand Down Expand Up @@ -270,7 +270,7 @@ impl RemoteRunTarget {
}

#[derive(Debug, PartialEq)]
pub struct Condition {
struct Condition {
data: String,
}

Expand Down Expand Up @@ -300,7 +300,7 @@ impl FromStr for Condition {
}

#[derive(Debug, PartialEq)]
pub struct RunParameters {
struct RunParameters {
asynchronous: bool,
keep_output: bool,
conditions: Vec<Condition>,
Expand Down
Expand Up @@ -98,7 +98,7 @@ impl Metadata {
fn parse_value(key: &str, file: &str) -> Result<String, Error> {
let regex_key = Regex::new(&format!(r"{}=(?P<key>[^\n]+)\n", key)).unwrap();

match regex_key.captures(&file) {
match regex_key.captures(file) {
Some(capture) => match capture.name("key") {
Some(x) => Ok(x.as_str().to_string()),
_ => Err(Error::InvalidHeader),
Expand All @@ -108,7 +108,7 @@ impl Metadata {
}
}

pub fn validate_signature(
fn validate_signature(
file: &[u8],
pubkey: PKey<Public>,
hash_type: HashType,
Expand All @@ -119,12 +119,6 @@ pub fn validate_signature(
verifier.verify(digest)
}

pub fn metadata_parser(buf: &mut FullBody) -> Result<Metadata, Error> {
let mut metadata: Vec<u8> = Vec::new();
let _ = buf.reader().read_to_end(&mut metadata)?;
Metadata::from_str(str::from_utf8(&metadata)?)
}

fn get_pubkey(pubkey: &str) -> Result<PKey<Public>, ErrorStack> {
PKey::from_rsa(Rsa::public_key_from_pem_pkcs1(
format!(
Expand Down
Expand Up @@ -31,7 +31,27 @@
use crate::{api::ApiResult, check_configuration, output::database::ping, Error, JobConfig};
use serde::Serialize;
use std::sync::Arc;
use structopt::clap::crate_version;

#[derive(Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
pub struct Info {
pub major_version: String,
pub full_version: String,
}

impl Info {
pub fn new() -> Self {
Info {
major_version: format!(
"{}.{}",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR")
),
full_version: crate_version!().to_string(),
}
}
}
#[derive(Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub struct State {
Expand All @@ -55,14 +75,6 @@ impl From<Result<(), Error>> for State {
}
}

#[derive(Serialize, Debug, PartialEq, Eq)]
pub struct NodeCounts {
// Total nodes under this relays
pub sub_nodes: usize,
// Nodes directly managed by this relay
pub managed_nodes: usize,
}

#[derive(Serialize, Debug, PartialEq, Eq)]
pub struct Status {
database: Option<State>,
Expand Down
16 changes: 12 additions & 4 deletions relay/sources/relayd/src/data/node.rs
Expand Up @@ -28,9 +28,9 @@
// You should have received a copy of the GNU General Public License
// along with Rudder. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::Error, status::NodeCounts};
use crate::error::Error;
use openssl::{stack::Stack, x509::X509};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -197,7 +197,7 @@ impl NodesList {
pub fn neighbors_from(&self, nodes: &[String]) -> Vec<Host> {
nodes
.iter()
.filter_map(|n| self.list.data.get::<str>(&n))
.filter_map(|n| self.list.data.get::<str>(n))
.filter(|n| n.policy_server == self.my_id)
.map(|n| n.hostname.clone())
.collect()
Expand Down Expand Up @@ -245,6 +245,14 @@ impl FromStr for RawNodesList {
}
}

#[derive(Serialize, Debug, PartialEq, Eq)]
pub struct NodeCounts {
// Total nodes under this relays
pub sub_nodes: usize,
// Nodes directly managed by this relay
pub managed_nodes: usize,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -345,7 +353,7 @@ mod tests {

let mut actual = NodesList::new("root".to_string(), "tests/files/nodeslist.json", None)
.unwrap()
.sub_relays_from(&vec![
.sub_relays_from(&[
"b745a140-40bc-4b86-b6dc-084488fc906b".to_string(),
"a745a140-40bc-4b86-b6dc-084488fc906b".to_string(),
"root".to_string(),
Expand Down
7 changes: 1 addition & 6 deletions relay/sources/relayd/src/lib.rs
Expand Up @@ -41,14 +41,9 @@ pub mod hashing;
pub mod input;
pub mod output;
pub mod processing;
pub mod remote_run;
pub mod shared_files;
pub mod shared_folder;
pub mod stats;
pub mod status;

use crate::{
api::api,
configuration::{
cli::CliConfiguration,
logging::LogConfig,
Expand Down Expand Up @@ -170,7 +165,7 @@ pub fn start(cli_cfg: CliConfiguration, reload_handle: LogHandle) -> Result<(),
let (tx_stats, rx_stats) = mpsc::channel(1_024);

tokio::spawn(Stats::receiver(stats.clone(), rx_stats));
tokio::spawn(api(
tokio::spawn(api::run(
job_config.cfg.general.listen,
shutdown,
job_config.clone(),
Expand Down