Reporting followups #1020

Merged
merged 22 commits on May 13, 2022
Changes from 4 commits
227 changes: 128 additions & 99 deletions apollo-router/src/plugins/telemetry/metrics/apollo.rs
@@ -1,12 +1,11 @@
//! Apollo metrics
// This entire file is license key functionality
//! Apollo metrics
use crate::plugins::telemetry::apollo::Config;
use crate::plugins::telemetry::config::MetricsCommon;
use crate::plugins::telemetry::metrics::{MetricsBuilder, MetricsConfigurator};
use crate::stream::StreamExt;
use apollo_spaceport::{
ContextualizedStats, ReferencedFieldsForType, Reporter, ReporterError, ReporterGraph,
StatsContext,
ReferencedFieldsForType, Report, ReportHeader, Reporter, ReporterError, StatsContext,
};
use async_trait::async_trait;
use deadpool::{managed, Runtime};
@@ -15,7 +14,7 @@ use futures_batch::ChunksTimeoutStreamExt;
use serde::Serialize;
use std::collections::HashMap;
use std::ops::AddAssign;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tower::BoxError;
use url::Url;

@@ -50,17 +49,31 @@ impl Default for Sender {
}
}

#[derive(Clone, Hash, Eq, PartialEq, Debug, Serialize)]
pub enum MetricsKey {
Excluded,
Regular {
client_name: String,
client_version: String,
stats_report_key: String,
},
}
impl Default for MetricsKey {
fn default() -> Self {
MetricsKey::Excluded
}
}
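// For illustration only (not from this PR): a minimal sketch of how the new
// MetricsKey might be chosen per request. The helper name and the boolean flag
// are assumptions; the grounded part is that excluded requests map to
// MetricsKey::Excluded while all others carry the client name, client version
// and stats report key.
fn metrics_key_for(
    excluded_from_studio: bool, // assumption: derived from the STUDIO_EXCLUDE context entry
    client_name: &str,
    client_version: &str,
    stats_report_key: &str,
) -> MetricsKey {
    if excluded_from_studio {
        // Excluded requests still contribute to the operation count but get no
        // per-client stats entry in the report (see `to_report` below).
        MetricsKey::Excluded
    } else {
        MetricsKey::Regular {
            client_name: client_name.to_string(),
            client_version: client_version.to_string(),
            stats_report_key: stats_report_key.to_string(),
        }
    }
}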

#[derive(Default, Debug, Serialize)]
pub(crate) struct Metrics {
pub(crate) client_name: String,
pub(crate) client_version: String,
pub(crate) stats_report_key: String,
pub(crate) key: MetricsKey,
pub(crate) query_latency_stats: QueryLatencyStats,
pub(crate) per_type_stat: HashMap<String, TypeStat>,
pub(crate) referenced_fields_by_type: HashMap<String, ReferencedFieldsForType>,
pub(crate) operation_count: u64,
}

// TODO Make some of these fields bool
#[derive(Default, Debug, Serialize)]
pub(crate) struct QueryLatencyStats {
pub(crate) latency_count: Duration,
@@ -100,8 +113,9 @@ pub(crate) struct FieldStat {
pub(crate) latency_count: Duration,
}

#[derive(Default)]
#[derive(Default, Serialize)]
struct AggregatedMetrics {
key: MetricsKey,
query_latency_stats: AggregatedQueryLatencyStats,
per_type_stat: HashMap<String, AggregatedTypeStat>,
referenced_fields_by_type: HashMap<String, ReferencedFieldsForType>,
@@ -114,17 +128,16 @@ impl AddAssign<Metrics> for AggregatedMetrics {
for (k, v) in metrics.per_type_stat {
*self.per_type_stat.entry(k).or_default() += v;
}
for (k, v) in metrics.referenced_fields_by_type {
// Merging is not required because metrics are always grouped by schema and query.
// The tuple (client_name, client_version, stats_report_key, referenced_fields_by_type) is always unique.
self.referenced_fields_by_type.entry(k).or_insert(v);
}

// Merging is not required because metrics are always grouped by schema and query.
// The tuple (client_name, client_version, stats_report_key, referenced_fields_by_type) is always unique.
// therefore we can just take ownership of the referenced_fields_by_type map.
self.referenced_fields_by_type = metrics.referenced_fields_by_type;
self.operation_count += metrics.operation_count;
}
}

#[derive(Default)]
#[derive(Default, Serialize)]
struct AggregatedQueryLatencyStats {
latency_count: DurationHistogram,
request_count: u64,
@@ -163,7 +176,7 @@ impl AddAssign<QueryLatencyStats> for AggregatedQueryLatencyStats {
}
}

#[derive(Default)]
#[derive(Default, Serialize)]
struct AggregatedPathErrorStats {
children: HashMap<String, AggregatedPathErrorStats>,
errors_count: u64,
@@ -180,7 +193,7 @@ impl AddAssign<PathErrorStats> for AggregatedPathErrorStats {
}
}

#[derive(Default)]
#[derive(Default, Serialize)]
struct AggregatedTypeStat {
per_field_stat: HashMap<String, AggregatedFieldStat>,
}
@@ -193,7 +206,7 @@ impl AddAssign<TypeStat> for AggregatedTypeStat {
}
}

#[derive(Default)]
#[derive(Default, Serialize)]
struct AggregatedFieldStat {
return_type: String,
errors_count: u64,
@@ -229,7 +242,7 @@ impl MetricsConfigurator for Config {
apollo_graph_ref: Some(reference),
..
} => {
let exporter = ApolloMetricsExporter::new(endpoint, key, reference)?;
let exporter = ApolloMetricsExporter::new(endpoint, key, reference, "TODO")?;

builder
.with_apollo_metrics_collector(exporter.provider())
@@ -247,20 +260,27 @@ struct ApolloMetricsExporter {
impl ApolloMetricsExporter {
fn new(
endpoint: &Url,
key: &String,
reference: &String,
apollo_key: &str,
apollo_graph_ref: &str,
executable_schema_id: &str,
) -> Result<ApolloMetricsExporter, BoxError> {
let apollo_key = apollo_key.to_string();
// Desired behavior:
// * Metrics are batched with a timeout.
// * If we cannot connect to spaceport metrics are discarded and a warning raised.
// * When the stream of metrics finishes we terminate the thread.
// * If the exporter is dropped the remaining records are flushed.

let (tx, rx) = mpsc::channel::<Metrics>(DEFAULT_QUEUE_SIZE);

let reporter_graph = ReporterGraph {
key: key.to_owned(),
reference: reference.to_owned(),
// TODO fill out this stuff
let header = apollo_spaceport::ReportHeader {
graph_ref: apollo_graph_ref.to_string(),
hostname: "".to_string(),
agent_version: "".to_string(),
service_version: "".to_string(),
runtime_version: "".to_string(),
uname: "".to_string(),
executable_schema_id: executable_schema_id.to_string(),
};

// Deadpool gives us connection pooling to spaceport
@@ -281,32 +301,23 @@ impl ApolloMetricsExporter {
// But in the interest of getting something over the line quickly let's go with this as it is simple to understand.
rx.chunks_timeout(DEFAULT_BATCH_SIZE, Duration::from_secs(10))
.for_each(|stats| async {
let stats = aggregate(stats);
let aggregated_metrics = aggregate(stats);

match pool.get().await {
Ok(mut reporter) => {
for (key, contextualized_stats, field_usage, operation_count) in
stats.into_iter()
let report = to_report(header.clone(), aggregated_metrics);
match reporter
.submit(apollo_spaceport::ReporterRequest {
apollo_key: apollo_key.clone(),
report: Some(report),
})
.await
{
match reporter
.submit_stats(
reporter_graph.clone(),
key,
contextualized_stats,
field_usage,
operation_count,
)
.await
{
Ok(_) => {}
Err(e) => {
tracing::warn!(
"failed to submit stats to spaceport: {}",
e
);
}
};
}
Ok(_) => {}
Err(e) => {
tracing::warn!("failed to submit stats to spaceport: {}", e);
}
};
}
Err(err) => {
tracing::warn!(
@@ -326,55 +337,66 @@ impl ApolloMetricsExporter {
}
}
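The "desired behavior" comment above boils down to the futures-batch chunks_timeout pattern. A self-contained sketch of that pattern (not from this PR), with an illustrative channel type, batch size and timeout; assumed dependencies are tokio (rt + macros features), futures and futures-batch.

// Standalone sketch of the batching pattern used above (illustrative values only).
use std::time::Duration;

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::channel::<u32>(100);

    // Producer: send a few items, then drop the sender so the stream terminates.
    tokio::spawn(async move {
        for i in 0..7u32 {
            let _ = tx.send(i).await;
        }
    });

    // Consumer: each `batch` is a Vec<u32> of at most 3 items, flushed early
    // once 250ms pass without the batch filling up.
    rx.chunks_timeout(3, Duration::from_millis(250))
        .for_each(|batch| async move {
            println!("batch: {:?}", batch);
        })
        .await;
}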

fn aggregate(
metrics: Vec<Metrics>,
) -> Vec<(
String,
ContextualizedStats,
HashMap<String, ReferencedFieldsForType>,
u64,
)> {
fn aggregate(metrics: Vec<Metrics>) -> HashMap<MetricsKey, AggregatedMetrics> {
let mut aggregated_metrics = HashMap::<_, AggregatedMetrics>::new();

for metric in metrics {
*aggregated_metrics
.entry(
(
metric.client_name.clone(),
metric.client_version.clone(),
metric.stats_report_key.clone(),
)
.clone(),
)
.or_default() += metric;
*aggregated_metrics.entry(metric.key.clone()).or_default() += metric;
}

aggregated_metrics
.into_iter()
.map(|((client_name, client_version, key), aggregated_metric)| {
let mut contextualized_stats = ContextualizedStats {
context: Some(StatsContext {
client_name,
client_version,
}),
..Default::default()
};
contextualized_stats.per_type_stat = aggregated_metric
.per_type_stat
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect();
contextualized_stats.query_latency_stats =
Some(aggregated_metric.query_latency_stats.into());

(
key,
contextualized_stats,
aggregated_metric.referenced_fields_by_type,
aggregated_metric.operation_count,
)
})
.collect()
}
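A sketch (not from this PR) of the grouping behaviour `aggregate` now provides: two Metrics values sharing a MetricsKey collapse into a single AggregatedMetrics entry, with counters summed through the AddAssign impl above. It assumes crate-internal visibility, e.g. inside the test module further down.

// Illustrative test-style snippet (assumed to live inside this crate):
let key = MetricsKey::Regular {
    client_name: "client".to_string(),
    client_version: "1.0".to_string(),
    stats_report_key: "# MyQuery".to_string(),
};
let a = Metrics { key: key.clone(), operation_count: 1, ..Default::default() };
let b = Metrics { key: key.clone(), operation_count: 2, ..Default::default() };
let aggregated = aggregate(vec![a, b]);
// Same key, so a single aggregated entry with the operation counts summed.
assert_eq!(aggregated.len(), 1);
assert_eq!(aggregated[&key].operation_count, 3);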

fn to_report(
header: ReportHeader,
aggregated_metrics: HashMap<MetricsKey, AggregatedMetrics>,
) -> apollo_spaceport::Report {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let seconds = time.as_secs();
let nanos = time.as_nanos() - (seconds as u128 * 1_000_000_000);
let end_time = apollo_spaceport::Timestamp {
seconds: seconds as i64,
nanos: nanos as i32,
};

let mut report = Report {
header: Some(header),
traces_per_query: Default::default(),
end_time: Some(end_time),
operation_count: 0,
};

for (key, metrics) in aggregated_metrics {
report.operation_count += metrics.operation_count;
if let MetricsKey::Regular {
client_name,
client_version,
stats_report_key,
} = key
{
report.traces_per_query.insert(
stats_report_key,
apollo_spaceport::TracesAndStats {
trace: vec![],
stats_with_context: vec![apollo_spaceport::ContextualizedStats {
context: Some(StatsContext {
client_name,
client_version,
}),
query_latency_stats: Some(metrics.query_latency_stats.into()),
per_type_stat: metrics
.per_type_stat
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
}],
referenced_fields_by_type: metrics.referenced_fields_by_type,
internal_traces_contributing_to_stats: vec![],
},
);
}
}
report
}
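The seconds/nanos split above can also be written with Duration::subsec_nanos, which already returns the sub-second remainder. A hypothetical helper (name and placement assumed, not from this PR) sketching that equivalent form:

use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical helper, equivalent to the inline computation in `to_report`.
fn now_as_timestamp() -> apollo_spaceport::Timestamp {
    let time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    apollo_spaceport::Timestamp {
        seconds: time.as_secs() as i64,
        // subsec_nanos() == as_nanos() - as_secs() * 1_000_000_000
        nanos: time.subsec_nanos() as i32,
    }
}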

impl From<AggregatedQueryLatencyStats> for apollo_spaceport::QueryLatencyStats {
@@ -455,6 +477,7 @@ impl managed::Manager for ReporterManager {
}
}
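ReporterManager follows deadpool's managed-pool pattern ("Deadpool gives us connection pooling to spaceport"). A self-contained sketch of that pattern with a dummy connection type, not from this PR, assuming a deadpool 0.9-style Manager trait (the exact trait shape varies between deadpool versions):

use async_trait::async_trait;
use deadpool::managed;

#[derive(Debug)]
struct DummyConnection;

struct DummyManager;

#[async_trait]
impl managed::Manager for DummyManager {
    type Type = DummyConnection;
    type Error = std::io::Error;

    // Called whenever the pool needs a fresh connection.
    async fn create(&self) -> Result<Self::Type, Self::Error> {
        Ok(DummyConnection)
    }

    // Called before a returned connection is handed out again.
    async fn recycle(&self, _conn: &mut Self::Type) -> managed::RecycleResult<Self::Error> {
        Ok(())
    }
}

async fn use_pool() -> Result<(), Box<dyn std::error::Error>> {
    let pool = managed::Pool::<DummyManager>::builder(DummyManager).build()?;
    let _conn = pool.get().await?;
    Ok(())
}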

#[derive(Serialize)]
struct DurationHistogram {
buckets: Vec<i64>,
entries: u64,
@@ -517,15 +540,19 @@ impl DurationHistogram {

#[cfg(test)]
mod test {
use super::super::super::config;
use super::*;
use crate::plugins::telemetry::{apollo, Telemetry, STUDIO_EXCLUDE};
use std::future::Future;

use http::header::HeaderName;

use apollo_router_core::utils::test::IntoSchema::Canned;
use apollo_router_core::utils::test::PluginTestHarness;
use apollo_router_core::RouterRequest;
use apollo_router_core::{Context, Plugin};
use http::header::HeaderName;
use std::future::Future;

use crate::plugins::telemetry::{apollo, Telemetry, STUDIO_EXCLUDE};

use super::super::super::config;
use super::*;

// DurationHistogram Tests
impl DurationHistogram {
@@ -679,9 +706,11 @@ mod test {
let mut count = Count::default();

Metrics {
client_name: client_name.into(),
client_version: client_version.into(),
stats_report_key: stats_report_key.into(),
key: MetricsKey::Regular {
client_name: client_name.into(),
client_version: client_version.into(),
stats_report_key: stats_report_key.into(),
},
query_latency_stats: QueryLatencyStats {
latency_count: Duration::from_secs(1),
request_count: count.inc_u64(),