Skip to content

Commit

Permalink
add optional support for tokio-console
Browse files Browse the repository at this point in the history
This turned out to be quite hairy, mostly because we need to apply the
config's log level filter to the actual logs (stdout and, optionally
sentry), but do not want to filter out the tokio tracing events needed by
the console_subscriber. I hit several edge cases in tracing getting
this to work, and we now depend on a git version of tracing with a
backported patch :(
  • Loading branch information
Benjamin Lee committed Apr 26, 2024
1 parent 95fb05a commit 9576383
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 104 deletions.
420 changes: 362 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ rust-version = "1.75.0"


[dependencies]
console-subscriber = { version = "0.1", optional = true }

hot-lib-reloader = { version = "^0.6", optional = true }

# Used for secure identifiers
Expand Down Expand Up @@ -337,6 +339,18 @@ hardened_malloc-rs = { version = "0.1.2", optional = true, features = [
#hardened_malloc-rs = { optional = true, features = ["static","clang","light"], path = "../hardened_malloc-rs", default-features = false }


# backport of [https://github.com/tokio-rs/tracing/pull/2956] to the 0.1.x branch of tracing.
# we can switch back to upstream if #2956 is merged and backported in the upstream repo.
[patch.crates-io.tracing-subscriber]
git = "https://github.com/Benjamin-L/tracing"
branch = "tracing-subscriber/env-filter-clone-0.1.x-backport"
[patch.crates-io.tracing]
git = "https://github.com/Benjamin-L/tracing"
branch = "tracing-subscriber/env-filter-clone-0.1.x-backport"
[patch.crates-io.tracing-core]
git = "https://github.com/Benjamin-L/tracing"
branch = "tracing-subscriber/env-filter-clone-0.1.x-backport"

[features]
default = [
"backend_rocksdb",
Expand Down Expand Up @@ -372,6 +386,10 @@ perf_measurements = [
"opentelemetry-jaeger",
]

# enable the tokio_console server
# incompatible with release_max_log_level
tokio_console = ["console-subscriber", "tokio/tracing"]

hot_reload = ["dep:hot-lib-reloader"]

hardened_malloc = ["hardened_malloc-rs"]
Expand Down
18 changes: 18 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,21 @@

Information about developing the project. If you are only interested in using
it, you can safely ignore this section.

## Debugging with `tokio-console`

[`tokio-console`][1] can be a useful tool for debugging and profiling. To make
a `tokio-console`-enabled build of Conduwuit, enable the `tokio_console` feature,
disable the default `release_max_log_level` feature, and set the
`--cfg tokio_unstable` flag to enable experimental tokio APIs. A build might
look like this:

```bash
RUSTFLAGS="--cfg tokio_unstable" cargo build \
--release \
--no-default-features \
--features
backend_rocksdb,systemd,element_hacks,sentry_telemetry,gzip_compression,brotli_compression,zstd_compression,tokio_console
```

[1]: https://docs.rs/tokio-console/latest/tokio_console/
12 changes: 3 additions & 9 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use tokio::time::{interval, Instant};
use tracing::{debug, error, warn};

use crate::{
database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error, Result, Services,
SERVICES,
database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error,
LogLevelReloadHandles, Result, Services, SERVICES,
};

pub(crate) struct KeyValueDatabase {
Expand Down Expand Up @@ -203,13 +203,7 @@ struct CheckForUpdatesResponse {
impl KeyValueDatabase {
/// Load an existing database or create a new one.
#[allow(clippy::too_many_lines)]
pub(crate) async fn load_or_create(
config: Config,
tracing_reload_handler: tracing_subscriber::reload::Handle<
tracing_subscriber::EnvFilter,
tracing_subscriber::Registry,
>,
) -> Result<()> {
pub(crate) async fn load_or_create(config: Config, tracing_reload_handler: LogLevelReloadHandles) -> Result<()> {
Self::check_db_setup(&config)?;

if !Path::new(&config.database_path).exists() {
Expand Down
103 changes: 79 additions & 24 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::os::unix::fs::PermissionsExt as _; /* not unix specific, just only for
// Not async due to services() being used in many closures, and async closures
// are not stable as of writing This is the case for every other occurence of
// sync Mutex/RwLock, except for database related ones
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use std::{any::Any, io, net::SocketAddr, sync::atomic, time::Duration};

use api::ruma_wrapper::{Ruma, RumaResponse};
Expand Down Expand Up @@ -56,6 +56,12 @@ mod routes;
mod service;
mod utils;

#[cfg(all(feature = "tokio_console", feature = "release_max_log_level"))]
compile_error!(
"'tokio_console' feature and 'release_max_log_level' feature are incompatible, because console-subscriber needs \
access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console"
);

pub(crate) static SERVICES: RwLock<Option<&'static Services<'static>>> = RwLock::new(None);

#[must_use]
Expand All @@ -79,7 +85,7 @@ struct Server {

runtime: tokio::runtime::Runtime,

tracing_reload_handle: reload::Handle<EnvFilter, Registry>,
tracing_reload_handle: LogLevelReloadHandles,

#[cfg(feature = "sentry_telemetry")]
_sentry_guard: Option<sentry::ClientInitGuard>,
Expand Down Expand Up @@ -547,7 +553,56 @@ fn init_sentry(config: &Config) -> sentry::ClientInitGuard {
))
}

fn init_tracing_sub(config: &Config) -> reload::Handle<EnvFilter, Registry> {
// We need to store a reload::Handle value, but can't name it's type explicitly
// because the S type parameter depends on the subscriber's previous layers. In
// our case, this includes unnameable 'impl Trait' types.
//
// This is fixed[1] in the unreleased tracing-subscriber from the master branch,
// which removes the S parameter. Unfortunately can't use it without pulling in
// a version of tracing that's incompatible with the rest of our deps.
//
// To work around this, we define an trait without the S paramter that forwards
// to the reload::Handle::reload method, and then store the handle as a trait
// object.
//
// [1]: https://github.com/tokio-rs/tracing/pull/1035/commits/8a87ea52425098d3ef8f56d92358c2f6c144a28f
trait ReloadHandle<L> {
fn reload(&self, new_value: L) -> Result<(), reload::Error>;
}

impl<L, S> ReloadHandle<L> for reload::Handle<L, S> {
fn reload(&self, new_value: L) -> Result<(), reload::Error> { reload::Handle::reload(self, new_value) }
}

struct LogLevelReloadHandlesInner {
handles: Vec<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>,
}

/// Wrapper to allow reloading the filter on several several
/// [`tracing_subscriber::reload::Handle`]s at once, with the same value.
#[derive(Clone)]
struct LogLevelReloadHandles {
inner: Arc<LogLevelReloadHandlesInner>,
}

impl LogLevelReloadHandles {
fn new(handles: Vec<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>) -> LogLevelReloadHandles {
LogLevelReloadHandles {
inner: Arc::new(LogLevelReloadHandlesInner {
handles,
}),
}
}

fn reload(&self, new_value: &EnvFilter) -> Result<(), reload::Error> {
for handle in &self.inner.handles {
handle.reload(new_value.clone())?;
}
Ok(())
}
}

fn init_tracing_sub(config: &Config) -> LogLevelReloadHandles {
let registry = Registry::default();
let fmt_layer = tracing_subscriber::fmt::Layer::new();
let filter_layer = match EnvFilter::try_new(&config.log) {
Expand All @@ -558,35 +613,34 @@ fn init_tracing_sub(config: &Config) -> reload::Handle<EnvFilter, Registry> {
},
};

let (reload_filter, reload_handle) = reload::Layer::new(filter_layer);
let mut reload_handles = Vec::<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>::new();
let subscriber = registry;

#[cfg(feature = "sentry_telemetry")]
let sentry_layer = sentry_tracing::layer();
#[cfg(feature = "tokio_console")]
let subscriber = {
let console_layer = console_subscriber::spawn();
subscriber.with(console_layer)
};

let subscriber;
let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone());
reload_handles.push(Box::new(fmt_reload_handle));
let subscriber = subscriber.with(fmt_layer.with_filter(fmt_reload_filter));

#[allow(clippy::unnecessary_operation)] // error[E0658]: attributes on expressions are experimental
#[cfg(feature = "sentry_telemetry")]
{
subscriber = registry
.with(reload_filter)
.with(fmt_layer)
.with(sentry_layer);
};

#[allow(clippy::unnecessary_operation)] // error[E0658]: attributes on expressions are experimental
#[cfg(not(feature = "sentry_telemetry"))]
{
subscriber = registry.with(reload_filter).with(fmt_layer);
let subscriber = {
let sentry_layer = sentry_tracing::layer();
let (sentry_reload_filter, sentry_reload_handle) = reload::Layer::new(filter_layer);
reload_handles.push(Box::new(sentry_reload_handle));
subscriber.with(sentry_layer.with_filter(sentry_reload_filter))
};

tracing::subscriber::set_global_default(subscriber).unwrap();

reload_handle
LogLevelReloadHandles::new(reload_handles)
}

#[cfg(feature = "perf_measurements")]
fn init_tracing_jaeger(config: &Config) -> reload::Handle<EnvFilter, Registry> {
fn init_tracing_jaeger(config: &Config) -> LogLevelReloadHandles {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_auto_split_batch(true)
Expand All @@ -609,11 +663,12 @@ fn init_tracing_jaeger(config: &Config) -> reload::Handle<EnvFilter, Registry> {

tracing::subscriber::set_global_default(subscriber).unwrap();

reload_handle
LogLevelReloadHandles::new(vec![Box::new(reload_handle)])
}

// TODO: tokio-console here?
#[cfg(feature = "perf_measurements")]
fn init_tracing_flame(_config: &Config) -> reload::Handle<EnvFilter, Registry> {
fn init_tracing_flame(_config: &Config) -> LogLevelReloadHandles {
let registry = Registry::default();
let (flame_layer, _guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let flame_layer = flame_layer.with_empty_samples(false);
Expand All @@ -626,7 +681,7 @@ fn init_tracing_flame(_config: &Config) -> reload::Handle<EnvFilter, Registry> {

tracing::subscriber::set_global_default(subscriber).unwrap();

reload_handle
LogLevelReloadHandles::new(vec![Box::new(reload_handle)])
}

// This is needed for opening lots of file descriptors, which tends to
Expand Down
4 changes: 2 additions & 2 deletions src/service/admin/debug/debug_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub(crate) async fn change_log_level(
match services()
.globals
.tracing_reload_handle
.modify(|filter| *filter = old_filter_layer)
.reload(&old_filter_layer)
{
Ok(()) => {
return Ok(RoomMessageEventContent::text_plain(format!(
Expand Down Expand Up @@ -360,7 +360,7 @@ pub(crate) async fn change_log_level(
match services()
.globals
.tracing_reload_handle
.modify(|filter| *filter = new_filter_layer)
.reload(&new_filter_layer)
{
Ok(()) => {
return Ok(RoomMessageEventContent::text_plain("Successfully changed log level"));
Expand Down
8 changes: 3 additions & 5 deletions src/service/globals/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ use ruma::{
};
use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock, Semaphore};
use tracing::{error, info, trace};
use tracing_subscriber::{EnvFilter, Registry};
use url::Url;

use crate::{services, Config, Result};
use crate::{services, Config, LogLevelReloadHandles, Result};

mod client;
mod data;
Expand All @@ -45,7 +44,7 @@ type SyncHandle = (
pub(crate) struct Service<'a> {
pub(crate) db: &'static dyn Data,

pub(crate) tracing_reload_handle: tracing_subscriber::reload::Handle<EnvFilter, Registry>,
pub(crate) tracing_reload_handle: LogLevelReloadHandles,
pub(crate) config: Config,
pub(crate) cidr_range_denylist: Vec<IPAddress>,
keypair: Arc<ruma::signatures::Ed25519KeyPair>,
Expand Down Expand Up @@ -100,8 +99,7 @@ impl Default for RotationHandler {

impl Service<'_> {
pub(crate) fn load(
db: &'static dyn Data, config: &Config,
tracing_reload_handle: tracing_subscriber::reload::Handle<EnvFilter, Registry>,
db: &'static dyn Data, config: &Config, tracing_reload_handle: LogLevelReloadHandles,
) -> Result<Self> {
let keypair = db.load_keypair();

Expand Down
8 changes: 2 additions & 6 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use lru_cache::LruCache;
use tokio::sync::{broadcast, Mutex, RwLock};

use crate::{Config, Result};
use crate::{Config, LogLevelReloadHandles, Result};

pub(crate) mod account_data;
pub(crate) mod admin;
Expand Down Expand Up @@ -55,11 +55,7 @@ impl Services<'_> {
+ sending::Data
+ 'static,
>(
db: &'static D, config: &Config,
tracing_reload_handle: tracing_subscriber::reload::Handle<
tracing_subscriber::EnvFilter,
tracing_subscriber::Registry,
>,
db: &'static D, config: &Config, tracing_reload_handle: LogLevelReloadHandles,
) -> Result<Self> {
Ok(Self {
appservice: appservice::Service::build(db)?,
Expand Down

0 comments on commit 9576383

Please sign in to comment.