Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
amousset committed Dec 28, 2020
1 parent 634f1fb commit 8e5a07c
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 314 deletions.
2 changes: 2 additions & 0 deletions relay/sources/api-doc/openapi.src.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ paths:
$ref: paths/system/info.yml
"/system/reload":
$ref: paths/system/reload.yml
"/system/metrics":
$ref: paths/system/metrics.yml
"/shared-folder/{path}":
$ref: paths/shared-folder.yml
"/shared-files/{targetNodeId}/{sourceNodeId}/{fileId}":
Expand Down
32 changes: 32 additions & 0 deletions relay/sources/relayd/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions relay/sources/relayd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ humantime = "2"
hyper = { version = "0.13.8", default-features = false }
# 0.9 for tokio 0.3
inotify = "0.8"
lazy_static = "1.4"
log = "0.4"
md-5 = "0.9"
nom = "6"
openssl = "0.10"
prometheus = { version = "0.11", default-features = false, features = ["process"] }
regex = "1"
# Use openssl for TLS to be consistent
reqwest = { version = "0.10", default-features = false, features = ["stream", "blocking", "native-tls"] }
Expand Down
16 changes: 7 additions & 9 deletions relay/sources/relayd/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@ mod shared_files;
mod shared_folder;
mod system;

use crate::{stats::Stats, JobConfig};
use crate::JobConfig;
use serde::Serialize;
use std::{
fmt,
fmt::Display,
net::ToSocketAddrs,
sync::{Arc, RwLock},
};
use std::{fmt, fmt::Display, net::ToSocketAddrs, sync::Arc};
use tracing::{error, info, span, Level};
use warp::{http::StatusCode, path, reject, reject::Reject, reply, Filter, Rejection, Reply};

Expand Down Expand Up @@ -94,18 +89,21 @@ impl<T: Serialize> ApiResponse<T> {
}
}

