Skip to content

Commit

Permalink
refactor(MSD): migrate warp to axum (#162)
Browse files Browse the repository at this point in the history
* migrating to axum

* refactored

* repinning
  • Loading branch information
NikolaMilosa committed Feb 6, 2024
1 parent e09734b commit 096ed24
Show file tree
Hide file tree
Showing 12 changed files with 1,082 additions and 1,193 deletions.
1,527 changes: 751 additions & 776 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

366 changes: 192 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rs/ic-observability/multiservice-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ slog-async = { workspace = true }
slog-term = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
warp = { workspace = true }
futures.workspace = true
axum = "0.7.4"
20 changes: 12 additions & 8 deletions rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,19 @@ pub struct RunningDefinition {
}

pub struct TestDefinition {
pub(crate) running_def: RunningDefinition
pub(crate) running_def: RunningDefinition,
}

impl TestDefinition {
pub(crate) fn new(definition: Definition) -> Self {
let (_, stop_signal) = crossbeam::channel::bounded::<()>(0);
let ender: Arc<Mutex<Option<Ender>>> = Arc::new(Mutex::new(None));
Self {
running_def: RunningDefinition{
running_def: RunningDefinition {
definition,
stop_signal,
ender
}
ender,
},
}
}

Expand All @@ -102,10 +102,12 @@ impl TestDefinition {
// E.g. telemetry should collect this.
// return;
// }
let _ = self.running_def.definition.ic_discovery
let _ = self
.running_def
.definition
.ic_discovery
.load_new_ics(self.running_def.definition.log.clone());
}

}

impl Definition {
Expand Down Expand Up @@ -256,7 +258,6 @@ impl RunningDefinition {
}
}


// Syncs the registry and keeps running, syncing as new
// registry versions come in.
async fn run(&self) {
Expand Down Expand Up @@ -419,7 +420,10 @@ impl DefinitionsSupervisor {
ic_names_to_add.insert(ic_name);
}

if !self.allow_mercury_deletion && !ic_names_to_add.contains(&Network::Mainnet.legacy_name()) {
if !self.allow_mercury_deletion
&& !ic_names_to_add.contains(&Network::Mainnet.legacy_name())
&& start_mode == StartMode::ReplaceExistingDefinitions
{
error
.errors
.push(StartDefinitionError::DeletionDisallowed(Network::Mainnet.legacy_name()))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
use slog::Logger;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use std::error::Error;
use std::fmt::{Display, Error as FmtError, Formatter};

use warp::Reply;

use crate::definition::DefinitionsSupervisor;
use crate::server_handlers::dto::BoundaryNodeDto;
use crate::server_handlers::{bad_request, not_found, ok, WebResult};

#[derive(Clone)]
pub(super) struct AddBoundaryNodeToDefinitionBinding {
pub(crate) supervisor: DefinitionsSupervisor,
pub(crate) log: Logger,
}
use super::{bad_request, not_found, ok, Server};

#[derive(Debug)]

Expand All @@ -29,28 +23,33 @@ impl Display for DefinitionNotFound {
}

pub(super) async fn add_boundary_node(
boundary_node: BoundaryNodeDto,
binding: AddBoundaryNodeToDefinitionBinding,
) -> WebResult<impl Reply> {
let log = binding.log.clone();
State(binding): State<Server>,
Json(boundary_node): Json<BoundaryNodeDto>,
) -> Result<String, (StatusCode, String)> {
let name = boundary_node.name.clone();
let ic_name = boundary_node.ic_name.clone();
let rej: String = format!("Definition {} could not be added", name);
let rejection = format!("Definition {} could not be added", name);

let mut definitions = binding.supervisor.definitions.lock().await;

let running_definition = match definitions.get_mut(&ic_name) {
Some(d) => d,
None => return not_found(log, rej, DefinitionNotFound { ic_name }),
None => {
return not_found(
binding.log,
format!("Couldn't find definition: '{}'", ic_name),
DefinitionNotFound { ic_name },
)
}
};

let bn = match boundary_node.try_into_boundary_node() {
Ok(bn) => bn,
Err(e) => return bad_request(log, rej, e),
Err(e) => return bad_request(binding.log, rejection, e),
};

match running_definition.add_boundary_node(bn).await {
Ok(()) => ok(log, format!("Definition {} added successfully", name)),
Err(e) => bad_request(log, rej, e),
Ok(()) => ok(binding.log, format!("Definition {} added successfully", name)),
Err(e) => bad_request(binding.log, rejection, e),
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
use slog::Logger;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;

use std::path::PathBuf;
use std::time::Duration;

use super::{bad_request, ok, WebResult};
use warp::Reply;

use crate::definition::{DefinitionsSupervisor, StartMode};
use crate::definition::StartMode;
use crate::server_handlers::dto::DefinitionDto;

#[derive(Clone)]
pub(super) struct AddDefinitionBinding {
pub(crate) supervisor: DefinitionsSupervisor,
pub(crate) log: Logger,
pub(crate) registry_path: PathBuf,
pub(crate) poll_interval: Duration,
pub(crate) registry_query_timeout: Duration,
}
use super::{bad_request, ok, Server};

pub(super) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult<impl Reply> {
let log = binding.log.clone();
pub(super) async fn add_definition(
State(binding): State<Server>,
Json(definition): Json<DefinitionDto>,
) -> Result<String, (StatusCode, String)> {
let dname = definition.name.clone();
let rej = format!("Definition {} could not be added", dname);
let new_definition = match definition
Expand All @@ -32,14 +23,14 @@ pub(super) async fn add_definition(definition: DefinitionDto, binding: AddDefini
.await
{
Ok(def) => def,
Err(e) => return bad_request(log, rej, e),
Err(e) => return bad_request(binding.log, rej, e),
};
match binding
.supervisor
.start(vec![new_definition], StartMode::AddToDefinitions)
.await
{
Ok(()) => ok(log, format!("Definition {} added successfully", dname)),
Err(e) => bad_request(log, rej, e.errors.into_iter().next().unwrap()),
Ok(()) => ok(binding.log, format!("Definition {} added successfully", dname)),
Err(e) => bad_request(binding.log, rej, e),
}
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use crate::definition::DefinitionsSupervisor;
use crate::definition::StopDefinitionError;
use slog::Logger;
use warp::Reply;
use axum::extract::{Path, State};
use axum::http::StatusCode;

use super::{forbidden, not_found, ok, WebResult};
use super::{forbidden, not_found, Server};

#[derive(Clone)]
pub(super) struct DeleteDefinitionBinding {
pub(crate) supervisor: DefinitionsSupervisor,
pub(crate) log: Logger,
}

pub(super) async fn delete_definition(name: String, binding: DeleteDefinitionBinding) -> WebResult<impl Reply> {
let rej = format!("Definition {} could not be deleted", name);
pub(super) async fn delete_definition(
Path(name): Path<String>,
State(binding): State<Server>,
) -> Result<String, (StatusCode, String)> {
match binding.supervisor.stop(vec![name.clone()]).await {
Ok(_) => ok(binding.log, format!("Deleted definition {}", name.clone())),
Ok(_) => Ok(format!("Deleted definition {}", name.clone())),
Err(e) => match e.errors.into_iter().next().unwrap() {
StopDefinitionError::DoesNotExist(e) => {
not_found(binding.log, "FUCK".to_string(), StopDefinitionError::DoesNotExist(e))
not_found(binding.log, format!("Definition with name '{}' doesn't exist", name), e)
}
StopDefinitionError::DeletionDisallowed(e) => {
forbidden(binding.log, rej, StopDefinitionError::DeletionDisallowed(e))
forbidden(binding.log, "That definition cannot be deleted".to_string(), e)
}
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::definition::RunningDefinition;

use super::WebResult;
use super::{ok, Server};
use axum::extract::State;
use axum::http::StatusCode;
use multiservice_discovery_shared::builders::prometheus_config_structure::{map_target_group, PrometheusStaticConfig};
use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto};
use service_discovery::job_types::{JobType, NodeOS};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use warp::reply::Reply;
use std::collections::BTreeMap;

pub fn serialize_definitions_to_prometheus_config(definitions: BTreeMap<String, RunningDefinition>) -> (usize, String) {
let mut ic_node_targets: Vec<TargetDto> = vec![];
Expand Down Expand Up @@ -80,20 +80,12 @@ pub fn serialize_definitions_to_prometheus_config(definitions: BTreeMap<String,
)
}

#[derive(Clone)]
pub(super) struct ExportDefinitionConfigBinding {
pub(crate) definitions_ref: Arc<Mutex<BTreeMap<String, RunningDefinition>>>,
}

pub(super) async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> WebResult<impl Reply> {
let definitions = binding.definitions_ref.lock().await;
pub(super) async fn export_prometheus_config(State(binding): State<Server>) -> Result<String, (StatusCode, String)> {
let definitions = binding.supervisor.definitions.lock().await;
let (targets_len, text) = serialize_definitions_to_prometheus_config(definitions.clone());
Ok(warp::reply::with_status(
text,
if targets_len > 0 {
warp::http::StatusCode::OK
} else {
warp::http::StatusCode::NOT_FOUND
},
))
if targets_len > 0 {
ok(binding.log, text)
} else {
Err((StatusCode::NOT_FOUND, "No targets found".to_string()))
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use crate::definition::RunningDefinition;

use super::WebResult;
use axum::{extract::State, http::StatusCode, Json};
use ic_types::{NodeId, PrincipalId};
use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto};
use service_discovery::job_types::{JobType, NodeOS};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::Mutex;
use warp::reply::Reply;
use std::collections::BTreeMap;

#[derive(Clone)]
pub(super) struct ExportTargetsBinding {
pub(crate) definitions_ref: Arc<Mutex<BTreeMap<String, RunningDefinition>>>,
}
use super::Server;

pub(super) async fn export_targets(binding: ExportTargetsBinding) -> WebResult<impl Reply> {
let definitions = binding.definitions_ref.lock().await;
pub(super) async fn export_targets(
State(binding): State<Server>,
) -> Result<Json<Vec<TargetDto>>, (StatusCode, String)> {
let definitions = binding.supervisor.definitions.lock().await;

let mut ic_node_targets: Vec<TargetDto> = vec![];

Expand Down Expand Up @@ -77,12 +72,9 @@ pub(super) async fn export_targets(binding: ExportTargetsBinding) -> WebResult<i

let total_targets = [ic_node_targets, boundary_nodes_targets].concat();

Ok(warp::reply::with_status(
serde_json::to_string_pretty(&total_targets).unwrap(),
if !total_targets.is_empty() {
warp::http::StatusCode::OK
} else {
warp::http::StatusCode::NOT_FOUND
},
))
if !total_targets.is_empty() {
Ok(Json(total_targets))
} else {
Err((StatusCode::NOT_FOUND, "No targets found".to_string()))
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use warp::reply::json;
use warp::Reply;
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;

use crate::definition::DefinitionsSupervisor;
use crate::server_handlers::dto::DefinitionDto;
use crate::server_handlers::WebResult;

pub(super) async fn get_definitions(supervisor: DefinitionsSupervisor) -> WebResult<impl Reply> {
let definitions = supervisor.definitions.lock().await;
use super::Server;

let list = &definitions
pub(super) async fn get_definitions(
State(supervisor): State<Server>,
) -> Result<Json<Vec<DefinitionDto>>, (StatusCode, String)> {
let definitions = supervisor.supervisor.definitions.lock().await;

let list = definitions
.iter()
.map(|(_, d)| {
let x = &d.definition;
x.into()
})
.collect::<Vec<DefinitionDto>>();
Ok(json(list))
Ok(Json(list))
}
Loading

0 comments on commit 096ed24

Please sign in to comment.