Skip to content

Commit

Permalink
use service instead of closure for handlers
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com>
  • Loading branch information
bnjjj committed Mar 31, 2022
1 parent eeb9ca7 commit 6f22710
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 74 deletions.
67 changes: 59 additions & 8 deletions apollo-router-core/src/plugin.rs
Expand Up @@ -4,22 +4,23 @@ use crate::{
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use once_cell::sync::Lazy;
use schemars::gen::SchemaGenerator;
use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Deserialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::task::{Context, Poll};
use tower::buffer::future::ResponseFuture;
use tower::buffer::Buffer;
use tower::util::BoxService;
use tower::BoxError;
use tower::{BoxError, Service};

type InstanceFactory = fn(&serde_json::Value) -> Result<Box<dyn DynPlugin>, BoxError>;

type SchemaFactory = fn(&mut SchemaGenerator) -> schemars::schema::Schema;

pub type Handler =
Arc<dyn Fn(http_compat::Request<Bytes>) -> http_compat::Response<ResponseBody> + Send + Sync>;

#[derive(Clone)]
pub struct PluginFactory {
instance_factory: InstanceFactory,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub trait Plugin: Send + Sync + 'static + Sized {
service
}

fn custom_endpoint(&self) -> Option<(String, Handler)> {
fn custom_endpoint(&self) -> Option<Handler> {
None
}

Expand Down Expand Up @@ -145,7 +146,7 @@ pub trait DynPlugin: Send + Sync + 'static {
_name: &str,
service: BoxService<SubgraphRequest, SubgraphResponse, BoxError>,
) -> BoxService<SubgraphRequest, SubgraphResponse, BoxError>;
fn custom_endpoint(&self) -> Option<(String, Handler)>;
fn custom_endpoint(&self) -> Option<Handler>;
fn name(&self) -> &'static str;
}

Expand Down Expand Up @@ -192,7 +193,7 @@ where
) -> BoxService<SubgraphRequest, SubgraphResponse, BoxError> {
self.subgraph_service(name, service)
}
fn custom_endpoint(&self) -> Option<(String, Handler)> {
fn custom_endpoint(&self) -> Option<Handler> {
self.custom_endpoint()
}

Expand Down Expand Up @@ -232,3 +233,53 @@ macro_rules! register_plugin {
}
};
}

#[derive(Clone)]
pub struct Handler {
service: Buffer<
BoxService<http_compat::Request<Bytes>, http_compat::Response<ResponseBody>, BoxError>,
http_compat::Request<Bytes>,
>,
}

impl Handler {
pub fn new(
service: BoxService<
http_compat::Request<Bytes>,
http_compat::Response<ResponseBody>,
BoxError,
>,
) -> Self {
Self {
service: Buffer::new(service, 20_000),
}
}
}

impl Service<http_compat::Request<Bytes>> for Handler {
type Response = http_compat::Response<ResponseBody>;
type Error = BoxError;
type Future = ResponseFuture<BoxFuture<'static, Result<Self::Response, Self::Error>>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, req: http_compat::Request<Bytes>) -> Self::Future {
self.service.call(req)
}
}

