Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 4 additions & 84 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ mod admin;
pub mod config;
mod gateway;
mod proxy;
mod utils;
pub mod utils;

use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use anyhow::{Context, Result, anyhow};
use axum::Router;
use clap::Parser;
use log::{error, info};
use tokio::{select, sync::oneshot};

use crate::utils::observability::init_observability;

/// Git hash of the aisix core at build time.
pub const GIT_HASH: &str = env!("VERGEN_GIT_SHA");

Expand Down Expand Up @@ -119,88 +121,6 @@ pub async fn run_with_provider(
res
}

/// Initialize observability (logging, tracing, metrics).
///
/// Returns `(shutdown_sender, shutdown_task_handle)`.
/// Call `shutdown_sender.send(())` to flush and shut down observability.
pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> {
use std::{borrow::Cow, time::Duration};

use fastrace::collector::Config;
use fastrace_opentelemetry::OpenTelemetryReporter;
use logforth::{
append::{FastraceEvent, Stdout},
filter::env_filter::EnvFilterBuilder,
layout::TextLayout,
};
use metrics_exporter_otel::OpenTelemetryRecorder;
use opentelemetry::{InstrumentationScope, metrics::MeterProvider};
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::{
Resource,
metrics::{PeriodicReader, SdkMeterProvider},
};

const INSTRUMENTATION_NAME: &str = "aisix";

let (tx, rx) = oneshot::channel::<()>();

// log
logforth::starter_log::builder()
.dispatch(|d| {
d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build())
.append(Stdout::default().with_layout(TextLayout::default()))
})
.dispatch(|d| {
d.filter(EnvFilterBuilder::from_default_env_or("info").build())
.append(FastraceEvent::default())
})
.apply();

// trace
let reporter = OpenTelemetryReporter::new(
SpanExporter::builder()
.build()
.context("failed to initialize otlp exporter")?,
Cow::Owned(Resource::builder().build()),
InstrumentationScope::builder(INSTRUMENTATION_NAME)
.with_version(env!("CARGO_PKG_VERSION"))
.build(),
);
fastrace::set_reporter(
reporter,
Config::default().report_interval(Duration::from_secs(1)),
);

// metric
let exporter = opentelemetry_otlp::MetricExporter::builder().build()?;

let reader = PeriodicReader::builder(exporter).build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter(INSTRUMENTATION_NAME);

metrics::set_global_recorder(OpenTelemetryRecorder::new(meter))
.context("failed to initialize metrics recorder")?;
utils::metrics::describe_metrics();

// shutting down signal handler
let shutdown_handle = tokio::spawn(async move {
let _ = rx.await;

fastrace::flush();

if let Err(e) = meter_provider.shutdown() {
error!("Error shutting down meter provider: {}", e);
}

logforth::core::default_logger().flush();
logforth::core::default_logger().exit();
});

Ok((tx, shutdown_handle))
}