pub async fn run(job_config: Arc<JobConfig>, stats: Arc<RwLock<Stats>>) -> Result<(), ()> {
pub async fn run(job_config: Arc<JobConfig>) -> Result<(), ()> {
let span = span!(Level::TRACE, "api");
let _enter = span.enter();

let routes_1 = path!("rudder" / "relay-api" / "1" / ..).and(
system::routes_1(job_config.clone(), stats.clone())
system::routes_1(job_config.clone())
.or(shared_folder::routes_1(job_config.clone()))
.or(shared_files::routes_1(job_config.clone()))
.or(remote_run::routes_1(job_config.clone())),
);
// special case for /metrics
let routes_special = system::routes_metrics();

let routes = routes_1
.or(routes_special)
.recover(customize_error)
.with(warp::log("relayd::api"));

Expand Down
26 changes: 13 additions & 13 deletions relay/sources/relayd/src/api/remote_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl RemoteRun {
// Async and output -> spawn in background and stream output
(true, true) => {
let mut streams = futures::stream::SelectAll::new();
for (relay, target) in self.target.next_hops(job_config.clone()) {
for (relay, target) in self.target.next_hops(job_config.clone()).await {
let stream = self
.forward_call(job_config.clone(), relay.clone(), target.clone())
.await;
Expand All @@ -158,7 +158,7 @@ impl RemoteRun {
self.run_parameters
.remote_run(
&job_config.cfg.remote_run,
self.target.neighbors(job_config.clone()),
self.target.neighbors(job_config.clone()).await,
self.run_parameters.asynchronous,
)
.await,
Expand All @@ -167,15 +167,15 @@ impl RemoteRun {
}
// Async and no output -> spawn in background and return early
(true, false) => {
for (relay, target) in self.target.next_hops(job_config.clone()) {
for (relay, target) in self.target.next_hops(job_config.clone()).await {
let stream = self.forward_call(job_config.clone(), relay, target).await;
tokio::spawn(RemoteRun::consume(stream));
}
tokio::spawn(RemoteRun::consume(
self.run_parameters
.remote_run(
&job_config.cfg.remote_run,
self.target.neighbors(job_config.clone()),
self.target.neighbors(job_config.clone()).await,
self.run_parameters.asynchronous,
)
.await,
Expand All @@ -185,7 +185,7 @@ impl RemoteRun {
// Sync and no output -> wait until the send and return empty output
(false, false) => {
let mut streams = futures::stream::SelectAll::new();
for (relay, target) in self.target.next_hops(job_config.clone()) {
for (relay, target) in self.target.next_hops(job_config.clone()).await {
let stream = self
.forward_call(job_config.clone(), relay.clone(), target.clone())
.await;
Expand All @@ -196,7 +196,7 @@ impl RemoteRun {
self.run_parameters
.remote_run(
&job_config.cfg.remote_run,
self.target.neighbors(job_config.clone()),
self.target.neighbors(job_config.clone()).await,
self.run_parameters.asynchronous,
)
.await
Expand All @@ -207,7 +207,7 @@ impl RemoteRun {
// Sync and output -> wait until the end and return output
(false, true) => {
let mut streams = futures::stream::SelectAll::new();
for (relay, target) in self.target.next_hops(job_config.clone()) {
for (relay, target) in self.target.next_hops(job_config.clone()).await {
let stream = self
.forward_call(job_config.clone(), relay.clone(), target.clone())
.await;
Expand All @@ -218,7 +218,7 @@ impl RemoteRun {
self.run_parameters
.remote_run(
&job_config.cfg.remote_run,
self.target.neighbors(job_config.clone()),
self.target.neighbors(job_config.clone()).await,
self.run_parameters.asynchronous,
)
.await,
Expand Down Expand Up @@ -280,7 +280,7 @@ impl RemoteRun {
Err(e) => {
error!("forward error: {}", e);
// TODO find a better way to chain errors
return Box::new(futures::stream::empty());
Box::new(futures::stream::empty())
}
}
}
Expand All @@ -293,8 +293,8 @@ pub enum RemoteRunTarget {
}

impl RemoteRunTarget {
pub fn neighbors(&self, job_config: Arc<JobConfig>) -> Vec<Host> {
let nodes = job_config.nodes.read().expect("Cannot read nodes list");
pub async fn neighbors(&self, job_config: Arc<JobConfig>) -> Vec<Host> {
let nodes = job_config.nodes.read().await;
let neighbors = match self {
RemoteRunTarget::All => nodes.my_neighbors(),
RemoteRunTarget::Nodes(nodeslist) => nodes.my_neighbors_from(nodeslist),
Expand All @@ -303,8 +303,8 @@ impl RemoteRunTarget {
neighbors
}

pub fn next_hops(&self, job_config: Arc<JobConfig>) -> Vec<(Host, RemoteRunTarget)> {
let nodes = job_config.nodes.read().expect("Cannot read nodes list");
pub async fn next_hops(&self, job_config: Arc<JobConfig>) -> Vec<(Host, RemoteRunTarget)> {
let nodes = job_config.nodes.read().await;
let next_hops = match self {
RemoteRunTarget::All => nodes
.my_sub_relays()
Expand Down
23 changes: 4 additions & 19 deletions relay/sources/relayd/src/api/shared_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,7 @@ pub async fn put(

let file = SharedFile::new(source_id, target_id, file_id)?;

if job_config
.nodes
.read()
.expect("Cannot read nodes list")
.is_subnode(&file.target_id)
{
if job_config.nodes.read().await.is_subnode(&file.target_id) {
put_local(file, params, job_config, body).await
} else if job_config.cfg.general.node_id == "root" {
Err(RudderError::UnknownNode(file.target_id).into())
Expand Down Expand Up @@ -188,12 +183,7 @@ pub async fn put_local(
job_config: Arc<JobConfig>,
body: Bytes,
) -> Result<StatusCode, Error> {
if !job_config
.nodes
.read()
.expect("Cannot read nodes list")
.is_subnode(&file.source_id)
{
if !job_config.nodes.read().await.is_subnode(&file.source_id) {
warn!("unknown source {}", file.source_id);
return Ok(StatusCode::NOT_FOUND);
}
Expand All @@ -220,7 +210,7 @@ pub async fn put_local(
let known_key_hash = job_config
.nodes
.read()
.expect("Cannot read nodes list")
.await
.key_hash(&file.source_id)
.ok_or_else(|| RudderError::UnknownNode(file.source_id.to_string()))?;
let key_hash = known_key_hash.hash_type.hash(&pubkey.public_key_to_der()?);
Expand Down Expand Up @@ -300,12 +290,7 @@ pub async fn head(

let file = SharedFile::new(source_id, target_id, file_id)?;

if job_config
.nodes
.read()
.expect("Cannot read nodes list")
.is_subnode(&file.target_id)
{
if job_config.nodes.read().await.is_subnode(&file.target_id) {
head_local(file, params, job_config).await
} else if job_config.cfg.general.node_id == "root" {
Err(RudderError::UnknownNode(file.target_id).into())
Expand Down
84 changes: 64 additions & 20 deletions relay/sources/relayd/src/api/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
use crate::{
api::{ApiResponse, ApiResult},
check_configuration,
metrics::REGISTRY,
output::database::ping,
stats::Stats,
Error, JobConfig,
};
use serde::Serialize;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use structopt::clap::crate_version;
use warp::{filters::method, path, reply, Filter, Reply};
use warp::{filters::method, path, Filter, Reply};

pub fn routes_1(
job_config: Arc<JobConfig>,
stats: Arc<RwLock<Stats>>,
) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
let base = path!("system" / ..);

Expand All @@ -24,14 +23,11 @@ pub fn routes_1(
});

let job_config_reload = job_config.clone();
let reload = method::post().and(base).and(path!("reload")).map(move || {
Ok(ApiResponse::<()>::new::<Error>(
"reloadConfiguration",
job_config_reload.clone().reload().map(|_| None),
None,
)
.reply())
});
let reload = method::post()
.and(base)
.and(path!("reload"))
.map(move || job_config_reload.clone())
.and_then(|j| handlers::reload(j));

let job_config_status = job_config;
let status = method::get().and(base).and(path!("status")).map(move || {
Expand All @@ -43,15 +39,63 @@ pub fn routes_1(
.reply())
});

// WARNING: Not stable, will be replaced soon
// Kept for testing mainly
let stats = method::get().and(base).and(path!("stats")).map(move || {
Ok(reply::json(
&(*stats.clone().read().expect("open stats database")),
))
});
let metrics = method::get()
.and(base)
.and(path!("metrics"))
.and_then(handlers::metrics);

info.or(reload).or(status).or(metrics)
}

/// Special case for /metrics, pretty standard for prometheus
/// alias for /rudder/relay-api/1/system/metrics
pub fn routes_metrics() -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone {
method::get()
.and(path!("metrics"))
.and_then(handlers::metrics)
}

pub mod handlers {
use super::*;
use crate::api::RudderReject;
use warp::{reject, Rejection, Reply};

info.or(reload).or(status).or(stats)
pub async fn metrics() -> Result<impl Reply, Rejection> {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();

let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
return Err(reject::custom(RudderReject::new(e)));
};
let mut res = match String::from_utf8(buffer.clone()) {
Ok(v) => v,
Err(e) => return Err(reject::custom(RudderReject::new(e))),
};
buffer.clear();

let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
return Err(reject::custom(RudderReject::new(e)));
};
let res_custom = match String::from_utf8(buffer.clone()) {
Ok(v) => v,
Err(e) => return Err(reject::custom(RudderReject::new(e))),
};
buffer.clear();

res.push_str(&res_custom);
Ok(res)
}

pub async fn reload(job_config: Arc<JobConfig>) -> Result<impl Reply, Rejection> {
Ok(ApiResponse::<()>::new::<Error>(
"reloadConfiguration",
job_config.reload().await.map(|_| None),
None,
)
.reply())
}
}

// TODO could be in once_cell
Expand Down
Loading

0 comments on commit 8e5a07c

Please sign in to comment.