Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ chrono-tz = { version = "0.8", features = ["serde"] }
cidr = { version = "0.2.2" }
clap = { version = "4.4.2", features = ["derive"] }
comfy-table = "7"
concurrent-queue = "2.5.0"
convert_case = "0.6.0"
cookie = "0.18.1"
crc32fast = "1.3.2"
Expand Down
4 changes: 4 additions & 0 deletions scripts/ci/deploy/config/databend-query-node-otlp-logs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ otlp_protocol = "http"
pkey1 = "pvalue1"
pkey2 = "pvalue2"

[log.persistentlog]
on = true
level = "INFO"

[meta]
endpoints = ["0.0.0.0:9191"]
username = "root"
Expand Down
5 changes: 5 additions & 0 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_tracing::set_panic_hook;
use databend_enterprise_background_service::get_background_service_handler;
use databend_query::clusters::ClusterDiscovery;
use databend_query::local;
use databend_query::persistent_log::GlobalPersistentLog;
use databend_query::servers::admin::AdminService;
use databend_query::servers::flight::FlightService;
use databend_query::servers::metrics::MetricService;
Expand Down Expand Up @@ -282,6 +283,10 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
if conf.log.structlog.on {
println!(" structlog: {}", conf.log.structlog);
}
if conf.log.persistentlog.on {
GlobalPersistentLog::instance().initialized();
println!(" persistentlog: {}", conf.log.persistentlog);
}

println!();
println!(
Expand Down
7 changes: 7 additions & 0 deletions src/common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ publish = { workspace = true }
edition = { workspace = true }

[dependencies]
anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-channel = { workspace = true }
backtrace = { workspace = true, features = ["std", "serialize-serde"] }
chrono = { workspace = true }
concurrent-queue = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
defer = { workspace = true }
Expand All @@ -18,9 +23,11 @@ itertools = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
logforth = { workspace = true }
opendal = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["reqwest-client"] }
opentelemetry_sdk = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tonic = { workspace = true }
Expand Down
32 changes: 32 additions & 0 deletions src/common/tracing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Config {
pub profile: ProfileLogConfig,
pub structlog: StructLogConfig,
pub tracing: TracingConfig,
pub persistentlog: PersistentLogConfig,
}

impl Config {
Expand Down Expand Up @@ -340,3 +341,34 @@ impl Default for OTLPEndpointConfig {
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct PersistentLogConfig {
pub on: bool,
pub interval: usize,
pub stage_name: String,
pub level: String,
pub retention: usize,
}

impl Display for PersistentLogConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"enabled={}, interval={}, stage_name={}, level={}, retention={}",
self.on, self.interval, self.stage_name, self.level, self.retention
)
}
}

impl Default for PersistentLogConfig {
fn default() -> Self {
Self {
on: false,
interval: 2,
stage_name: "log_1f93b76af0bd4b1d8e018667865fbc65".to_string(),
retention: 72,
level: "WARN".to_string(),
}
}
}
65 changes: 63 additions & 2 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio;
use databend_common_base::base::tokio::sync::RwLock;
use databend_common_base::base::GlobalInstance;
use databend_common_base::runtime::Thread;
use fastrace::prelude::*;
Expand All @@ -25,24 +27,54 @@ use logforth::filter::env::EnvFilterBuilder;
use logforth::filter::EnvFilter;
use logforth::Dispatch;
use logforth::Logger;
use opendal::Operator;
use opentelemetry_otlp::WithExportConfig;

use crate::config::OTLPProtocol;
use crate::loggers::get_layout;
use crate::loggers::new_rolling_file_appender;
use crate::remote_log::RemoteLog;
use crate::structlog::StructLogReporter;
use crate::Config;

const HEADER_TRACE_PARENT: &str = "traceparent";

pub struct GlobalLogger {
_drop_guards: Vec<Box<dyn Send + Sync + 'static>>,
pub remote_log_operator: RwLock<Option<Operator>>,
}

impl GlobalLogger {
pub fn init(name: &str, cfg: &Config, labels: BTreeMap<String, String>) {
let _drop_guards = init_logging(name, cfg, labels);
GlobalInstance::set(Self { _drop_guards });

// GlobalLogger is initialized before DataOperator, so set the operator to None first
let remote_log_operator = RwLock::new(None);

let instance = Arc::new(Self {
_drop_guards,
remote_log_operator,
});
GlobalInstance::set(instance);
}

pub fn instance() -> Arc<GlobalLogger> {
GlobalInstance::get()
}

// Get the operator for remote log when it is ready.
pub(crate) async fn get_operator(&self) -> Option<Operator> {
let operator = self.remote_log_operator.read().await;
if let Some(operator) = operator.as_ref() {
return Some(operator.clone());
}
None
}

// Set the operator for remote log, this should be only called once
pub async fn set_operator(&self, operator: Operator) {
let mut remote_log_operator = self.remote_log_operator.write().await;
*remote_log_operator = Some(operator);
}
}

Expand Down Expand Up @@ -100,7 +132,6 @@ pub fn init_logging(
}
),
};

// initialize tracing a reporter
if cfg.tracing.on {
let endpoint = cfg.tracing.otlp.endpoint.clone();
Expand Down Expand Up @@ -345,6 +376,36 @@ pub fn init_logging(
logger = logger.dispatch(dispatch);
}

if cfg.persistentlog.on {
let (remote_log, flush_guard) =
RemoteLog::new(&labels, cfg).expect("initialize remote logger");

let mut filter_builder =
EnvFilterBuilder::new().filter(Some("databend::log::structlog"), LevelFilter::Off);

if cfg.profile.on && !cfg.profile.dir.is_empty() {
filter_builder =
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Trace);
} else {
filter_builder =
filter_builder.filter(Some("databend::log::profile"), LevelFilter::Off);
}
if cfg.query.on && !cfg.query.dir.is_empty() {
filter_builder =
filter_builder.filter(Some("databend::log::query"), LevelFilter::Trace);
} else {
filter_builder = filter_builder.filter(Some("databend::log::query"), LevelFilter::Off);
}
let dispatch = Dispatch::new()
.filter(EnvFilter::new(
filter_builder.parse(&cfg.persistentlog.level),
))
.append(remote_log);

logger = logger.dispatch(dispatch);
_drop_guards.push(flush_guard);
}

// set global logger
if logger.apply().is_err() {
eprintln!("logger has already been set");
Expand Down
7 changes: 7 additions & 0 deletions src/common/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod crash_hook;
mod init;
mod loggers;
mod panic_hook;
mod remote_log;
mod structlog;

pub use crash_hook::pipe_file;
Expand All @@ -32,6 +33,7 @@ pub use crate::config::FileConfig;
pub use crate::config::OTLPConfig;
pub use crate::config::OTLPEndpointConfig;
pub use crate::config::OTLPProtocol;
pub use crate::config::PersistentLogConfig;
pub use crate::config::ProfileLogConfig;
pub use crate::config::QueryLogConfig;
pub use crate::config::StderrConfig;
Expand All @@ -45,6 +47,11 @@ pub use crate::init::start_trace_for_remote_request;
pub use crate::init::GlobalLogger;
pub use crate::panic_hook::log_panic;
pub use crate::panic_hook::set_panic_hook;
pub use crate::remote_log::convert_to_batch;
pub use crate::remote_log::LogBuffer as RemoteLogBuffer;
pub use crate::remote_log::RemoteLog;
pub use crate::remote_log::RemoteLogElement;
pub use crate::remote_log::RemoteLogGuard;
pub use crate::structlog::DummyReporter;
pub use crate::structlog::StructLogReporter;

Expand Down
Loading