diff --git a/src/lib.rs b/src/lib.rs index 5bcd492..64f371e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,9 @@ mod admin; pub mod config; mod gateway; mod proxy; -mod utils; +pub mod utils; -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use anyhow::{Context, Result, anyhow}; use axum::Router; @@ -12,6 +12,8 @@ use clap::Parser; use log::{error, info}; use tokio::{select, sync::oneshot}; +use crate::utils::observability::init_observability; + /// Git hash of the aisix core at build time. pub const GIT_HASH: &str = env!("VERGEN_GIT_SHA"); @@ -119,88 +121,6 @@ pub async fn run_with_provider( res } -/// Initialize observability (logging, tracing, metrics). -/// -/// Returns `(shutdown_sender, shutdown_task_handle)`. -/// Call `shutdown_sender.send(())` to flush and shut down observability. -pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { - use std::{borrow::Cow, time::Duration}; - - use fastrace::collector::Config; - use fastrace_opentelemetry::OpenTelemetryReporter; - use logforth::{ - append::{FastraceEvent, Stdout}, - filter::env_filter::EnvFilterBuilder, - layout::TextLayout, - }; - use metrics_exporter_otel::OpenTelemetryRecorder; - use opentelemetry::{InstrumentationScope, metrics::MeterProvider}; - use opentelemetry_otlp::SpanExporter; - use opentelemetry_sdk::{ - Resource, - metrics::{PeriodicReader, SdkMeterProvider}, - }; - - const INSTRUMENTATION_NAME: &str = "aisix"; - - let (tx, rx) = oneshot::channel::<()>(); - - // log - logforth::starter_log::builder() - .dispatch(|d| { - d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build()) - .append(Stdout::default().with_layout(TextLayout::default())) - }) - .dispatch(|d| { - d.filter(EnvFilterBuilder::from_default_env_or("info").build()) - .append(FastraceEvent::default()) - }) - .apply(); - - // trace - let reporter = OpenTelemetryReporter::new( - SpanExporter::builder() - .build() - .context("failed to initialize otlp exporter")?, - Cow::Owned(Resource::builder().build()), - InstrumentationScope::builder(INSTRUMENTATION_NAME) - .with_version(env!("CARGO_PKG_VERSION")) - .build(), - ); - fastrace::set_reporter( - reporter, - Config::default().report_interval(Duration::from_secs(1)), - ); - - // metric - let exporter = opentelemetry_otlp::MetricExporter::builder().build()?; - - let reader = PeriodicReader::builder(exporter).build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter(INSTRUMENTATION_NAME); - - metrics::set_global_recorder(OpenTelemetryRecorder::new(meter)) - .context("failed to initialize metrics recorder")?; - utils::metrics::describe_metrics(); - - // shutting down signal handler - let shutdown_handle = tokio::spawn(async move { - let _ = rx.await; - - fastrace::flush(); - - if let Err(e) = meter_provider.shutdown() { - error!("Error shutting down meter provider: {}", e); - } - - logforth::core::default_logger().flush(); - logforth::core::default_logger().exit(); - }); - - Ok((tx, shutdown_handle)) -} - async fn serve_proxy(config: Arc, router: Router) -> Result<()> { serve( "Proxy", diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 72b37f9..7641248 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod future; pub mod jsonschema; pub mod metrics; +pub mod observability; diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs new file mode 100644 index 0000000..2fa9dfe --- /dev/null +++ b/src/utils/observability/mod.rs @@ -0,0 +1,122 @@ +mod trace; + +use std::borrow::Cow; + +use anyhow::{Context, Result}; +use fastrace::collector::Config as FastraceConfig; +use fastrace_opentelemetry::OpenTelemetryReporter; +use log::error; +use logforth::{ + append::{FastraceEvent, Stdout}, + filter::env_filter::EnvFilterBuilder, + layout::TextLayout, +}; +use metrics_exporter_otel::OpenTelemetryRecorder; +use opentelemetry::{InstrumentationScope, metrics::MeterProvider}; +use opentelemetry_otlp::SpanExporter; +use opentelemetry_sdk::{ + Resource, + metrics::{PeriodicReader, SdkMeterProvider}, +}; +use tokio::{sync::oneshot, task::JoinHandle}; +pub use trace::*; + +use crate::utils; + +pub const INSTRUMENTATION_NAME: &str = "aisix"; + +pub fn shutdown_handler(f: F) -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, +{ + let (tx, rx) = oneshot::channel::<()>(); + let shutdown_handle = tokio::spawn(async move { + let _ = rx.await; + f().await; + }); + (tx, shutdown_handle) +} + +/// Initialize observability logging. +pub fn init_observability_log() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + logforth::starter_log::builder() + .dispatch(|d| { + d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build()) + .append(Stdout::default().with_layout(TextLayout::default())) + }) + .dispatch(|d| { + d.filter(EnvFilterBuilder::from_default_env_or("info").build()) + .append(FastraceEvent::default()) + }) + .apply(); + + Ok(shutdown_handler(|| async move { + logforth::core::default_logger().flush(); + logforth::core::default_logger().exit(); + })) +} + +/// Initialize observability tracing. +pub fn init_observability_trace( + span_exporter: Option, + config: Option, +) -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + let reporter = OpenTelemetryReporter::new( + match span_exporter { + Some(exporter) => exporter, + None => BoxedSpanExporter::new( + SpanExporter::builder() + .build() + .context("failed to initialize otlp exporter")?, + ), + }, + Cow::Owned(Resource::builder().build()), + InstrumentationScope::builder(INSTRUMENTATION_NAME) + .with_version(env!("CARGO_PKG_VERSION")) + .build(), + ); + fastrace::set_reporter(reporter, config.unwrap_or_default()); + + Ok(shutdown_handler(|| async move { fastrace::flush() })) +} + +/// Initialize observability metrics. +pub fn init_observability_metric() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + let exporter = opentelemetry_otlp::MetricExporter::builder().build()?; + + let reader = PeriodicReader::builder(exporter).build(); + + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter(INSTRUMENTATION_NAME); + + metrics::set_global_recorder(OpenTelemetryRecorder::new(meter)) + .context("failed to initialize metrics recorder")?; + utils::metrics::describe_metrics(); + + // shutting down signal handler + Ok(shutdown_handler(|| async move { + if let Err(e) = meter_provider.shutdown() { + error!("Error shutting down meter provider: {}", e); + } + })) +} + +/// Initialize observability (logging, tracing, metrics). +/// +/// Returns `(shutdown_sender, shutdown_task_handle)`. +/// Call `shutdown_sender.send(())` to flush and shut down observability. +pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { + let (log_tx, log_shutdown_handle) = init_observability_log()?; + let (trace_tx, trace_shutdown_handle) = init_observability_trace(None, None)?; + let (metric_tx, metric_shutdown_handle) = init_observability_metric()?; + + Ok(shutdown_handler(|| async move { + let _ = trace_tx.send(()); + let _ = trace_shutdown_handle.await; + let _ = metric_tx.send(()); + let _ = metric_shutdown_handle.await; + let _ = log_tx.send(()); + let _ = log_shutdown_handle.await; + })) +} diff --git a/src/utils/observability/trace.rs b/src/utils/observability/trace.rs new file mode 100644 index 0000000..51411ff --- /dev/null +++ b/src/utils/observability/trace.rs @@ -0,0 +1,67 @@ +use std::{fmt::Debug, time::Duration}; + +use async_trait::async_trait; +use opentelemetry_sdk::{ + Resource, + error::OTelSdkResult, + trace::{SpanData, SpanExporter}, +}; + +/// Type-erased span exporter trait object. +#[async_trait] +pub trait DynSpanExporter: Send + Sync + Debug { + async fn export(&self, batch: Vec) -> OTelSdkResult; + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult; + + fn force_flush(&self) -> OTelSdkResult; + + fn set_resource(&mut self, resource: &Resource); +} + +#[async_trait] +impl DynSpanExporter for T { + async fn export(&self, batch: Vec) -> OTelSdkResult { + SpanExporter::export(self, batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + SpanExporter::shutdown_with_timeout(self, timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + SpanExporter::force_flush(self) + } + + fn set_resource(&mut self, resource: &Resource) { + SpanExporter::set_resource(self, resource) + } +} + +/// Type-erased span exporter adapter. +#[derive(Debug)] +pub struct BoxedSpanExporter(Box); + +impl BoxedSpanExporter { + pub fn new(span_exporter: T) -> Self { + Self(Box::new(span_exporter)) + } +} + +impl SpanExporter for BoxedSpanExporter { + async fn export(&self, batch: Vec) -> OTelSdkResult { + self.0.export(batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.0.shutdown_with_timeout(timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + self.0.force_flush() + } + + fn set_resource(&mut self, resource: &Resource) { + self.0.set_resource(resource) + } +}