async fn serve_proxy(config: Arc<config::Config>, router: Router) -> Result<()> {
serve(
"Proxy",
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod future;
pub mod jsonschema;
pub mod metrics;
pub mod observability;
Comment thread
bzp2010 marked this conversation as resolved.
122 changes: 122 additions & 0 deletions src/utils/observability/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
mod trace;

use std::borrow::Cow;

use anyhow::{Context, Result};
use fastrace::collector::Config as FastraceConfig;
use fastrace_opentelemetry::OpenTelemetryReporter;
use log::error;
use logforth::{
append::{FastraceEvent, Stdout},
filter::env_filter::EnvFilterBuilder,
layout::TextLayout,
};
use metrics_exporter_otel::OpenTelemetryRecorder;
use opentelemetry::{InstrumentationScope, metrics::MeterProvider};
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::{
Resource,
metrics::{PeriodicReader, SdkMeterProvider},
};
use tokio::{sync::oneshot, task::JoinHandle};
pub use trace::*;

use crate::utils;

pub const INSTRUMENTATION_NAME: &str = "aisix";

pub fn shutdown_handler<F, Fut>(f: F) -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
Comment thread
bzp2010 marked this conversation as resolved.
let (tx, rx) = oneshot::channel::<()>();
let shutdown_handle = tokio::spawn(async move {
let _ = rx.await;
f().await;
});
(tx, shutdown_handle)
}
Comment thread
bzp2010 marked this conversation as resolved.

/// Initialize observability logging.
pub fn init_observability_log() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> {
logforth::starter_log::builder()
.dispatch(|d| {
d.filter(EnvFilterBuilder::from_default_env_or("info,opentelemetry_sdk=off").build())
.append(Stdout::default().with_layout(TextLayout::default()))
})
.dispatch(|d| {
d.filter(EnvFilterBuilder::from_default_env_or("info").build())
.append(FastraceEvent::default())
})
.apply();

Ok(shutdown_handler(|| async move {
logforth::core::default_logger().flush();
logforth::core::default_logger().exit();
}))
}

/// Initialize observability tracing.
pub fn init_observability_trace(
span_exporter: Option<BoxedSpanExporter>,
config: Option<FastraceConfig>,
) -> Result<(oneshot::Sender<()>, JoinHandle<()>)> {
let reporter = OpenTelemetryReporter::new(
match span_exporter {
Some(exporter) => exporter,
None => BoxedSpanExporter::new(
SpanExporter::builder()
.build()
.context("failed to initialize otlp exporter")?,
),
},
Cow::Owned(Resource::builder().build()),
InstrumentationScope::builder(INSTRUMENTATION_NAME)
.with_version(env!("CARGO_PKG_VERSION"))
.build(),
);
fastrace::set_reporter(reporter, config.unwrap_or_default());

Ok(shutdown_handler(|| async move { fastrace::flush() }))
}

/// Initialize observability metrics.
pub fn init_observability_metric() -> Result<(oneshot::Sender<()>, JoinHandle<()>)> {
let exporter = opentelemetry_otlp::MetricExporter::builder().build()?;

let reader = PeriodicReader::builder(exporter).build();

let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter(INSTRUMENTATION_NAME);

metrics::set_global_recorder(OpenTelemetryRecorder::new(meter))
.context("failed to initialize metrics recorder")?;
utils::metrics::describe_metrics();

// shutting down signal handler
Ok(shutdown_handler(|| async move {
if let Err(e) = meter_provider.shutdown() {
error!("Error shutting down meter provider: {}", e);
}
}))
}

/// Initialize observability (logging, tracing, metrics).
///
/// Returns `(shutdown_sender, shutdown_task_handle)`.
/// Call `shutdown_sender.send(())` to flush and shut down observability.
pub fn init_observability() -> Result<(oneshot::Sender<()>, tokio::task::JoinHandle<()>)> {
let (log_tx, log_shutdown_handle) = init_observability_log()?;
let (trace_tx, trace_shutdown_handle) = init_observability_trace(None, None)?;
let (metric_tx, metric_shutdown_handle) = init_observability_metric()?;

Ok(shutdown_handler(|| async move {
let _ = trace_tx.send(());
let _ = trace_shutdown_handle.await;
let _ = metric_tx.send(());
let _ = metric_shutdown_handle.await;
let _ = log_tx.send(());
let _ = log_shutdown_handle.await;
}))
}
67 changes: 67 additions & 0 deletions src/utils/observability/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::{fmt::Debug, time::Duration};

use async_trait::async_trait;
use opentelemetry_sdk::{
Resource,
error::OTelSdkResult,
trace::{SpanData, SpanExporter},
};

/// Type-erased span exporter trait object.
#[async_trait]
pub trait DynSpanExporter: Send + Sync + Debug {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult;

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;

fn force_flush(&self) -> OTelSdkResult;

fn set_resource(&mut self, resource: &Resource);
}

#[async_trait]
impl<T: SpanExporter> DynSpanExporter for T {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
SpanExporter::export(self, batch).await
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
SpanExporter::shutdown_with_timeout(self, timeout)
}

fn force_flush(&self) -> OTelSdkResult {
SpanExporter::force_flush(self)
}

fn set_resource(&mut self, resource: &Resource) {
SpanExporter::set_resource(self, resource)
}
}

/// Type-erased span exporter adapter.
#[derive(Debug)]
pub struct BoxedSpanExporter(Box<dyn DynSpanExporter>);

impl BoxedSpanExporter {
pub fn new<T: SpanExporter + 'static>(span_exporter: T) -> Self {
Self(Box::new(span_exporter))
}
}

impl SpanExporter for BoxedSpanExporter {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
self.0.export(batch).await
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.0.shutdown_with_timeout(timeout)
}

fn force_flush(&self) -> OTelSdkResult {
self.0.force_flush()
}

fn set_resource(&mut self, resource: &Resource) {
self.0.set_resource(resource)
}
}
Loading