impl From<BoxService<http_compat::Request<Bytes>, http_compat::Response<ResponseBody>, BoxError>>
for Handler
{
fn from(
original: BoxService<
http_compat::Request<Bytes>,
http_compat::Response<ResponseBody>,
BoxError,
>,
) -> Self {
Self::new(original)
}
}
6 changes: 3 additions & 3 deletions apollo-router/src/http_server_factory.rs
Expand Up @@ -25,7 +25,7 @@ pub(crate) trait HttpServerFactory {
service: RS,
configuration: Arc<Configuration>,
listener: Option<Listener>,
custom_handlers: HashMap<String, Handler>,
plugin_handlers: HashMap<String, Handler>,
) -> Self::Future
where
RS: Service<Request<graphql::Request>, Response = Response<ResponseBody>, Error = BoxError>
Expand Down Expand Up @@ -87,7 +87,7 @@ impl HttpServerHandle {
factory: &SF,
router: RS,
configuration: Arc<Configuration>,
custom_handlers: HashMap<String, Handler>,
plugin_handlers: HashMap<String, Handler>,
) -> Result<Self, FederatedServerError>
where
SF: HttpServerFactory,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl HttpServerHandle {
router,
Arc::clone(&configuration),
listener,
custom_handlers,
plugin_handlers,
)
.await?;
tracing::debug!("restarted on {}", handle.listen_address());
Expand Down
100 changes: 72 additions & 28 deletions apollo-router/src/plugins/telemetry/metrics.rs
@@ -1,15 +1,17 @@
use apollo_router_core::{
http_compat, Handler, Plugin, ResponseBody, RouterRequest, RouterResponse,
};
use http::StatusCode;
use bytes::Bytes;
use futures::future::BoxFuture;
use http::{Method, StatusCode};
use opentelemetry::{global, metrics::Counter, KeyValue};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use prometheus::{Encoder, Registry, TextEncoder};
use reqwest::Url;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tower::{util::BoxService, BoxError, ServiceExt};
use std::task::{Context, Poll};
use tower::{service_fn, steer::Steer, util::BoxService, BoxError, Service, ServiceExt};

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct MetricsConfiguration {
Expand Down Expand Up @@ -39,21 +41,16 @@ pub struct MetricsPlugin {
impl Plugin for MetricsPlugin {
type Config = MetricsConfiguration;

fn new(mut config: Self::Config) -> Result<Self, BoxError> {
fn new(config: Self::Config) -> Result<Self, BoxError> {
let exporter = opentelemetry_prometheus::exporter().init();
let meter = global::meter("apollo/router");

// TODO to delete when oltp is implemented
#[allow(irrefutable_let_patterns)]
if let MetricsExporter::Prometheus(prom_exporter_cfg) = &mut config.exporter {
prom_exporter_cfg.endpoint = prom_exporter_cfg
.endpoint
.trim_start_matches('/')
.to_string();

if Url::parse(&format!("http://test/{}", prom_exporter_cfg.endpoint)).is_err() {
if let MetricsExporter::Prometheus(prom_exporter_cfg) = &config.exporter {
if Url::parse(&format!("http://test:8080{}", prom_exporter_cfg.endpoint)).is_err() {
return Err(BoxError::from(
"cannot use your endpoint set for prometheus as a path in an URL",
"cannot use your endpoint set for prometheus as a path in an URL, your path need to be absolute (starting with a '/'",
));
}
}
Expand Down Expand Up @@ -85,7 +82,7 @@ impl Plugin for MetricsPlugin {
.boxed()
}

fn custom_endpoint(&self) -> Option<(String, Handler)> {
fn custom_endpoint(&self) -> Option<Handler> {
let prometheus_endpoint = match &self.conf.exporter {
MetricsExporter::Prometheus(prom) => Some(prom.endpoint.clone()),
// MetricsExporter::OLTP(_) => None,
Expand All @@ -95,25 +92,72 @@ impl Plugin for MetricsPlugin {
Some(endpoint) => {
let registry = self.exporter.registry().clone();

let handler = move |_req| {
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
encoder.encode(&metric_families, &mut result).unwrap();

http_compat::Response {
let not_found_handler = service_fn(|_req: http_compat::Request<Bytes>| async {
Ok::<_, BoxError>(http_compat::Response {
inner: http::Response::builder()
.status(StatusCode::OK)
.body(ResponseBody::Text(
String::from_utf8_lossy(&result).into_owned(),
))
.status(StatusCode::NOT_FOUND)
.body(ResponseBody::Text(String::new()))
.unwrap(),
}
};
})
})
.boxed();
let metrics_handler = PrometheusService { registry }.boxed();

let svc = Steer::new(
// All services we route between
vec![metrics_handler, not_found_handler],
// How we pick which service to send the request to
move |req: &http_compat::Request<Bytes>, _services: &[_]| {
if req.method() == Method::GET
&& req
.url()
.path()
.trim_start_matches("/plugins/apollo.telemetry")
== endpoint
{
0 // Index of `metrics handler`
} else {
1 // Index of `not_found`
}
},
);

Some((endpoint, Arc::new(handler)))
Some(svc.boxed().into())
}
None => None,
}
}
}

#[derive(Clone)]
pub struct PrometheusService {
registry: Registry,
}

impl Service<http_compat::Request<Bytes>> for PrometheusService {
type Response = http_compat::Response<ResponseBody>;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn call(&mut self, _req: http_compat::Request<Bytes>) -> Self::Future {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut result = Vec::new();
encoder.encode(&metric_families, &mut result).unwrap();

Box::pin(async move {
Ok(http_compat::Response {
inner: http::Response::builder()
.status(StatusCode::OK)
.body(ResponseBody::Text(
String::from_utf8_lossy(&result).into_owned(),
))
.map_err(|err| BoxError::from(err.to_string()))?,
})
})
}
}
2 changes: 1 addition & 1 deletion apollo-router/src/plugins/telemetry/mod.rs
Expand Up @@ -367,7 +367,7 @@ impl Plugin for Telemetry {
}
}

fn custom_endpoint(&self) -> Option<(String, Handler)> {
fn custom_endpoint(&self) -> Option<Handler> {
match &self.metrics_plugin {
Some(metrics_plugin) => metrics_plugin.custom_endpoint(),
None => None,
Expand Down
12 changes: 7 additions & 5 deletions apollo-router/src/state_machine.rs
Expand Up @@ -264,7 +264,7 @@ where
Errored(FederatedServerError::ServiceCreationError(err))
})?;

let custom_handlers: HashMap<String, Handler> = self
let plugin_handlers: HashMap<String, Handler> = self
.router_factory
.plugins()
.iter()
Expand All @@ -273,12 +273,13 @@ where
.starts_with("apollo.")
.then(|| plugin.custom_endpoint())
.flatten()
.map(|h| (plugin_name.clone(), h))
})
.collect();

let server_handle = self
.http_server_factory
.create(router.clone(), configuration.clone(), None, custom_handlers)
.create(router.clone(), configuration.clone(), None, plugin_handlers)
.await
.map_err(|err| {
tracing::error!("Cannot start the router: {}", err);
Expand Down Expand Up @@ -320,7 +321,7 @@ where
.await
{
Ok(new_router_service) => {
let custom_handlers: HashMap<String, Handler> = self
let plugin_handlers: HashMap<String, Handler> = self
.router_factory
.plugins()
.iter()
Expand All @@ -329,6 +330,7 @@ where
.starts_with("apollo.")
.then(|| plugin.custom_endpoint())
.flatten()
.map(|handler| (plugin_name.clone(), handler))
})
.collect();

Expand All @@ -337,7 +339,7 @@ where
&self.http_server_factory,
new_router_service.clone(),
new_configuration.clone(),
custom_handlers,
plugin_handlers,
)
.await
.map_err(|err| {
Expand Down Expand Up @@ -723,7 +725,7 @@ mod tests {
_service: RS,
configuration: Arc<Configuration>,
listener: Option<Listener>,
_custom_handlers: HashMap<String, Handler>,
_plugin_handlers: HashMap<String, Handler>,
) -> Pin<Box<dyn Future<Output = Result<HttpServerHandle, FederatedServerError>> + Send>>
where
RS: Service<
Expand Down

0 comments on commit 6f22710

Please sign in to comment.