Skip to content

Commit

Permalink
feat(tracing): add sample rate + filtering (#1904)
Browse files Browse the repository at this point in the history
* feat(tracing): add sample rate + filtering

* fix fmt
  • Loading branch information
gurinderu committed Nov 14, 2023
1 parent 5ad1ce7 commit e825dd1
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 30 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions crates/server-config/src/resolved_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ impl FromStr for LogFormat {
pub enum TracingConfig {
#[serde(rename = "disabled")]
Disabled,
#[serde(rename = "stdout")]
Stdout,
#[serde(rename = "otlp")]
Otlp { endpoint: String },
Otlp {
endpoint: String,
sample_ratio: Option<f64>,
},
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
Expand Down Expand Up @@ -607,6 +612,7 @@ mod tests {
[tracing]
type = "otlp"
endpoint = "test"
sample_ratio = 0.1
"#
)
.expect("Could not write in file");
Expand All @@ -618,7 +624,8 @@ mod tests {
assert_eq!(
config.tracing,
Some(TracingConfig::Otlp {
endpoint: "test".to_string()
endpoint: "test".to_string(),
sample_ratio: Some(0.1)
})
);
});
Expand All @@ -644,7 +651,8 @@ mod tests {
assert_eq!(
config.tracing,
Some(TracingConfig::Otlp {
endpoint: "test".to_string()
endpoint: "test".to_string(),
sample_ratio: None
})
);
},
Expand Down Expand Up @@ -684,7 +692,8 @@ mod tests {
assert_eq!(
config.tracing,
Some(TracingConfig::Otlp {
endpoint: "test".to_string()
endpoint: "test".to_string(),
sample_ratio: None
})
);
});
Expand Down
1 change: 1 addition & 0 deletions nox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tracing-opentelemetry = "0.22.0"
opentelemetry = "0.21.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio-current-thread"] }
opentelemetry-otlp = "0.14.0"
opentelemetry-stdout = { version = "0.2.0", features = ["trace"] }
once_cell = { workspace = true }

[dev-dependencies]
Expand Down
56 changes: 46 additions & 10 deletions nox/src/layers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use console_subscriber::ConsoleLayer;
use eyre::anyhow;
use opentelemetry::KeyValue;
use libp2p::PeerId;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_sdk::Resource;
use server_config::{ConsoleConfig, LogConfig, LogFormat, TracingConfig};
use std::net::{SocketAddr, ToSocketAddrs};
Expand All @@ -10,15 +14,15 @@ use tracing::Subscriber;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

pub fn log_layer<S>(log_config: &Option<LogConfig>) -> impl Layer<S>
pub fn env_filter<S>() -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let rust_log = std::env::var("RUST_LOG")
.unwrap_or_default()
.replace(char::is_whitespace, "");

let env_filter = tracing_subscriber::EnvFilter::builder()
tracing_subscriber::EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.parse_lossy(rust_log)
.add_directive("cranelift_codegen=off".parse().unwrap())
Expand All @@ -34,8 +38,12 @@ where
.add_directive("soketto=error".parse().unwrap())
.add_directive("cranelift_codegen=error".parse().unwrap())
.add_directive("tracing=error".parse().unwrap())
.add_directive("avm_server::runner=error".parse().unwrap());

.add_directive("avm_server::runner=error".parse().unwrap())
}
pub fn log_layer<S>(log_config: &Option<LogConfig>) -> impl Layer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let log_format = log_config
.as_ref()
.map(|c| &c.format)
Expand All @@ -47,12 +55,10 @@ where
.with_span_path(false)
.with_span_name(false)
.layer()
.with_filter(env_filter)
.boxed(),
LogFormat::Default => tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_thread_names(true)
.with_filter(env_filter)
.boxed(),
};

Expand Down Expand Up @@ -82,15 +88,45 @@ where

