Skip to content

Commit

Permalink
Index endpoints bases on their canonical names and method (closes #462)
Browse files Browse the repository at this point in the history
  • Loading branch information
vmalloc committed Dec 5, 2018
1 parent 8a0503e commit 0ba9c9b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 23 deletions.
2 changes: 2 additions & 0 deletions api-server/src/main.rs
Expand Up @@ -56,6 +56,8 @@ fn main() {
.next()
.unwrap();

info!("Proxying to {:?}...", forwarded_addr);

let system = System::new("system");

let stats_collector = StatsCollector::init().start();
Expand Down
9 changes: 7 additions & 2 deletions api-server/src/proxy.rs
Expand Up @@ -18,7 +18,6 @@ const PROXY_VERSION: &'static str = env!("CARGO_PKG_VERSION");

pub fn forward(req: &HttpRequest<AppState>) -> Box<Future<Item = HttpResponse, Error = Error>> {
let state = req.state();
let path = req.uri().path().to_string();
let peer = req
.headers()
.get("X-Real-IP")
Expand Down Expand Up @@ -57,10 +56,16 @@ pub fn forward(req: &HttpRequest<AppState>) -> Box<Future<Item = HttpResponse, E
.send()
.map_err(Error::from)
.then(move |res| {
let endpoint = res
.as_ref()
.ok()
.and_then(|resp| resp.headers().get("x-api-endpoint"))
.and_then(|header| header.to_str().ok())
.map(String::from);
stats_collector
.try_send(RequestEnded {
timing: RequestTimes::from_headers(res.as_ref().ok()),
path,
endpoint,
is_success: res
.as_ref()
.map(|resp| resp.status())
Expand Down
45 changes: 24 additions & 21 deletions api-server/src/stats.rs
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;
const HISTOGRAM_RESOLUTION_SECONDS: usize = 60;
const HISTOGRAM_NUM_BINS: usize = 10;

// TODO: hashing requests should use Arc's, need to measure if performance justifies it
pub struct StatsCollector {
total_times: HashMap<String, DurationAggregator>,
active_times: HashMap<String, DurationAggregator>,
Expand Down Expand Up @@ -45,14 +46,14 @@ pub(crate) trait RequestTimesMap {
}

impl RequestTimesMap for HashMap<String, DurationAggregator> {
fn ingest(&mut self, path: &str, timing: Duration) {
if let Some(durations) = self.get_mut(path) {
fn ingest(&mut self, endpoint: &str, timing: Duration) {
if let Some(durations) = self.get_mut(endpoint) {
durations.ingest(timing);
return;
}
let mut durations = DurationAggregator::init();
durations.ingest(timing);
self.insert(path.to_string(), durations);
self.insert(endpoint.to_string(), durations);
}

fn as_endpoint_stats(&self) -> HashMap<String, EndpointStats> {
Expand Down Expand Up @@ -105,7 +106,7 @@ impl Handler<RequestStarted> for StatsCollector {
}

pub struct RequestEnded {
pub(crate) path: String,
pub(crate) endpoint: Option<String>,
pub(crate) peer: Option<IpAddr>,
pub(crate) timing: Option<RequestTimes>,
pub(crate) is_success: bool,
Expand Down Expand Up @@ -147,25 +148,27 @@ impl Handler<RequestEnded> for StatsCollector {
fn handle(&mut self, msg: RequestEnded, _ctx: &mut Context<Self>) {
self.num_pending_requests -= 1;

if msg.is_success {
if msg.path == "/api/report_test_start" {
self.num_tests += 1;
} else if msg.path == "/api/report_session_start" {
self.num_sessions += 1;
if let Some(endpoint) = msg.endpoint {
if msg.is_success {
if endpoint == "api.report_test_start" {
self.num_tests += 1;
} else if endpoint == "api.report_session_start" {
self.num_sessions += 1;
}
}
}

if let Some(timing) = msg.timing {
self.total_times.ingest(&msg.path, timing.total);
self.active_times.ingest(&msg.path, timing.active);
self.db_times.ingest(&msg.path, timing.db);

if let Some(addr) = msg.peer {
let hist = self
.clients
.entry(addr)
.or_insert_with(|| CountHistorgram::init(HISTOGRAM_NUM_BINS));
hist.inc();
if let Some(timing) = msg.timing {
self.total_times.ingest(&endpoint, timing.total);
self.active_times.ingest(&endpoint, timing.active);
self.db_times.ingest(&endpoint, timing.db);

if let Some(addr) = msg.peer {
let hist = self
.clients
.entry(addr)
.or_insert_with(|| CountHistorgram::init(HISTOGRAM_NUM_BINS));
hist.inc();
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions flask_app/utils/profiling.py
Expand Up @@ -7,6 +7,7 @@
_TOTAL_HEADER_NAME = "X-Timing-Total"
_ACTIVE_HEADER_NAME = "X-Timing-Active"
_DB_HEADER_NAME = "X-Timing-DB"
_API_ENDPOINT_HEADER_NAME = "X-API-Endpoint"


import time
Expand Down Expand Up @@ -46,6 +47,7 @@ def profile_request_end(response):
profile_data[_TOTAL_HEADER_NAME] += time.perf_counter()
profile_data[_ACTIVE_HEADER_NAME] += time.process_time()
profile_data[_DB_HEADER_NAME] += 0
profile_data[_API_ENDPOINT_HEADER_NAME] = f"{request.endpoint}:{request.method}"

response.headers.extend(profile_data)
return response

0 comments on commit 0ba9c9b

Please sign in to comment.