From d9a831734d300644fbd09beb17b860c25cc4ab7d Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Apr 2026 13:24:20 +0800 Subject: [PATCH 1/4] feat(observability): pluggable span exporter --- src/lib.rs | 30 ++++++++---- src/utils/mod.rs | 1 + src/utils/observability/mod.rs | 85 ++++++++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 src/utils/observability/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 5bcd492..6be75ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,14 +4,17 @@ mod gateway; mod proxy; mod utils; -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc, time::Duration}; use anyhow::{Context, Result, anyhow}; use axum::Router; use clap::Parser; use log::{error, info}; +use opentelemetry_otlp::SpanExporter; use tokio::{select, sync::oneshot}; +use crate::utils::observability::BoxedSpanExporter; + /// Git hash of the aisix core at build time. pub const GIT_HASH: &str = env!("VERGEN_GIT_SHA"); @@ -34,7 +37,7 @@ pub struct Args { /// and blocks until a signal is received or a server error occurs. pub async fn run(config_file: Option) -> Result<()> { let (ob_shutdown_signal, ob_shutdown_task) = - init_observability().context("failed to initialize observability")?; + init_observability(None).context("failed to initialize observability")?; let config = match config::load(config_file).context("failed to load configuration") { Ok(c) => Arc::new(c), Err(e) => { @@ -121,10 +124,16 @@ pub async fn run_with_provider( /// Initialize observability (logging, tracing, metrics). /// +/// When `span_exporter` is `Some`, the provided boxed exporter is used to +/// construct the tracing reporter. Otherwise an OTLP span exporter is created +/// internally. +/// /// Returns `(shutdown_sender, shutdown_task_handle)`. /// Call `shutdown_sender.send(())` to flush and shut down observability. -pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { - use std::{borrow::Cow, time::Duration}; +pub fn init_observability( + span_exporter: Option, +) -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { + use std::borrow::Cow; use fastrace::collector::Config; use fastrace_opentelemetry::OpenTelemetryReporter; @@ -135,7 +144,6 @@ pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHan }; use metrics_exporter_otel::OpenTelemetryRecorder; use opentelemetry::{InstrumentationScope, metrics::MeterProvider}; - use opentelemetry_otlp::SpanExporter; use opentelemetry_sdk::{ Resource, metrics::{PeriodicReader, SdkMeterProvider}, @@ -158,10 +166,16 @@ pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHan .apply(); // trace + let span_exporter = match span_exporter { + Some(exporter) => exporter, + None => BoxedSpanExporter::new( + SpanExporter::builder() + .build() + .context("failed to initialize otlp exporter")?, + ), + }; let reporter = OpenTelemetryReporter::new( - SpanExporter::builder() - .build() - .context("failed to initialize otlp exporter")?, + span_exporter, Cow::Owned(Resource::builder().build()), InstrumentationScope::builder(INSTRUMENTATION_NAME) .with_version(env!("CARGO_PKG_VERSION")) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 72b37f9..7641248 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod future; pub mod jsonschema; pub mod metrics; +pub mod observability; diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs new file mode 100644 index 0000000..975ae8a --- /dev/null +++ b/src/utils/observability/mod.rs @@ -0,0 +1,85 @@ +use std::{fmt::Debug, time::Duration}; + +use async_trait::async_trait; +use opentelemetry_sdk::{ + Resource, + error::OTelSdkResult, + trace::{SpanData, SpanExporter}, +}; + +#[async_trait] +pub trait DynSpanExporter: Send + Sync + Debug { + async fn export(&self, batch: Vec) -> OTelSdkResult; + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult; + + fn force_flush(&self) -> OTelSdkResult; + + fn set_resource(&mut self, resource: &Resource); +} + +#[async_trait] +impl DynSpanExporter for T { + async fn export(&self, batch: Vec) -> OTelSdkResult { + Self::export(self, batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + Self::shutdown_with_timeout(self, timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + Self::force_flush(self) + } + + fn set_resource(&mut self, resource: &Resource) { + Self::set_resource(self, resource) + } +} + +/// Type-erased span exporter adapter for `init_observability`. +#[derive(Debug)] +pub struct BoxedSpanExporter(Box); + +impl BoxedSpanExporter { + pub fn new(span_exporter: T) -> Self { + Self(Box::new(span_exporter)) + } +} + +impl SpanExporter for BoxedSpanExporter { + async fn export(&self, batch: Vec) -> OTelSdkResult { + self.0.export(batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.0.shutdown_with_timeout(timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + self.0.force_flush() + } + + fn set_resource(&mut self, resource: &Resource) { + self.0.set_resource(resource) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[rstest::rstest] + #[case(None)] + #[case(Some(BoxedSpanExporter::new( + opentelemetry_otlp::SpanExporter::builder().build().unwrap(), + )))] + fn test_boxed_span_exporter(#[case] span_exporter: Option) { + let _ = match span_exporter { + Some(exporter) => exporter, + None => { + BoxedSpanExporter::new(opentelemetry_otlp::SpanExporter::builder().build().unwrap()) + } + }; + } +} From 40d053a29a9831fda98ba0692e9d6499dcc7424b Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Apr 2026 14:17:03 +0800 Subject: [PATCH 2/4] fix comments --- src/lib.rs | 102 +---------------- src/utils/observability/mod.rs | 189 +++++++++++++++++++------------ src/utils/observability/trace.rs | 67 +++++++++++ 3 files changed, 190 insertions(+), 168 deletions(-) create mode 100644 src/utils/observability/trace.rs diff --git a/src/lib.rs b/src/lib.rs index 6be75ae..64f371e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,18 +2,17 @@ mod admin; pub mod config; mod gateway; mod proxy; -mod utils; +pub mod utils; -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc}; use anyhow::{Context, Result, anyhow}; use axum::Router; use clap::Parser; use log::{error, info}; -use opentelemetry_otlp::SpanExporter; use tokio::{select, sync::oneshot}; -use crate::utils::observability::BoxedSpanExporter; +use crate::utils::observability::init_observability; /// Git hash of the aisix core at build time. pub const GIT_HASH: &str = env!("VERGEN_GIT_SHA"); @@ -37,7 +36,7 @@ pub struct Args { /// and blocks until a signal is received or a server error occurs. pub async fn run(config_file: Option) -> Result<()> { let (ob_shutdown_signal, ob_shutdown_task) = - init_observability(None).context("failed to initialize observability")?; + init_observability().context("failed to initialize observability")?; let config = match config::load(config_file).context("failed to load configuration") { Ok(c) => Arc::new(c), Err(e) => { @@ -122,99 +121,6 @@ pub async fn run_with_provider( res } -/// Initialize observability (logging, tracing, metrics). -/// -/// When `span_exporter` is `Some`, the provided boxed exporter is used to -/// construct the tracing reporter. Otherwise an OTLP span exporter is created -/// internally. -/// -/// Returns `(shutdown_sender, shutdown_task_handle)`. -/// Call `shutdown_sender.send(())` to flush and shut down observability. -pub fn init_observability( - span_exporter: Option, -) -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { - use std::borrow::Cow; - - use fastrace::collector::Config; - use fastrace_opentelemetry::OpenTelemetryReporter; - use logforth::{ - append::{FastraceEvent, Stdout}, - filter::env_filter::EnvFilterBuilder, - layout::TextLayout, - }; - use metrics_exporter_otel::OpenTelemetryRecorder; - use opentelemetry::{InstrumentationScope, metrics::MeterProvider}; - use opentelemetry_sdk::{ - Resource, - metrics::{PeriodicReader, SdkMeterProvider}, - }; - - const INSTRUMENTATION_NAME: &str = "aisix"; - - let (tx, rx) = oneshot::channel::<()>(); - - // log - logforth::starter_log::builder() - .dispatch(|d| { - d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build()) - .append(Stdout::default().with_layout(TextLayout::default())) - }) - .dispatch(|d| { - d.filter(EnvFilterBuilder::from_default_env_or("info").build()) - .append(FastraceEvent::default()) - }) - .apply(); - - // trace - let span_exporter = match span_exporter { - Some(exporter) => exporter, - None => BoxedSpanExporter::new( - SpanExporter::builder() - .build() - .context("failed to initialize otlp exporter")?, - ), - }; - let reporter = OpenTelemetryReporter::new( - span_exporter, - Cow::Owned(Resource::builder().build()), - InstrumentationScope::builder(INSTRUMENTATION_NAME) - .with_version(env!("CARGO_PKG_VERSION")) - .build(), - ); - fastrace::set_reporter( - reporter, - Config::default().report_interval(Duration::from_secs(1)), - ); - - // metric - let exporter = opentelemetry_otlp::MetricExporter::builder().build()?; - - let reader = PeriodicReader::builder(exporter).build(); - - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter(INSTRUMENTATION_NAME); - - metrics::set_global_recorder(OpenTelemetryRecorder::new(meter)) - .context("failed to initialize metrics recorder")?; - utils::metrics::describe_metrics(); - - // shutting down signal handler - let shutdown_handle = tokio::spawn(async move { - let _ = rx.await; - - fastrace::flush(); - - if let Err(e) = meter_provider.shutdown() { - error!("Error shutting down meter provider: {}", e); - } - - logforth::core::default_logger().flush(); - logforth::core::default_logger().exit(); - }); - - Ok((tx, shutdown_handle)) -} - async fn serve_proxy(config: Arc, router: Router) -> Result<()> { serve( "Proxy", diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs index 975ae8a..74531c5 100644 --- a/src/utils/observability/mod.rs +++ b/src/utils/observability/mod.rs @@ -1,85 +1,134 @@ -use std::{fmt::Debug, time::Duration}; - -use async_trait::async_trait; +mod trace; + +use std::borrow::Cow; + +use anyhow::{Context, Result}; +use fastrace::collector::Config as FastraceConfig; +use fastrace_opentelemetry::OpenTelemetryReporter; +use log::error; +use logforth::{ + append::{FastraceEvent, Stdout}, + filter::env_filter::EnvFilterBuilder, + layout::TextLayout, +}; +use metrics_exporter_otel::OpenTelemetryRecorder; +use opentelemetry::{InstrumentationScope, metrics::MeterProvider}; +use opentelemetry_otlp::SpanExporter; use opentelemetry_sdk::{ Resource, - error::OTelSdkResult, - trace::{SpanData, SpanExporter}, + metrics::{PeriodicReader, SdkMeterProvider}, }; - -#[async_trait] -pub trait DynSpanExporter: Send + Sync + Debug { - async fn export(&self, batch: Vec) -> OTelSdkResult; - - fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult; - - fn force_flush(&self) -> OTelSdkResult; - - fn set_resource(&mut self, resource: &Resource); +use tokio::{sync::oneshot, task::JoinHandle}; +pub use trace::*; + +use crate::utils; + +pub const INSTRUMENTATION_NAME: &str = "aisix"; + +pub fn shutdown_handler(f: F) -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, +{ + let (tx, rx) = oneshot::channel::<()>(); + let shutdown_handle = tokio::spawn(async move { + let _ = rx.await; + f().await; + }); + (tx, shutdown_handle) } -#[async_trait] -impl DynSpanExporter for T { - async fn export(&self, batch: Vec) -> OTelSdkResult { - Self::export(self, batch).await - } - - fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { - Self::shutdown_with_timeout(self, timeout) - } - - fn force_flush(&self) -> OTelSdkResult { - Self::force_flush(self) - } - - fn set_resource(&mut self, resource: &Resource) { - Self::set_resource(self, resource) - } +/// Initialize observability logging. +pub fn init_observability_log() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + logforth::starter_log::builder() + .dispatch(|d| { + d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build()) + .append(Stdout::default().with_layout(TextLayout::default())) + }) + .dispatch(|d| { + d.filter(EnvFilterBuilder::from_default_env_or("info").build()) + .append(FastraceEvent::default()) + }) + .apply(); + + Ok(shutdown_handler(|| async move { + logforth::core::default_logger().flush(); + logforth::core::default_logger().exit(); + })) } -/// Type-erased span exporter adapter for `init_observability`. -#[derive(Debug)] -pub struct BoxedSpanExporter(Box); - -impl BoxedSpanExporter { - pub fn new(span_exporter: T) -> Self { - Self(Box::new(span_exporter)) - } +/// Initialize observability tracing. +pub fn init_observability_trace( + span_exporter: Option, + config: Option, +) -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + let reporter = OpenTelemetryReporter::new( + match span_exporter { + Some(exporter) => exporter, + None => BoxedSpanExporter::new( + SpanExporter::builder() + .build() + .context("failed to initialize otlp exporter")?, + ), + }, + Cow::Owned(Resource::builder().build()), + InstrumentationScope::builder(INSTRUMENTATION_NAME) + .with_version(env!("CARGO_PKG_VERSION")) + .build(), + ); + fastrace::set_reporter( + reporter, + config.unwrap_or_else(|| FastraceConfig::default()), + ); + + Ok(shutdown_handler(|| async move { fastrace::flush() })) } -impl SpanExporter for BoxedSpanExporter { - async fn export(&self, batch: Vec) -> OTelSdkResult { - self.0.export(batch).await - } +/// Initialize observability metrics. +pub fn init_observability_metric() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> { + let exporter = opentelemetry_otlp::MetricExporter::builder().build()?; + + let reader = PeriodicReader::builder(exporter).build(); - fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { - self.0.shutdown_with_timeout(timeout) - } + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter(INSTRUMENTATION_NAME); - fn force_flush(&self) -> OTelSdkResult { - self.0.force_flush() - } + metrics::set_global_recorder(OpenTelemetryRecorder::new(meter)) + .context("failed to initialize metrics recorder")?; + utils::metrics::describe_metrics(); - fn set_resource(&mut self, resource: &Resource) { - self.0.set_resource(resource) - } + // shutting down signal handler + Ok(shutdown_handler(|| async move { + if let Err(e) = meter_provider.shutdown() { + error!("Error shutting down meter provider: {}", e); + } + })) } -#[cfg(test)] -mod tests { - use super::*; - - #[rstest::rstest] - #[case(None)] - #[case(Some(BoxedSpanExporter::new( - opentelemetry_otlp::SpanExporter::builder().build().unwrap(), - )))] - fn test_boxed_span_exporter(#[case] span_exporter: Option) { - let _ = match span_exporter { - Some(exporter) => exporter, - None => { - BoxedSpanExporter::new(opentelemetry_otlp::SpanExporter::builder().build().unwrap()) - } - }; - } +/// Initialize observability (logging, tracing, metrics). +/// +/// When `span_exporter` is `Some`, the provided boxed exporter is used to +/// construct the tracing reporter. Otherwise an OTLP span exporter is created +/// internally. +/// +/// Returns `(shutdown_sender, shutdown_task_handle)`. +/// Call `shutdown_sender.send(())` to flush and shut down observability. +pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { + // log + let (log_tx, log_shutdown_handle) = init_observability_log()?; + + // trace + let (trace_tx, trace_shutdown_handle) = init_observability_trace(None, None)?; + + // metric + let (metric_tx, metric_shutdown_handle) = init_observability_metric()?; + + Ok(shutdown_handler(|| async move { + let _ = trace_tx.send(()); + let _ = trace_shutdown_handle.await; + let _ = metric_tx.send(()); + let _ = metric_shutdown_handle.await; + let _ = log_tx.send(()); + let _ = log_shutdown_handle.await; + })) } diff --git a/src/utils/observability/trace.rs b/src/utils/observability/trace.rs new file mode 100644 index 0000000..e627dbb --- /dev/null +++ b/src/utils/observability/trace.rs @@ -0,0 +1,67 @@ +use std::{fmt::Debug, time::Duration}; + +use async_trait::async_trait; +use opentelemetry_sdk::{ + Resource, + error::OTelSdkResult, + trace::{SpanData, SpanExporter}, +}; + +/// Type-erased span exporter trait object. +#[async_trait] +pub trait DynSpanExporter: Send + Sync + Debug { + async fn export(&self, batch: Vec) -> OTelSdkResult; + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult; + + fn force_flush(&self) -> OTelSdkResult; + + fn set_resource(&mut self, resource: &Resource); +} + +#[async_trait] +impl DynSpanExporter for T { + async fn export(&self, batch: Vec) -> OTelSdkResult { + Self::export(self, batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + Self::shutdown_with_timeout(self, timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + Self::force_flush(self) + } + + fn set_resource(&mut self, resource: &Resource) { + Self::set_resource(self, resource) + } +} + +/// Type-erased span exporter adapter. +#[derive(Debug)] +pub struct BoxedSpanExporter(Box); + +impl BoxedSpanExporter { + pub fn new(span_exporter: T) -> Self { + Self(Box::new(span_exporter)) + } +} + +impl SpanExporter for BoxedSpanExporter { + async fn export(&self, batch: Vec) -> OTelSdkResult { + self.0.export(batch).await + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.0.shutdown_with_timeout(timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + self.0.force_flush() + } + + fn set_resource(&mut self, resource: &Resource) { + self.0.set_resource(resource) + } +} From 43a9b5f53b28fd790fa5169e681eda7184a9c1dc Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Apr 2026 14:24:16 +0800 Subject: [PATCH 3/4] fix test --- src/utils/observability/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs index 74531c5..6253220 100644 --- a/src/utils/observability/mod.rs +++ b/src/utils/observability/mod.rs @@ -76,10 +76,7 @@ pub fn init_observability_trace( .with_version(env!("CARGO_PKG_VERSION")) .build(), ); - fastrace::set_reporter( - reporter, - config.unwrap_or_else(|| FastraceConfig::default()), - ); + fastrace::set_reporter(reporter, config.unwrap_or_default()); Ok(shutdown_handler(|| async move { fastrace::flush() })) } From 05d4358c275c60abc135f9e25dc9c0d03e0142c6 Mon Sep 17 00:00:00 2001 From: bzp2010 Date: Tue, 28 Apr 2026 14:33:06 +0800 Subject: [PATCH 4/4] fix comments --- src/utils/observability/mod.rs | 9 --------- src/utils/observability/trace.rs | 8 ++++---- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/utils/observability/mod.rs b/src/utils/observability/mod.rs index 6253220..2fa9dfe 100644 --- a/src/utils/observability/mod.rs +++ b/src/utils/observability/mod.rs @@ -104,20 +104,11 @@ pub fn init_observability_metric() -> Result<(oneshot::Sender<()>, JoinHandle<() /// Initialize observability (logging, tracing, metrics). /// -/// When `span_exporter` is `Some`, the provided boxed exporter is used to -/// construct the tracing reporter. Otherwise an OTLP span exporter is created -/// internally. -/// /// Returns `(shutdown_sender, shutdown_task_handle)`. /// Call `shutdown_sender.send(())` to flush and shut down observability. pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> { - // log let (log_tx, log_shutdown_handle) = init_observability_log()?; - - // trace let (trace_tx, trace_shutdown_handle) = init_observability_trace(None, None)?; - - // metric let (metric_tx, metric_shutdown_handle) = init_observability_metric()?; Ok(shutdown_handler(|| async move { diff --git a/src/utils/observability/trace.rs b/src/utils/observability/trace.rs index e627dbb..51411ff 100644 --- a/src/utils/observability/trace.rs +++ b/src/utils/observability/trace.rs @@ -22,19 +22,19 @@ pub trait DynSpanExporter: Send + Sync + Debug { #[async_trait] impl DynSpanExporter for T { async fn export(&self, batch: Vec) -> OTelSdkResult { - Self::export(self, batch).await + SpanExporter::export(self, batch).await } fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { - Self::shutdown_with_timeout(self, timeout) + SpanExporter::shutdown_with_timeout(self, timeout) } fn force_flush(&self) -> OTelSdkResult { - Self::force_flush(self) + SpanExporter::force_flush(self) } fn set_resource(&mut self, resource: &Resource) { - Self::set_resource(self, resource) + SpanExporter::set_resource(self, resource) } }