pub fn tracing_layer<S>(
tracing_config: &Option<TracingConfig>,
peer_id: PeerId,
version: &str,
) -> eyre::Result<Option<impl Layer<S>>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let tracing_config = tracing_config.as_ref().unwrap_or(&TracingConfig::Disabled);
let tracing_layer = match tracing_config {
TracingConfig::Disabled => None,
TracingConfig::Otlp { endpoint } => {
let resource = Resource::new(vec![KeyValue::new("service.name", "rust-peer")]);
TracingConfig::Stdout => {
global::set_text_map_propagator(TraceContextPropagator::new());
let exporter = opentelemetry_stdout::SpanExporter::default();
let provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_simple_exporter(exporter)
.build();

let tracer = provider.tracer("rust-peer");

let tracing_layer = tracing_opentelemetry::layer::<S>().with_tracer(tracer);
Some(tracing_layer)
}
TracingConfig::Otlp {
endpoint,
sample_ratio,
} => {
global::set_text_map_propagator(TraceContextPropagator::new());
let resource = Resource::new(vec![
KeyValue::new("service.name", "rust-peer"),
KeyValue::new("service.version", version.to_string()),
KeyValue::new("peer_id", peer_id.to_base58()),
]);

let mut config = opentelemetry_sdk::trace::config().with_resource(resource);

if let Some(ratio) = sample_ratio {
config = config.with_sampler(Sampler::ParentBased(Box::new(
Sampler::TraceIdRatioBased(*ratio),
)));
}

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
Expand All @@ -99,7 +135,7 @@ where
.tonic()
.with_endpoint(endpoint),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(resource))
.with_trace_config(config)
.install_batch(opentelemetry_sdk::runtime::TokioCurrentThread)?;

let tracing_layer = tracing_opentelemetry::layer::<S>().with_tracer(tracer);
Expand Down
1 change: 1 addition & 0 deletions nox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub use node::Node;
pub use connection_pool::Command as ConnectionPoolCommand;
pub use connectivity::Connectivity;
pub use kademlia::Command as KademliaCommand;
pub use layers::env_filter;
pub use layers::log_layer;
pub use layers::tokio_console_layer;
pub use layers::tracing_layer;
Expand Down
36 changes: 20 additions & 16 deletions nox/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use base64::{engine::general_purpose::STANDARD as base64, Engine};

use eyre::WrapErr;
use libp2p::PeerId;
use tokio::signal;
use tokio::sync::oneshot;
use tracing_subscriber::layer::SubscriberExt;
Expand All @@ -38,7 +39,7 @@ use air_interpreter_fs::write_default_air_interpreter;
use aquamarine::{VmConfig, AVM};
use config_utils::to_peer_id;
use fs_utils::to_abs_path;
use nox::{log_layer, tokio_console_layer, tracing_layer, Node};
use nox::{env_filter, log_layer, tokio_console_layer, tracing_layer, Node};
use server_config::{load_config, ConfigData, ResolvedConfig};

const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -105,23 +106,32 @@ fn main() -> eyre::Result<()> {
.build()
.expect("Could not make tokio runtime")
.block_on(async {
if let Some(true) = config.print_config {
log::info!("Loaded config: {:#?}", config);
}

let resolver_config = config.clone().resolve()?;

let key_pair = resolver_config.node_config.root_key_pair.clone();
let base64_key_pair = base64.encode(key_pair.public().to_vec());
let peer_id = to_peer_id(&key_pair.into());

tracing_subscriber::registry()
.with(env_filter())
.with(log_layer(&config.log))
.with(tokio_console_layer(&config.console)?)
.with(tracing_layer(&config.tracing)?)
.with(tracing_layer(&config.tracing, peer_id, VERSION)?)
.init();

if let Some(true) = config.print_config {
log::info!("Loaded config: {:#?}", config);
}
log::info!("node public key = {}", base64_key_pair);
log::info!("node server peer id = {}", peer_id);

let config = config.resolve()?;

let interpreter_path = to_abs_path(config.dir_config.air_interpreter_path.clone());
let interpreter_path =
to_abs_path(resolver_config.dir_config.air_interpreter_path.clone());
write_default_air_interpreter(&interpreter_path)?;
log::info!("AIR interpreter: {:?}", interpreter_path);

let fluence = start_fluence(config).await?;
let fluence = start_fluence(resolver_config, peer_id).await?;
log::info!("Fluence has been successfully started.");
log::info!("Waiting for Ctrl-C to exit...");

Expand All @@ -134,15 +144,9 @@ fn main() -> eyre::Result<()> {
}

// NOTE: to stop Fluence just call Stoppable::stop()
async fn start_fluence(config: ResolvedConfig) -> eyre::Result<impl Stoppable> {
async fn start_fluence(config: ResolvedConfig, peer_id: PeerId) -> eyre::Result<impl Stoppable> {
log::trace!("starting Fluence");

let key_pair = config.root_key_pair.clone();
let base64_key_pair = base64.encode(key_pair.public().to_vec());
let peer_id = to_peer_id(&key_pair.into());
log::info!("node public key = {}", base64_key_pair);
log::info!("node server peer id = {}", peer_id);

let listen_addrs = config.listen_multiaddrs();
let vm_config = vm_config(&config);

Expand Down

0 comments on commit e825dd1

Please sign in to comment.