Skip to content

Commit

Permalink
fix: Removed /edge/metrics and logic for posting to /edge/metrics (#446)
Browse files Browse the repository at this point in the history
* Added notes about 5.9.0
  • Loading branch information
Christopher Kolstad committed Mar 27, 2024
1 parent b581dee commit 9590344
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 323 deletions.
193 changes: 140 additions & 53 deletions README.md

Large diffs are not rendered by default.

35 changes: 22 additions & 13 deletions server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::sync::Arc;

use dashmap::DashMap;
use tracing::{instrument, trace};
use unleash_types::Upsert;

use crate::http::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::persistence::EdgePersistence;
use crate::types::{
EdgeResult, EdgeToken, TokenType, TokenValidationStatus, ValidateTokensRequest,
};
use std::sync::Arc;

use dashmap::DashMap;
use unleash_types::Upsert;

#[derive(Clone)]
pub struct TokenValidator {
Expand Down Expand Up @@ -72,13 +74,15 @@ impl TokenValidator {
.iter()
.find(|v| maybe_valid.token == v.token)
{
trace!("Validated token");
EdgeToken {
status: crate::types::TokenValidationStatus::Validated,
status: TokenValidationStatus::Validated,
..validated_token.clone()
}
} else {
trace!("Invalid token");
EdgeToken {
status: crate::types::TokenValidationStatus::Invalid,
status: TokenValidationStatus::Invalid,
token_type: Some(TokenType::Invalid),
..maybe_valid
}
Expand All @@ -96,6 +100,7 @@ impl TokenValidator {
}
}

#[instrument(skip(self))]
pub async fn schedule_validation_of_known_tokens(&self, validation_interval_seconds: u64) {
let sleep_duration = tokio::time::Duration::from_secs(validation_interval_seconds);
loop {
Expand All @@ -107,6 +112,7 @@ impl TokenValidator {
}
}

#[instrument(skip(self, tokens, refresher))]
pub async fn schedule_revalidation_of_startup_tokens(
&self,
tokens: Vec<String>,
Expand Down Expand Up @@ -161,18 +167,21 @@ impl TokenValidator {

#[cfg(test)]
mod tests {
use super::TokenValidator;
use crate::{
http::unleash_client::UnleashClient,
types::{EdgeToken, TokenType, TokenValidationStatus},
};
use std::sync::Arc;

use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::{dev::AppConfig, web, App, HttpResponse};
use actix_web::{App, dev::AppConfig, HttpResponse, web};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

use crate::{
http::unleash_client::UnleashClient,
types::{EdgeToken, TokenType, TokenValidationStatus},
};

use super::TokenValidator;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTokens {
Expand Down
114 changes: 18 additions & 96 deletions server/src/edge_api.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use crate::types::{
EdgeJsonResult, EdgeToken, TokenStrings, TokenValidationStatus, ValidatedTokens,
};
use crate::{
auth::token_validator::TokenValidator,
metrics::client_metrics::MetricsCache,
types::{BatchMetricsRequestBody, EdgeResult},
};
use actix_web::{
HttpRequest,
post,
web::{self, Data, Json},
HttpRequest, HttpResponse,
};
use dashmap::DashMap;
use utoipa;

use crate::auth::token_validator::TokenValidator;
use crate::types::{
EdgeJsonResult, EdgeToken, TokenStrings, TokenValidationStatus, ValidatedTokens,
};

#[utoipa::path(
path = "/edge/validate",
responses(
Expand Down Expand Up @@ -53,103 +50,32 @@ pub async fn validate(
}
}

#[utoipa::path(
path = "/edge/metrics",
responses(
(status = 202, description = "Accepted the posted metrics")
),
request_body = BatchMetricsRequestBody,
security(
("Authorization" = [])
)
)]
#[post("/metrics")]
pub async fn metrics(
batch_metrics_request: web::Json<BatchMetricsRequestBody>,
metrics_cache: web::Data<MetricsCache>,
) -> EdgeResult<HttpResponse> {
metrics_cache.sink_metrics(&batch_metrics_request.metrics);
Ok(HttpResponse::Accepted().finish())
}

pub fn configure_edge_api(cfg: &mut web::ServiceConfig) {
cfg.service(validate).service(metrics);
cfg.service(validate);
}

#[cfg(test)]
mod tests {
use crate::auth::token_validator::TokenValidator;
use crate::edge_api::{metrics, validate};
use crate::metrics::client_metrics::MetricsCache;
use crate::types::{
BatchMetricsRequestBody, EdgeToken, TokenStrings, TokenType, TokenValidationStatus,
ValidatedTokens,
};
use std::sync::Arc;

use actix_web::{App, test, web};
use actix_web::http::header::ContentType;
use actix_web::web::Json;
use actix_web::{test, web, App};
use chrono::Utc;
use dashmap::DashMap;
use reqwest::StatusCode;
use std::sync::Arc;
use unleash_types::client_metrics::{ClientApplication, ClientMetricsEnv};

#[tokio::test]
pub async fn posting_bulk_metrics_gets_cached_properly() {
let metrics_cache = Arc::new(MetricsCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
.app_data(web::Data::from(metrics_cache.clone()))
.app_data(web::Data::from(token_cache.clone()))
.service(web::scope("/edge").service(metrics).service(validate)),
)
.await;

let request_body = BatchMetricsRequestBody {
applications: vec![
ClientApplication::new("app_one", 10),
ClientApplication::new("app_two", 20),
],
metrics: vec![
ClientMetricsEnv {
feature_name: "test_feature_one".to_string(),
app_name: "test_application".to_string(),
environment: "development".to_string(),
timestamp: Utc::now(),
yes: 100,
no: 50,
variants: Default::default(),
},
ClientMetricsEnv {
feature_name: "test_feature_two".to_string(),
app_name: "test_application".to_string(),
environment: "production".to_string(),
timestamp: Utc::now(),
yes: 1000,
no: 800,
variants: Default::default(),
},
],
};
let req = test::TestRequest::post()
.uri("/edge/metrics")
.insert_header(ContentType::json())
.set_json(Json(request_body))
.to_request();
let res = test::call_service(&app, req).await;
assert_eq!(res.status(), StatusCode::ACCEPTED);
}
use crate::auth::token_validator::TokenValidator;
use crate::edge_api::validate;
use crate::types::{
EdgeToken, TokenStrings, TokenType, TokenValidationStatus, ValidatedTokens,
};

#[tokio::test]
pub async fn validating_incorrect_tokens_returns_empty_list() {
let metrics_cache = Arc::new(MetricsCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
.app_data(web::Data::from(metrics_cache.clone()))
.app_data(web::Data::from(token_cache.clone()))
.service(web::scope("/edge").service(metrics).service(validate)),
.service(web::scope("/edge").service(validate)),
)
.await;
let mut valid_token =
Expand All @@ -171,13 +97,11 @@ mod tests {

#[tokio::test]
pub async fn validating_a_mix_of_tokens_only_returns_valid_tokens() {
let metrics_cache = Arc::new(MetricsCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let app = test::init_service(
App::new()
.app_data(web::Data::from(metrics_cache.clone()))
.app_data(web::Data::from(token_cache.clone()))
.service(web::scope("/edge").service(metrics).service(validate)),
.service(web::scope("/edge").service(validate)),
)
.await;
let mut valid_token =
Expand All @@ -204,7 +128,6 @@ mod tests {

#[tokio::test]
pub async fn adding_a_token_validator_filters_so_only_validated_tokens_are_returned() {
let metrics_cache = Arc::new(MetricsCache::default());
let token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let token_validator = TokenValidator {
unleash_client: Arc::new(Default::default()),
Expand All @@ -213,10 +136,9 @@ mod tests {
};
let app = test::init_service(
App::new()
.app_data(web::Data::from(metrics_cache.clone()))
.app_data(web::Data::from(token_cache.clone()))
.app_data(web::Data::new(token_validator))
.service(web::scope("/edge").service(metrics).service(validate)),
.service(web::scope("/edge").service(validate)),
)
.await;
let mut valid_token =
Expand Down
61 changes: 14 additions & 47 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use actix_web::http::StatusCode;
use std::cmp::max;
use tracing::{debug, error, info, trace, warn};
use std::sync::Arc;

use super::feature_refresher::FeatureRefresher;
use actix_web::http::StatusCode;
use chrono::Duration;
use dashmap::DashMap;
use lazy_static::lazy_static;
use prometheus::{IntGauge, IntGaugeVec, Opts, register_int_gauge, register_int_gauge_vec};
use tracing::{error, info, trace, warn};

use crate::types::TokenRefresh;
use crate::{
error::EdgeError,
metrics::client_metrics::{size_of_batch, MetricsCache},
metrics::client_metrics::{MetricsCache, size_of_batch},
};
use chrono::Duration;
use dashmap::DashMap;
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
use crate::types::TokenRefresh;

use std::sync::Arc;
use super::feature_refresher::FeatureRefresher;

lazy_static! {
pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!(
Expand Down Expand Up @@ -58,19 +58,10 @@ fn decide_where_to_post(
.iter()
.find(|t| t.token.environment == Some(environment.to_string()))
{
if token_refresh.use_client_bulk_endpoint {
debug!("Sending metrics to client bulk endpoint");
METRICS_UPSTREAM_CLIENT_BULK
.with_label_values(&[environment])
.inc();
(true, token_refresh.token.token.clone())
} else {
info!("Your upstream is outdated. Please upgrade to at least Unleash version 5.9.0 (when ready) or Edge Version 17.0.0 (this one)");
METRICS_UPSTREAM_OUTDATED
.with_label_values(&[environment])
.inc();
(false, "".into())
}
METRICS_UPSTREAM_CLIENT_BULK
.with_label_values(&[environment])
.inc();
(true, token_refresh.token.token.clone())
} else {
(false, "".into())
}
Expand Down Expand Up @@ -181,34 +172,10 @@ fn new_interval(send_interval: i64, failures: i64) -> Duration {
#[cfg(test)]
mod tests {
use crate::http::background_send_metrics::new_interval;
use crate::types::{EdgeToken, TokenRefresh};
use dashmap::DashMap;
use std::sync::Arc;

#[tokio::test]
pub async fn new_interval_does_not_overflow() {
let metrics = new_interval(300, 10);
assert!(metrics.num_seconds() < 3305);
}

#[tokio::test]
pub async fn decides_correctly_whether_to_post_to_client_bulk_or_edge_bulk() {
let refreshing_tokens = Arc::new(DashMap::default());
let old_token = EdgeToken::validated_client_token("*:development.somesecret");
let mut old_endpoint_refresh = TokenRefresh::new(old_token.clone(), None);
old_endpoint_refresh.use_client_bulk_endpoint = false;
let new_token = EdgeToken::validated_client_token("*:production.someothersecret");
let mut new_endpoint_refresh = TokenRefresh::new(new_token.clone(), None);
new_endpoint_refresh.use_client_bulk_endpoint = true;
refreshing_tokens.insert(old_token.token.clone(), old_endpoint_refresh);
refreshing_tokens.insert(new_token.token.clone(), new_endpoint_refresh);
let (use_new_endpoint, token) =
super::decide_where_to_post(&"development".to_string(), refreshing_tokens.clone());
assert!(!use_new_endpoint);
assert_eq!(&token, "");
let (use_new_endpoint, other_token) =
super::decide_where_to_post(&"production".to_string(), refreshing_tokens.clone());
assert!(use_new_endpoint);
assert_eq!(other_token, new_token.token);
}
}
Loading

0 comments on commit 9590344

Please sign in to comment.