Skip to content

Commit

Permalink
Parallelize and cache web host meta resolving
Browse files Browse the repository at this point in the history
- Use rayon to parallelize to requests to .well-known/host-meta.json resources
- Use cached to implement caching of these responses
- Use a timeout of 500ms to retrieve web host meta
- Optimize JSON serialization of service struct: serialize version only when
  web host meta is not empty
- Minor code improvements
  • Loading branch information
schrieveslaach committed May 22, 2019
1 parent b29784b commit abccc44
Show file tree
Hide file tree
Showing 9 changed files with 533 additions and 416 deletions.
709 changes: 396 additions & 313 deletions api/Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/Cargo.toml
Expand Up @@ -6,6 +6,7 @@ repository = "https://github.com/aixigo/PREvant/"
edition = "2018"

[dependencies]
cached = "0.8"
clap = "2.33"
crossbeam = "0.7"
crossbeam-utils = "0.6"
Expand All @@ -24,11 +25,13 @@ serde_yaml = "0.7"
tokio = "0.1"
tokio-core = "0.1"
toml = "0.4"
rayon = "1.0"
regex = "1.0"
rocket = "0.4"
rocket_contrib = "0.4"
rocket-cache-response = "0.5"
url = "1.7"
yansi = "0.5"

[dependencies.chrono]
version = "0.4"
Expand Down
9 changes: 5 additions & 4 deletions api/src/apps.rs
Expand Up @@ -45,10 +45,11 @@ pub fn apps(
) -> Result<Json<MultiMap<String, Service>>, HttpApiProblem> {
let mut apps = apps_service.get_apps(&request_info)?;

for (_, services) in apps.iter_all_mut() {
for service in services.iter_mut() {
service.set_base_url(request_info.get_base_url());
}
for service in apps
.iter_all_mut()
.flat_map(|(_, services)| services.iter_mut())
{
service.set_base_url(request_info.get_base_url());
}

Ok(Json(apps))
Expand Down
7 changes: 4 additions & 3 deletions api/src/main.rs
Expand Up @@ -26,6 +26,8 @@

#![feature(custom_attribute, proc_macro_hygiene, decl_macro)]

#[macro_use]
extern crate cached;
#[macro_use]
extern crate clap;
#[macro_use]
Expand All @@ -42,6 +44,7 @@ use crate::services::apps_service::AppsService;
use crate::services::config_service::Config;
use crate::services::docker::DockerInfrastructure;
use clap::{App, Arg};
use env_logger::Env;
use rocket::response::NamedFile;
use rocket_cache_response::CacheResponse;
use serde_yaml::{from_reader, to_string, Value};
Expand Down Expand Up @@ -103,9 +106,7 @@ fn main() {
)
.get_matches();

if cfg!(not(debug_assertions)) {
env_logger::init();
}
env_logger::from_env(Env::default().default_filter_or("info")).init();

let config = match Config::load(argument_matches.value_of("config").unwrap_or("config.toml")) {
Ok(config) => config,
Expand Down
38 changes: 26 additions & 12 deletions api/src/models/service.rs
Expand Up @@ -36,6 +36,8 @@ use url::Url;

#[derive(Clone, Debug)]
pub struct Service {
/// An unique identifier of the service, e.g. the container id
id: String,
app_name: String,
service_name: String,
container_type: ContainerType,
Expand Down Expand Up @@ -166,8 +168,14 @@ impl ServiceConfig {
}

impl Service {
pub fn new(app_name: String, service_name: String, container_type: ContainerType) -> Service {
pub fn new(
id: String,
app_name: String,
service_name: String,
container_type: ContainerType,
) -> Service {
Service {
id,
app_name,
service_name,
container_type,
Expand All @@ -177,6 +185,10 @@ impl Service {
}
}

pub fn app_name(&self) -> &String {
&self.app_name
}

pub fn set_app_name(&mut self, app_name: &String) {
self.app_name = app_name.clone();
}
Expand All @@ -196,6 +208,10 @@ impl Service {
})
}

pub fn id(&self) -> &String {
&self.id
}

pub fn service_name(&self) -> &String {
&self.service_name
}
Expand Down Expand Up @@ -254,13 +270,15 @@ impl Serialize for Service {
date_modified: Option<DateTime<Utc>>,
}

let software_version = self.web_host_meta.clone().and_then(|meta| meta.version());
let open_api_url = self.web_host_meta.clone().and_then(|meta| meta.openapi());
let git_commit = self.web_host_meta.clone().and_then(|meta| meta.commit());
let date_modified = self
.web_host_meta
.clone()
.and_then(|meta| meta.date_modified());
let version = match &self.web_host_meta {
Some(meta) if !meta.is_empty() => Some(Version {
git_commit: meta.commit(),
software_version: meta.version(),
date_modified: meta.date_modified(),
}),
_ => None,
};

let s = Service {
name: &self.service_name,
Expand All @@ -269,11 +287,7 @@ impl Serialize for Service {
Some(_) => self.service_url().map(|url| url.to_string()),
},
service_type: self.container_type.to_string(),
version: Some(Version {
git_commit,
software_version,
date_modified,
}),
version,
open_api_url,
};

Expand Down
4 changes: 4 additions & 0 deletions api/src/models/web_host_meta.rs
Expand Up @@ -55,6 +55,10 @@ impl WebHostMeta {
}
}

pub fn is_empty(&self) -> bool {
self.properties.is_none() && self.links.is_none()
}

pub fn version(&self) -> Option<String> {
match &self.properties {
None => None,
Expand Down
115 changes: 61 additions & 54 deletions api/src/services/apps_service.rs
Expand Up @@ -35,53 +35,33 @@ use crate::services::{
images_service::{ImagesService, ImagesServiceError},
infrastructure::Infrastructure,
};
use cached::SizedCache;
use handlebars::TemplateRenderError;
use http_api_problem::{HttpApiProblem, StatusCode};
use multimap::MultiMap;
use rayon::prelude::*;
use std::collections::HashSet;
use std::convert::From;
use std::sync::Mutex;
use url::Url;
use std::time::Duration;
use yansi::Paint;

pub struct AppsService {
config: Config,
infrastructure: Box<dyn Infrastructure>,
apps_in_deployment: Mutex<HashSet<String>>,
}
cached_key_result! {
WEB_HOST_META: SizedCache<String, WebHostMeta> = SizedCache::with_size(500);

struct DeploymentGuard<'a, 'b> {
apps_service: &'a AppsService,
app_name: &'b String,
}

impl<'a, 'b> Drop for DeploymentGuard<'a, 'b> {
fn drop(&mut self) {
let mut apps_in_deployment = self.apps_service.apps_in_deployment.lock().unwrap();
apps_in_deployment.remove(self.app_name);
}
}

impl AppsService {
pub fn new(
config: Config,
infrastructure: Box<dyn Infrastructure>,
) -> Result<AppsService, AppsServiceError> {
Ok(AppsService {
config,
infrastructure,
apps_in_deployment: Mutex::new(HashSet::new()),
})
}
Key = { format!("{}", service.id()) };

fn resolve_web_host_meta(
app_name: &String,
service_name: &String,
endpoint_url: &Url,
request_info: &RequestInfo,
) -> Option<WebHostMeta> {
let url = endpoint_url.join(".well-known/host-meta.json").unwrap();
service: &Service,
request_info: &RequestInfo
) -> Result<WebHostMeta, ()> = {
let url = match service.endpoint_url() {
None => return Ok(WebHostMeta::empty()),
Some(endpoint_url) => endpoint_url.join(".well-known/host-meta.json").unwrap()
};

let get_request = reqwest::Client::builder()
.timeout(Duration::from_millis(500))
.build()
.unwrap()
.get(url)
Expand All @@ -95,26 +75,57 @@ impl AppsService {
)
.header(
"X-Forwarded-Prefix",
format!("/{}/{}", app_name, service_name),
format!("/{}/{}", service.app_name(), service.service_name()),
)
.header("Accept", "application/json")
.header("User-Agent", format!("PREvant/{}", crate_version!()))
.send();

match get_request {
Err(err) => {
debug!("Cannot acquire host meta: {}", err);
None
debug!("Cannot acquire host meta for service {} of {}: {}", Paint::magenta(service.service_name()), Paint::magenta(service.app_name()), err);
Err(())
}
Ok(mut response) => match response.json::<WebHostMeta>() {
Err(err) => {
error!("Cannot parse host meta: {}", err);
Some(WebHostMeta::empty())
error!("Cannot parse host meta for service {} of {}: {}", Paint::magenta(service.service_name()), Paint::magenta(service.app_name()), err);
Ok(WebHostMeta::empty())
}
Ok(meta) => Some(meta),
Ok(meta) => Ok(meta),
},
}
}
}

pub struct AppsService {
config: Config,
infrastructure: Box<dyn Infrastructure>,
apps_in_deployment: Mutex<HashSet<String>>,
}

struct DeploymentGuard<'a, 'b> {
apps_service: &'a AppsService,
app_name: &'b String,
}

impl<'a, 'b> Drop for DeploymentGuard<'a, 'b> {
fn drop(&mut self) {
let mut apps_in_deployment = self.apps_service.apps_in_deployment.lock().unwrap();
apps_in_deployment.remove(self.app_name);
}
}

impl AppsService {
pub fn new(
config: Config,
infrastructure: Box<dyn Infrastructure>,
) -> Result<AppsService, AppsServiceError> {
Ok(AppsService {
config,
infrastructure,
apps_in_deployment: Mutex::new(HashSet::new()),
})
}

/// Analyzes running containers and returns a map of `app-name` with the
/// corresponding list of `Service`s.
Expand All @@ -124,19 +135,14 @@ impl AppsService {
) -> Result<MultiMap<String, Service>, AppsServiceError> {
let mut services = self.infrastructure.get_services()?;

for (app_name, services) in services.iter_all_mut() {
for service in services {
service.set_web_host_meta(match service.endpoint_url() {
None => None,
Some(endpoint_url) => AppsService::resolve_web_host_meta(
app_name,
service.service_name(),
&endpoint_url,
request_info,
),
});
}
}
let mut all_services: Vec<&mut Service> = services
.iter_all_mut()
.flat_map(|(_, services)| services.iter_mut())
.collect();

all_services.par_iter_mut().for_each(|service| {
service.set_web_host_meta(resolve_web_host_meta(&service, request_info).ok());
});

Ok(services)
}
Expand Down Expand Up @@ -390,6 +396,7 @@ mod tests {
use crate::services::dummy_infrastructure::DummyInfrastructure;
use sha2::{Digest, Sha256};
use std::str::FromStr;
use url::Url;

macro_rules! service_configs {
( $( $x:expr ),* ) => {
Expand Down

0 comments on commit abccc44

Please sign in to comment.