From 51d8e566ad6228d097f2d76daae32bcfe6c3ab0d Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Thu, 23 May 2024 18:46:06 +0800 Subject: [PATCH] chore(query): add timeout for open telemetry logger (#15627) --- src/common/tracing/src/loggers.rs | 57 ++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index e2a7fd24e275..13cb1748fb07 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -17,15 +17,22 @@ use std::fmt; use std::io::BufWriter; use std::path::Path; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use std::time::SystemTime; use databend_common_base::runtime::ThreadTracker; use fern::FormatCallback; +use opentelemetry::global; use opentelemetry::logs::AnyValue; +use opentelemetry::logs::LogError; +use opentelemetry::logs::LogResult; use opentelemetry::logs::Severity; use opentelemetry::InstrumentationLibrary; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::LogProcessor; use serde_json::Map; use tracing_appender::non_blocking::NonBlocking; use tracing_appender::non_blocking::WorkerGuard; @@ -89,6 +96,54 @@ pub(crate) struct OpenTelemetryLogger { provider: opentelemetry_sdk::logs::LoggerProvider, } +#[derive(Debug)] +struct LogProcessorWarp(Mutex>); + +impl LogProcessor for LogProcessorWarp { + fn emit(&self, data: LogData) { + match self.0.lock() { + Ok(mut exporter) => { + let err = databend_common_base::runtime::block_on( + databend_common_base::base::tokio::time::timeout( + Duration::from_secs(20), + exporter.export(vec![data]), + ), + ); + + match err { + Ok(Ok(_)) => { + // do nothing + } + Ok(Err(err)) => { + global::handle_error(err); + } + Err(_) => { + global::handle_error(LogError::Other("timeout with lock".into())); + } + } + } + Err(_) => { + global::handle_error(LogError::Other("simple logprocessor mutex poison".into())); + } + } + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&mut self) -> LogResult<()> { + if let Ok(mut exporter) = self.0.lock() { + exporter.shutdown(); + Ok(()) + } else { + Err(LogError::Other( + "simple logprocessor mutex poison during shutdown".into(), + )) + } + } +} + impl OpenTelemetryLogger { pub(crate) fn new( name: impl ToString, @@ -113,7 +168,7 @@ impl OpenTelemetryLogger { .build_log_exporter() .expect("build log exporter"); let provider = opentelemetry_sdk::logs::LoggerProvider::builder() - .with_simple_exporter(exporter) + .with_log_processor(LogProcessorWarp(Mutex::new(Box::new(exporter)))) .with_config( opentelemetry_sdk::logs::Config::default() .with_resource(opentelemetry_sdk::Resource::new(kvs)),