From bf60d1ba2e571bcbc5c7b76bf316f29065dc4893 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 13:55:16 -0400 Subject: [PATCH 01/20] spike tracing approach --- Cargo.lock | 233 +++++++++++++++++++++++++++--- Cargo.toml | 13 ++ crates/ingress-rpc/Cargo.toml | 6 + crates/ingress-rpc/src/main.rs | 66 ++++++++- crates/ingress-rpc/src/service.rs | 6 +- 5 files changed, 300 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f..037f3f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6568,6 +6568,41 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-datadog" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" +dependencies = [ + "ahash", + "http 1.3.1", + "indexmap 2.11.4", + "itoa", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", + "reqwest", + "rmp", + "ryu", + "thiserror 2.0.17", + "url", +] + [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6577,11 +6612,24 @@ dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry", + "opentelemetry 0.28.0", "reqwest", "tracing", ] +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry 0.31.0", + "reqwest", +] + [[package]] name = "opentelemetry-otlp" version = "0.28.0" @@ -6591,16 +6639,36 @@ dependencies = [ "async-trait", "futures-core", "http 1.3.1", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry-http 0.28.0", + "opentelemetry-proto 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", + "reqwest", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tonic 0.12.3", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.3.1", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", "reqwest", "serde_json", "thiserror 2.0.17", "tokio", - "tonic", + "tonic 0.14.2", "tracing", ] @@ -6612,13 +6680,36 @@ checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" dependencies = [ "base64 0.22.1", "hex", - "opentelemetry", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", "serde", - "tonic", + "tonic 0.12.3", ] +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "base64 0.22.1", + "const-hex", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", + "serde", + "serde_json", + "tonic 0.14.2", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -6630,7 +6721,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry", + "opentelemetry 0.28.0", "percent-encoding", "rand 0.8.5", "serde_json", @@ -6640,6 +6731,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry 0.31.0", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7141,7 +7249,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", ] [[package]] @@ -7157,6 +7275,19 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -10894,9 +11025,9 @@ dependencies = [ "metrics-util", "moka", "op-alloy-rpc-types-engine", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry-otlp 0.28.0", + "opentelemetry_sdk 0.28.0", "parking_lot", "paste", "reth-optimism-payload-builder", @@ -10912,7 +11043,7 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.29.0", "tracing-subscriber 0.3.20", "url", "vergen", @@ -12529,6 +12660,11 @@ dependencies = [ "op-alloy-consensus", "op-alloy-network", "op-revm", + "opentelemetry 0.31.0", + "opentelemetry-datadog", + "opentelemetry-otlp 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", "rdkafka", "reth-optimism-evm", "reth-rpc-eth-types", @@ -12536,6 +12672,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "tracing-opentelemetry 0.32.0", "tracing-subscriber 0.3.20", "url", ] @@ -12798,7 +12935,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "socket2 0.5.10", "tokio", "tokio-stream", @@ -12808,6 +12945,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost 0.14.1", + "tonic 0.14.2", +] + [[package]] name = "tower" version = "0.4.13" @@ -12988,8 +13162,8 @@ checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", "smallvec", "tracing", "tracing-core", @@ -12998,6 +13172,25 @@ dependencies = [ "web-time", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" +dependencies = [ + "js-sys", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "rustversion", + "smallvec", + "thiserror 2.0.17", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.20", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 9114e9f..f91d86e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,3 +72,16 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +opentelemetry = { version = "0.31.0", features = ["trace"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "http-proto", + "http-json", + "reqwest-client", + "trace", + "grpc-tonic", +] } +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +tracing-opentelemetry = "0.32.0" +opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } +opentelemetry-semantic-conventions = "0.31.0" diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b..9364bb2 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,9 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +opentelemetry.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry_sdk.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry-datadog.workspace = true +opentelemetry-semantic-conventions.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc0..e180b0b 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -2,11 +2,20 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; +use opentelemetry::global; +use opentelemetry::trace::Tracer; +use opentelemetry::{InstrumentationScope, trace::TracerProvider}; +use opentelemetry_sdk::trace; +use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler}; +use opentelemetry_semantic_conventions as semcov; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; +use tracing_subscriber::Layer; +use tracing_subscriber::filter::{LevelFilter, Targets}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; @@ -57,6 +66,14 @@ struct Config { default_value = "10800" )] send_transaction_default_lifetime_seconds: u64, + + /// Enable tracing + #[arg(long, env = "TIPS_INGRESS_TRACING_ENABLED", default_value = "false")] + tracing_enabled: bool, + + /// Port for the OTLP endpoint + #[arg(long, env = "TIPS_INGRESS_TRACING_OTLP_PORT", default_value = "4317")] + tracing_otlp_port: u16, } #[tokio::main] @@ -80,18 +97,63 @@ async fn main() -> anyhow::Result<()> { } }; - tracing_subscriber::registry() + /*tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())), ) .with(tracing_subscriber::fmt::layer()) .init(); + + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); + */ + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), log_level); + + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); + + let mut trace_config = trace::Config::default(); + trace_config.sampler = Box::new(Sampler::AlwaysOn); + trace_config.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch() + .unwrap(); + global::set_tracer_provider(provider.clone()); + + let scope = InstrumentationScope::builder("opentelemetry-datadog") + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + info!( message = "Starting ingress service", address = %config.address, port = config.port, - mempool_url = %config.mempool_url + mempool_url = %config.mempool_url, + endpoint = %otlp_endpoint ); let provider: RootProvider = ProviderBuilder::new() diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d..e4e833d 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,7 +11,7 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::{info, warn}; +use tracing::{Instrument, info, span, warn}; use crate::queue::QueuePublisher; @@ -111,7 +111,9 @@ where // queue the bundle let sender = transaction.signer(); - if let Err(e) = self.queue.publish(&bundle, sender).await { + let span = + span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); + if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); } From 27e9ad247da2d076f47ebabb0f1db7e8105eadb8 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 14:21:23 -0400 Subject: [PATCH 02/20] shutdown span tracer --- crates/ingress-rpc/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index e180b0b..476b944 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -156,7 +156,7 @@ async fn main() -> anyhow::Result<()> { endpoint = %otlp_endpoint ); - let provider: RootProvider = ProviderBuilder::new() + let op_provider: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() .connect_http(config.mempool_url); @@ -168,7 +168,7 @@ async fn main() -> anyhow::Result<()> { let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); let service = IngressService::new( - provider, + op_provider, config.dual_write_mempool, queue, config.send_transaction_default_lifetime_seconds, @@ -185,6 +185,7 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + let _ = provider.shutdown(); Ok(()) } From ca1fbbfc756b5bcae9ebbc56abd5cc40f06a581f Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 14:45:45 -0400 Subject: [PATCH 03/20] fix span exporter again --- crates/ingress-rpc/src/main.rs | 92 +++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 28 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 476b944..ea5080f 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -121,32 +121,48 @@ async fn main() -> anyhow::Result<()> { trace_config.sampler = Box::new(Sampler::AlwaysOn); trace_config.id_generator = Box::new(RandomIdGenerator::default()); - let provider = opentelemetry_datadog::new_pipeline() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_api_version(opentelemetry_datadog::ApiVersion::Version05) - .with_agent_endpoint(&otlp_endpoint) - .with_trace_config(trace_config) - .install_batch() - .unwrap(); - global::set_tracer_provider(provider.clone()); - - let scope = InstrumentationScope::builder("opentelemetry-datadog") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - - let tracer = provider.tracer_with_scope(scope); - tracer.in_span("span_main", |_span| { - info!( - message = "Tracing enabled", - endpoint = %otlp_endpoint - ); - }); - tracing_subscriber::registry() - .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); + let provider = if config.tracing_enabled { + Some( + opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch() + .unwrap(), + ) + } else { + None + }; + + if let Some(ref provider) = provider { + global::set_tracer_provider(provider.clone()); + } + + if let Some(ref provider) = provider { + let scope = InstrumentationScope::builder("opentelemetry-datadog") + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + } else { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + } info!( message = "Starting ingress service", @@ -184,8 +200,28 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - handle.stopped().await; - let _ = provider.shutdown(); + // Set up graceful shutdown + tokio::select! { + _ = handle.stopped() => { + info!("Server stopped"); + } + _ = tokio::signal::ctrl_c() => { + info!("Received shutdown signal"); + } + } + + // Give time for any remaining spans to be processed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Shutdown tracer provider if it exists + if let Some(provider) = provider { + info!("Shutting down tracer provider"); + if let Err(e) = provider.shutdown() { + warn!("Error shutting down tracer provider: {}", e); + } + } + + info!("Ingress service shutdown complete"); Ok(()) } From 8e3838b56383a36e3be2841932ade2e35faec927 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 15:35:35 -0400 Subject: [PATCH 04/20] follow commonware --- Cargo.toml | 2 +- crates/ingress-rpc/src/main.rs | 155 ++++++++++++++++-------------- crates/ingress-rpc/src/service.rs | 7 ++ 3 files changed, 93 insertions(+), 71 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f91d86e..dd288f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ op-alloy-consensus = { version = "0.20.0", features = ["k256"] } tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } anyhow = "1.0.99" clap = { version = "4.5.47", features = ["derive", "env"] } url = "2.5.7" diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index ea5080f..b25835a 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -3,20 +3,26 @@ use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; use opentelemetry::global; -use opentelemetry::trace::Tracer; -use opentelemetry::{InstrumentationScope, trace::TracerProvider}; -use opentelemetry_sdk::trace; -use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler}; -use opentelemetry_semantic_conventions as semcov; +//use opentelemetry::trace::Tracer; +//use opentelemetry::{InstrumentationScope, trace::TracerProvider}; +//use opentelemetry_sdk::trace; +use opentelemetry::trace::TracerProvider; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::Sampler; +//use opentelemetry_semantic_conventions as semcov; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::trace::SdkTracerProvider; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; -use tracing_subscriber::Layer; -use tracing_subscriber::filter::{LevelFilter, Targets}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +//use tracing_subscriber::Layer; +//use tracing_subscriber::filter::{LevelFilter, Targets}; +use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::{Layer, Registry}; use url::Url; mod queue; @@ -110,6 +116,8 @@ async fn main() -> anyhow::Result<()> { .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); */ + /*global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); + let log_filter = Targets::new() .with_default(LevelFilter::INFO) .with_target(env!("CARGO_PKG_NAME"), log_level); @@ -121,48 +129,76 @@ async fn main() -> anyhow::Result<()> { trace_config.sampler = Box::new(Sampler::AlwaysOn); trace_config.id_generator = Box::new(RandomIdGenerator::default()); - let provider = if config.tracing_enabled { - Some( - opentelemetry_datadog::new_pipeline() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_api_version(opentelemetry_datadog::ApiVersion::Version05) - .with_agent_endpoint(&otlp_endpoint) - .with_trace_config(trace_config) - .install_batch() - .unwrap(), - ) - } else { - None - }; + let provider = opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch()?; + + global::set_tracer_provider(provider.clone()); + + let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init();*/ + + let filter = tracing_subscriber::EnvFilter::new(log_level.to_string()); + + let log_layer = tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_thread_ids(true) + .with_file(true) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .json() + .boxed(); - if let Some(ref provider) = provider { - global::set_tracer_provider(provider.clone()); - } + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - if let Some(ref provider) = provider { - let scope = InstrumentationScope::builder("opentelemetry-datadog") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - - let tracer = provider.tracer_with_scope(scope); - tracer.in_span("span_main", |_span| { - info!( - message = "Tracing enabled", - endpoint = %otlp_endpoint - ); - }); - - tracing_subscriber::registry() - .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); - } else { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); - } + // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 + let exporter = SpanExporter::builder() + .with_http() + .with_endpoint(&otlp_endpoint) + .build()?; + + let batch_processor = BatchSpanProcessor::builder(exporter).build(); + + let resource = Resource::builder_empty() + .with_service_name(env!("CARGO_PKG_NAME")) + .build(); + + let tracer_provider = SdkTracerProvider::builder() + .with_span_processor(batch_processor) + .with_resource(resource) + .with_sampler(Sampler::AlwaysOn) + .build(); + + // Create the tracer and set it globally + let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); + global::set_tracer_provider(tracer_provider); + + let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + let register = Registry::default() + .with(filter) + .with(log_layer) + .with(trace_layer); + tracing::subscriber::set_global_default(register)?; info!( message = "Starting ingress service", @@ -200,28 +236,7 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - // Set up graceful shutdown - tokio::select! { - _ = handle.stopped() => { - info!("Server stopped"); - } - _ = tokio::signal::ctrl_c() => { - info!("Received shutdown signal"); - } - } - - // Give time for any remaining spans to be processed - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - // Shutdown tracer provider if it exists - if let Some(provider) = provider { - info!("Shutting down tracer provider"); - if let Err(e) = provider.shutdown() { - warn!("Error shutting down tracer provider: {}", e); - } - } - - info!("Ingress service shutdown complete"); + handle.stopped().await; Ok(()) } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index e4e833d..8eb8ae1 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -94,12 +94,18 @@ where .await?; validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + let span = span!(tracing::Level::TRACE, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() + self.send_transaction_default_lifetime_seconds; + drop(_enter); + let span = + span!(tracing::Level::INFO, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let bundle = EthSendBundle { txs: vec![data.clone()], block_number: 0, @@ -108,6 +114,7 @@ where reverting_tx_hashes: vec![transaction.tx_hash()], ..Default::default() }; + drop(_enter); // queue the bundle let sender = transaction.signer(); From 33e20d537e35c40e902f1bbf7e49e945e1c19c9c Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 17:47:30 -0400 Subject: [PATCH 05/20] try http client on commonware approach too --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + crates/ingress-rpc/Cargo.toml | 1 + crates/ingress-rpc/src/main.rs | 2 ++ 4 files changed, 17 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 037f3f2..b348ad9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,6 +3568,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -7695,9 +7704,11 @@ checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7707,6 +7718,7 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", "native-tls", "percent-encoding", "pin-project-lite", @@ -12666,6 +12678,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", "rdkafka", + "reqwest", "reth-optimism-evm", "reth-rpc-eth-types", "revm-context-interface", diff --git a/Cargo.toml b/Cargo.toml index dd288f8..fc6aa43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,4 @@ opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } tracing-opentelemetry = "0.32.0" opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } opentelemetry-semantic-conventions = "0.31.0" +reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 9364bb2..851dc98 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -41,3 +41,4 @@ opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true opentelemetry-datadog.workspace = true opentelemetry-semantic-conventions.workspace = true +reqwest.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index b25835a..6e82c34 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -7,6 +7,7 @@ use opentelemetry::global; //use opentelemetry::{InstrumentationScope, trace::TracerProvider}; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; +use opentelemetry_otlp::WithHttpConfig; use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; //use opentelemetry_semantic_conventions as semcov; @@ -173,6 +174,7 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() .with_http() + .with_http_client(reqwest::Client::new()) .with_endpoint(&otlp_endpoint) .build()?; From cd81a8dc82b9d6c34cbb1491e64ca0dd8983773e Mon Sep 17 00:00:00 2001 From: William Law Date: Sat, 18 Oct 2025 16:37:47 -0400 Subject: [PATCH 06/20] try simplespanexporter --- Cargo.lock | 29 ----------------------------- Cargo.toml | 2 -- crates/ingress-rpc/Cargo.toml | 2 -- crates/ingress-rpc/src/main.rs | 9 ++++++--- 4 files changed, 6 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b348ad9..52a94c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6591,27 +6591,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-datadog" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" -dependencies = [ - "ahash", - "http 1.3.1", - "indexmap 2.11.4", - "itoa", - "opentelemetry 0.31.0", - "opentelemetry-http 0.31.0", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk 0.31.0", - "reqwest", - "rmp", - "ryu", - "thiserror 2.0.17", - "url", -] - [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6713,12 +6692,6 @@ dependencies = [ "tonic-prost", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" - [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -12673,9 +12646,7 @@ dependencies = [ "op-alloy-network", "op-revm", "opentelemetry 0.31.0", - "opentelemetry-datadog", "opentelemetry-otlp 0.31.0", - "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", "rdkafka", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index fc6aa43..b3bc52a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,4 @@ opentelemetry-otlp = { version = "0.31.0", features = [ ] } opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } tracing-opentelemetry = "0.32.0" -opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } -opentelemetry-semantic-conventions = "0.31.0" reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 851dc98..65686dd 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -39,6 +39,4 @@ opentelemetry.workspace = true opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true -opentelemetry-datadog.workspace = true -opentelemetry-semantic-conventions.workspace = true reqwest.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6e82c34..513b40f 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -8,8 +8,9 @@ use opentelemetry::global; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::WithHttpConfig; -use opentelemetry_sdk::trace::BatchSpanProcessor; +//use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; +use opentelemetry_sdk::trace::SimpleSpanProcessor; //use opentelemetry_semantic_conventions as semcov; use opentelemetry_sdk::Resource; use opentelemetry_sdk::trace::SdkTracerProvider; @@ -174,11 +175,13 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() .with_http() - .with_http_client(reqwest::Client::new()) + .with_http_client(reqwest::blocking::Client::new()) + //.with_tonic() .with_endpoint(&otlp_endpoint) .build()?; - let batch_processor = BatchSpanProcessor::builder(exporter).build(); + //let batch_processor = BatchSpanProcessor::builder(exporter).build(); + let batch_processor = SimpleSpanProcessor::new(exporter); let resource = Resource::builder_empty() .with_service_name(env!("CARGO_PKG_NAME")) From cb921ae306578cb6c1cfcf79e86f13601af9f3be Mon Sep 17 00:00:00 2001 From: William Law Date: Sat, 18 Oct 2025 16:58:44 -0400 Subject: [PATCH 07/20] try grpc --- crates/ingress-rpc/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 513b40f..6f86211 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -7,7 +7,7 @@ use opentelemetry::global; //use opentelemetry::{InstrumentationScope, trace::TracerProvider}; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; -use opentelemetry_otlp::WithHttpConfig; +//use opentelemetry_otlp::WithHttpConfig; //use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; use opentelemetry_sdk::trace::SimpleSpanProcessor; @@ -174,9 +174,9 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() - .with_http() - .with_http_client(reqwest::blocking::Client::new()) - //.with_tonic() + //.with_http() + //.with_http_client(reqwest::blocking::Client::new()) + .with_tonic() .with_endpoint(&otlp_endpoint) .build()?; From c3a100dff69f9f435cfc15987bab8d8e947959a1 Mon Sep 17 00:00:00 2001 From: William Law Date: Sun, 19 Oct 2025 10:58:37 -0400 Subject: [PATCH 08/20] wip --- crates/ingress-rpc/src/main.rs | 121 +++++++++++++++++++++------------ 1 file changed, 77 insertions(+), 44 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6f86211..e5e59cd 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -9,11 +9,12 @@ use opentelemetry::global; use opentelemetry::trace::TracerProvider; //use opentelemetry_otlp::WithHttpConfig; //use opentelemetry_sdk::trace::BatchSpanProcessor; -use opentelemetry_sdk::trace::Sampler; -use opentelemetry_sdk::trace::SimpleSpanProcessor; +//use opentelemetry_sdk::trace::Sampler; +//use opentelemetry_sdk::trace::SimpleSpanProcessor; +use opentelemetry_sdk::trace::SpanProcessor; //use opentelemetry_semantic_conventions as semcov; use opentelemetry_sdk::Resource; -use opentelemetry_sdk::trace::SdkTracerProvider; +//use opentelemetry_sdk::trace::SdkTracerProvider; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use std::env; @@ -22,10 +23,12 @@ use std::net::IpAddr; use tracing::{info, warn}; //use tracing_subscriber::Layer; //use tracing_subscriber::filter::{LevelFilter, Targets}; -use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +//use opentelemetry_otlp::SpanExporter; +use opentelemetry_otlp::{WithExportConfig}; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::{Layer, Registry}; +use tracing_subscriber::{Layer}; use url::Url; +use anyhow::Context; mod queue; mod service; @@ -84,6 +87,29 @@ struct Config { tracing_otlp_port: u16, } +#[derive(Debug)] +pub struct OtlpSpanProcessor; + +impl SpanProcessor for OtlpSpanProcessor { + fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &opentelemetry::Context) {} + + fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} + + fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: tokio::time::Duration) -> opentelemetry_sdk::error::OTelSdkResult { + Ok(()) + } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} +} + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -159,51 +185,58 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) .init();*/ - let filter = tracing_subscriber::EnvFilter::new(log_level.to_string()); + // https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 + let global_filter = tracing_subscriber::filter::Targets::new() + .with_default(tracing_subscriber::filter::LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), tracing_subscriber::filter::LevelFilter::TRACE); + + let registry = tracing_subscriber::registry().with(global_filter); - let log_layer = tracing_subscriber::fmt::layer() - .with_line_number(true) - .with_thread_ids(true) - .with_file(true) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) - .json() - .boxed(); + let log_filter = tracing_subscriber::filter::Targets::new() + .with_default(tracing_subscriber::filter::LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), log_level); + + let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - - // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 - let exporter = SpanExporter::builder() - //.with_http() - //.with_http_client(reqwest::blocking::Client::new()) + + global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); + let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() .with_endpoint(&otlp_endpoint) - .build()?; - - //let batch_processor = BatchSpanProcessor::builder(exporter).build(); - let batch_processor = SimpleSpanProcessor::new(exporter); - - let resource = Resource::builder_empty() - .with_service_name(env!("CARGO_PKG_NAME")) - .build(); - - let tracer_provider = SdkTracerProvider::builder() - .with_span_processor(batch_processor) - .with_resource(resource) - .with_sampler(Sampler::AlwaysOn) - .build(); - - // Create the tracer and set it globally - let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); - global::set_tracer_provider(tracer_provider); - - let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); - - let register = Registry::default() - .with(filter) - .with(log_layer) - .with(trace_layer); - tracing::subscriber::set_global_default(register)?; + .build() + .context("Failed to create OTLP exporter")?; + + let provider_builder = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_batch_exporter(otlp_exporter) + .with_resource( + Resource::builder_empty() + .with_attributes([ + opentelemetry::KeyValue::new("service.name", env!("CARGO_PKG_NAME")), + opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ]) + .build(), + ) + .with_span_processor(OtlpSpanProcessor); + + let provider = provider_builder.build(); + let tracer = provider.tracer(env!("CARGO_PKG_NAME")); + + let trace_filter = tracing_subscriber::filter::Targets::new() + .with_default(tracing_subscriber::filter::LevelFilter::OFF) + .with_target(env!("CARGO_PKG_NAME"), tracing_subscriber::filter::LevelFilter::TRACE); + + let registry = registry.with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); + tracing::subscriber::set_global_default( + registry.with( + tracing_subscriber::fmt::layer() + .json() + .with_ansi(false) + .with_writer(writer) + .with_filter(log_filter.clone()), + ), + )?; info!( message = "Starting ingress service", From dcffe128d94c5c2366f872d23d062832052fb58c Mon Sep 17 00:00:00 2001 From: William Law Date: Sun, 19 Oct 2025 17:52:22 -0400 Subject: [PATCH 09/20] follow rollup-boost --- crates/ingress-rpc/src/main.rs | 99 +++++++++------------------------- 1 file changed, 24 insertions(+), 75 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index e5e59cd..235c9da 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -11,7 +11,8 @@ use opentelemetry::trace::TracerProvider; //use opentelemetry_sdk::trace::BatchSpanProcessor; //use opentelemetry_sdk::trace::Sampler; //use opentelemetry_sdk::trace::SimpleSpanProcessor; -use opentelemetry_sdk::trace::SpanProcessor; +//use opentelemetry_sdk::trace::SpanProcessor; +use tracing_opentelemetry::OpenTelemetryLayer; //use opentelemetry_semantic_conventions as semcov; use opentelemetry_sdk::Resource; //use opentelemetry_sdk::trace::SdkTracerProvider; @@ -21,14 +22,14 @@ use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; -//use tracing_subscriber::Layer; -//use tracing_subscriber::filter::{LevelFilter, Targets}; +use tracing_subscriber::Layer; +use tracing_subscriber::filter::{LevelFilter, Targets}; //use opentelemetry_otlp::SpanExporter; -use opentelemetry_otlp::{WithExportConfig}; +use opentelemetry_otlp::WithExportConfig; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::{Layer}; -use url::Url; +//use tracing_subscriber::{Layer}; use anyhow::Context; +use url::Url; mod queue; mod service; @@ -87,6 +88,7 @@ struct Config { tracing_otlp_port: u16, } +/* #[derive(Debug)] pub struct OtlpSpanProcessor; @@ -108,7 +110,7 @@ impl SpanProcessor for OtlpSpanProcessor { } fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} -} +}*/ #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -131,83 +133,31 @@ async fn main() -> anyhow::Result<()> { } }; - /*tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let global_filter = Targets::new() - .with_default(LevelFilter::INFO) - .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); - */ - - /*global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); - - let log_filter = Targets::new() - .with_default(LevelFilter::INFO) - .with_target(env!("CARGO_PKG_NAME"), log_level); - + // https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - let mut trace_config = trace::Config::default(); - trace_config.sampler = Box::new(Sampler::AlwaysOn); - trace_config.id_generator = Box::new(RandomIdGenerator::default()); - - let provider = opentelemetry_datadog::new_pipeline() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_api_version(opentelemetry_datadog::ApiVersion::Version05) - .with_agent_endpoint(&otlp_endpoint) - .with_trace_config(trace_config) - .install_batch()?; - - global::set_tracer_provider(provider.clone()); - - let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME")) - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - - let tracer = provider.tracer_with_scope(scope); - tracer.in_span("span_main", |_span| { - info!( - message = "Tracing enabled", - endpoint = %otlp_endpoint - ); - }); + // Be cautious with snake_case and kebab-case here + let filter_name = "tips-ingress-rpc".to_string(); - tracing_subscriber::registry() - .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init();*/ - - // https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 - let global_filter = tracing_subscriber::filter::Targets::new() - .with_default(tracing_subscriber::filter::LevelFilter::INFO) - .with_target(env!("CARGO_PKG_NAME"), tracing_subscriber::filter::LevelFilter::TRACE); + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, LevelFilter::TRACE); let registry = tracing_subscriber::registry().with(global_filter); - let log_filter = tracing_subscriber::filter::Targets::new() - .with_default(tracing_subscriber::filter::LevelFilter::INFO) - .with_target(env!("CARGO_PKG_NAME"), log_level); + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, log_level); let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); - let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); - let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() .with_endpoint(&otlp_endpoint) .build() .context("Failed to create OTLP exporter")?; - let provider_builder = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_batch_exporter(otlp_exporter) .with_resource( @@ -217,17 +167,16 @@ async fn main() -> anyhow::Result<()> { opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), ]) .build(), - ) - .with_span_processor(OtlpSpanProcessor); - + ); let provider = provider_builder.build(); let tracer = provider.tracer(env!("CARGO_PKG_NAME")); - let trace_filter = tracing_subscriber::filter::Targets::new() - .with_default(tracing_subscriber::filter::LevelFilter::OFF) - .with_target(env!("CARGO_PKG_NAME"), tracing_subscriber::filter::LevelFilter::TRACE); + let trace_filter = Targets::new() + .with_default(LevelFilter::OFF) + .with_target(&filter_name, LevelFilter::TRACE); + + let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); - let registry = registry.with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); tracing::subscriber::set_global_default( registry.with( tracing_subscriber::fmt::layer() From 091158e4e3f042de0842695e618cfa5f296510c7 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 20 Oct 2025 10:58:38 -0400 Subject: [PATCH 10/20] only spans in ingress-rpc --- crates/ingress-rpc/src/main.rs | 45 +++------------------------------- 1 file changed, 4 insertions(+), 41 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 235c9da..e0dc0b6 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -1,34 +1,22 @@ use alloy_provider::{ProviderBuilder, RootProvider}; +use anyhow::Context; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; use opentelemetry::global; -//use opentelemetry::trace::Tracer; -//use opentelemetry::{InstrumentationScope, trace::TracerProvider}; -//use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; -//use opentelemetry_otlp::WithHttpConfig; -//use opentelemetry_sdk::trace::BatchSpanProcessor; -//use opentelemetry_sdk::trace::Sampler; -//use opentelemetry_sdk::trace::SimpleSpanProcessor; -//use opentelemetry_sdk::trace::SpanProcessor; -use tracing_opentelemetry::OpenTelemetryLayer; -//use opentelemetry_semantic_conventions as semcov; +use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::Resource; -//use opentelemetry_sdk::trace::SdkTracerProvider; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; +use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::Layer; use tracing_subscriber::filter::{LevelFilter, Targets}; -//use opentelemetry_otlp::SpanExporter; -use opentelemetry_otlp::WithExportConfig; use tracing_subscriber::layer::SubscriberExt; -//use tracing_subscriber::{Layer}; -use anyhow::Context; use url::Url; mod queue; @@ -88,30 +76,6 @@ struct Config { tracing_otlp_port: u16, } -/* -#[derive(Debug)] -pub struct OtlpSpanProcessor; - -impl SpanProcessor for OtlpSpanProcessor { - fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &opentelemetry::Context) {} - - fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} - - fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult { - Ok(()) - } - - fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult { - Ok(()) - } - - fn shutdown_with_timeout(&self, _timeout: tokio::time::Duration) -> opentelemetry_sdk::error::OTelSdkResult { - Ok(()) - } - - fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} -}*/ - #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -133,11 +97,10 @@ async fn main() -> anyhow::Result<()> { } }; - // https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - // Be cautious with snake_case and kebab-case here + // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 let filter_name = "tips-ingress-rpc".to_string(); let global_filter = Targets::new() From 8c489e595e8ddc051e501819b330c2bb9a6b7fad Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 20 Oct 2025 11:47:39 -0400 Subject: [PATCH 11/20] use opentelemetry_datadog --- Cargo.lock | 28 +++++++++++++++++++++++++ Cargo.toml | 4 +++- crates/ingress-rpc/Cargo.toml | 2 ++ crates/ingress-rpc/src/main.rs | 37 +++++++++++++++++++++++++++++----- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52a94c1..f376137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6591,6 +6591,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry-datadog" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" +dependencies = [ + "ahash", + "http 1.3.1", + "indexmap 2.11.4", + "itoa", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", + "rmp", + "ryu", + "thiserror 2.0.17", + "url", +] + [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6692,6 +6712,12 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -12646,7 +12672,9 @@ dependencies = [ "op-alloy-network", "op-revm", "opentelemetry 0.31.0", + "opentelemetry-datadog", "opentelemetry-otlp 0.31.0", + "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", "rdkafka", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index b3bc52a..93bce25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,8 @@ opentelemetry-otlp = { version = "0.31.0", features = [ "trace", "grpc-tonic", ] } -opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "rt-tokio-current-thread"] } tracing-opentelemetry = "0.32.0" reqwest = { version = "0.12", features = ["json"] } +opentelemetry-datadog = { version = "0.19.0" } +opentelemetry-semantic-conventions = { version = "0.31.0" } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 65686dd..9f9937b 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -40,3 +40,5 @@ opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true reqwest.workspace = true +opentelemetry-datadog.workspace = true +opentelemetry-semantic-conventions.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index e0dc0b6..a8588a3 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -1,12 +1,12 @@ use alloy_provider::{ProviderBuilder, RootProvider}; -use anyhow::Context; +//use anyhow::Context; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; use opentelemetry::global; use opentelemetry::trace::TracerProvider; -use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::Resource; +//use opentelemetry_otlp::WithExportConfig; +//use opentelemetry_sdk::Resource; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use std::env; @@ -19,6 +19,15 @@ use tracing_subscriber::filter::{LevelFilter, Targets}; use tracing_subscriber::layer::SubscriberExt; use url::Url; +//use opentelemetry::{ +// trace::{SamplingResult, Span, TraceContextExt, Tracer}, +// InstrumentationScope, Key, KeyValue, Value, +//}; +use opentelemetry::InstrumentationScope; +use opentelemetry_datadog::{ApiVersion, new_pipeline}; +use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler}; +use opentelemetry_semantic_conventions as semcov; + mod queue; mod service; mod validation; @@ -116,7 +125,7 @@ async fn main() -> anyhow::Result<()> { let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); - let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() + /*let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() .with_endpoint(&otlp_endpoint) .build() @@ -132,7 +141,25 @@ async fn main() -> anyhow::Result<()> { .build(), ); let provider = provider_builder.build(); - let tracer = provider.tracer(env!("CARGO_PKG_NAME")); + let tracer = provider.tracer(env!("CARGO_PKG_NAME"));*/ + + let mut trace_cfg = trace::Config::default(); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = new_pipeline() + .with_service_name(&filter_name) + .with_api_version(ApiVersion::Version05) + .with_trace_config(trace_cfg) + .with_agent_endpoint(&otlp_endpoint) + .install_batch()?; + global::set_tracer_provider(provider.clone()); + let scope = InstrumentationScope::builder(filter_name.clone()) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + let tracer = provider.tracer_with_scope(scope); let trace_filter = Targets::new() .with_default(LevelFilter::OFF) From 069adb48186a183bbbc0a7b8078f4d076d8d1c1f Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 20 Oct 2025 13:46:09 -0400 Subject: [PATCH 12/20] add notes --- crates/ingress-rpc/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index a8588a3..5c2e65f 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -147,11 +147,12 @@ async fn main() -> anyhow::Result<()> { trace_cfg.sampler = Box::new(Sampler::AlwaysOn); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + // `with_agent_endpoint` or `with_http_client`? let provider = new_pipeline() .with_service_name(&filter_name) .with_api_version(ApiVersion::Version05) .with_trace_config(trace_cfg) - .with_agent_endpoint(&otlp_endpoint) + .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? .install_batch()?; global::set_tracer_provider(provider.clone()); let scope = InstrumentationScope::builder(filter_name.clone()) @@ -214,6 +215,8 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + // TODO: might need shutdown + // let _ = provider.shutdown(); Ok(()) } From 41d4c57078188f63aed598b70e13331b250ee115 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 09:54:15 -0400 Subject: [PATCH 13/20] add back http client --- Cargo.lock | 1 + Cargo.toml | 2 +- crates/ingress-rpc/src/main.rs | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f376137..b348ad9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6605,6 +6605,7 @@ dependencies = [ "opentelemetry-http 0.31.0", "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", + "reqwest", "rmp", "ryu", "thiserror 2.0.17", diff --git a/Cargo.toml b/Cargo.toml index 93bce25..f115d59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,5 +84,5 @@ opentelemetry-otlp = { version = "0.31.0", features = [ opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "rt-tokio-current-thread"] } tracing-opentelemetry = "0.32.0" reqwest = { version = "0.12", features = ["json"] } -opentelemetry-datadog = { version = "0.19.0" } +opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-blocking-client", "reqwest-client"] } opentelemetry-semantic-conventions = { version = "0.31.0" } diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 5c2e65f..dbfbdb4 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -152,8 +152,9 @@ async fn main() -> anyhow::Result<()> { .with_service_name(&filter_name) .with_api_version(ApiVersion::Version05) .with_trace_config(trace_cfg) + .with_http_client(reqwest::blocking::Client::new()) .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? - .install_batch()?; + .install_simple()?; // TODO: use batch exporter later global::set_tracer_provider(provider.clone()); let scope = InstrumentationScope::builder(filter_name.clone()) .with_version(env!("CARGO_PKG_VERSION")) From 0c88f4b233d97017233923b909143b577f9c3feb Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 10:40:25 -0400 Subject: [PATCH 14/20] add unit test & use reqwest-client --- crates/ingress-rpc/Cargo.toml | 2 +- crates/ingress-rpc/src/main.rs | 26 ++++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 9f9937b..1a5fa8a 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -40,5 +40,5 @@ opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true reqwest.workspace = true -opentelemetry-datadog.workspace = true +opentelemetry-datadog = { workspace = true, features = ["reqwest-client"]} opentelemetry-semantic-conventions.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index dbfbdb4..aceefb6 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -152,7 +152,7 @@ async fn main() -> anyhow::Result<()> { .with_service_name(&filter_name) .with_api_version(ApiVersion::Version05) .with_trace_config(trace_cfg) - .with_http_client(reqwest::blocking::Client::new()) + //.with_http_client(reqwest::Client::new()) .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? .install_simple()?; // TODO: use batch exporter later global::set_tracer_provider(provider.clone()); @@ -217,7 +217,7 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; // TODO: might need shutdown - // let _ = provider.shutdown(); + let _ = provider.shutdown(); Ok(()) } @@ -239,3 +239,25 @@ fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result Date: Tue, 21 Oct 2025 10:52:32 -0400 Subject: [PATCH 15/20] update unit test to be in async ctx --- crates/ingress-rpc/src/main.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index aceefb6..4fdc5d0 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -244,20 +244,22 @@ fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result Date: Tue, 21 Oct 2025 11:00:24 -0400 Subject: [PATCH 16/20] make tracer in seperate thread --- crates/ingress-rpc/src/main.rs | 71 ++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 4fdc5d0..8c63231 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -107,7 +107,7 @@ async fn main() -> anyhow::Result<()> { }; let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); - let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); + let otlp_endpoint = format!("http://{}:{}", &dd_host, &config.tracing_otlp_port); // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 let filter_name = "tips-ingress-rpc".to_string(); @@ -148,43 +148,48 @@ async fn main() -> anyhow::Result<()> { trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); // `with_agent_endpoint` or `with_http_client`? - let provider = new_pipeline() - .with_service_name(&filter_name) - .with_api_version(ApiVersion::Version05) - .with_trace_config(trace_cfg) - //.with_http_client(reqwest::Client::new()) - .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? - .install_simple()?; // TODO: use batch exporter later - global::set_tracer_provider(provider.clone()); - let scope = InstrumentationScope::builder(filter_name.clone()) - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - let tracer = provider.tracer_with_scope(scope); - - let trace_filter = Targets::new() - .with_default(LevelFilter::OFF) - .with_target(&filter_name, LevelFilter::TRACE); - - let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); - - tracing::subscriber::set_global_default( - registry.with( - tracing_subscriber::fmt::layer() - .json() - .with_ansi(false) - .with_writer(writer) - .with_filter(log_filter.clone()), - ), - )?; + let handle = std::thread::spawn(move || { + let provider = new_pipeline() + .with_service_name(&filter_name) + .with_api_version(ApiVersion::Version05) + .with_trace_config(trace_cfg) + //.with_http_client(reqwest::Client::new()) + .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? + .install_simple() + .expect("Failed to build provider"); // TODO: use batch exporter later + global::set_tracer_provider(provider.clone()); + let scope = InstrumentationScope::builder(filter_name.clone()) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + let tracer = provider.tracer_with_scope(scope); + + let trace_filter = Targets::new() + .with_default(LevelFilter::OFF) + .with_target(&filter_name, LevelFilter::TRACE); + + let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); + + tracing::subscriber::set_global_default( + registry.with( + tracing_subscriber::fmt::layer() + .json() + .with_ansi(false) + .with_writer(writer) + .with_filter(log_filter.clone()), + ), + ) + .expect("Failed to set global default"); + }); + handle.join().expect("Failed to join thread"); info!( message = "Starting ingress service", address = %config.address, port = config.port, mempool_url = %config.mempool_url, - endpoint = %otlp_endpoint + endpoint = %format!("http://{}:{}", &dd_host, &config.tracing_otlp_port) ); let op_provider: RootProvider = ProviderBuilder::new() @@ -217,7 +222,7 @@ async fn main() -> anyhow::Result<()> { handle.stopped().await; // TODO: might need shutdown - let _ = provider.shutdown(); + // let _ = provider.shutdown(); Ok(()) } From d7984e8db390ba5599ad071bb1d3e0eb36a586c8 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 11:27:24 -0400 Subject: [PATCH 17/20] add more unit test + use dummy example to see if traces in apm work --- crates/ingress-rpc/src/main.rs | 142 ++++++++++++++++++++++++++------- 1 file changed, 111 insertions(+), 31 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 8c63231..470b941 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -3,8 +3,8 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; -use opentelemetry::global; -use opentelemetry::trace::TracerProvider; +use opentelemetry::trace::{Span, TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::{Key, KeyValue, Value, global}; //use opentelemetry_otlp::WithExportConfig; //use opentelemetry_sdk::Resource; use rdkafka::ClientConfig; @@ -12,6 +12,7 @@ use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; +use std::time::Duration; use tracing::{info, warn}; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::Layer; @@ -85,6 +86,21 @@ struct Config { tracing_otlp_port: u16, } +fn bar() { + let tracer = global::tracer("component-bar"); + let mut span = tracer.start("bar"); + span.set_attribute(KeyValue::new( + Key::new("span.type"), + Value::String("sql".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("sql.query"), + Value::String("SELECT * FROM table".into()), + )); + std::thread::sleep(Duration::from_millis(6)); + span.end() +} + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -125,24 +141,6 @@ async fn main() -> anyhow::Result<()> { let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); - /*let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(&otlp_endpoint) - .build() - .context("Failed to create OTLP exporter")?; - let provider_builder = opentelemetry_sdk::trace::SdkTracerProvider::builder() - .with_batch_exporter(otlp_exporter) - .with_resource( - Resource::builder_empty() - .with_attributes([ - opentelemetry::KeyValue::new("service.name", env!("CARGO_PKG_NAME")), - opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), - ]) - .build(), - ); - let provider = provider_builder.build(); - let tracer = provider.tracer(env!("CARGO_PKG_NAME"));*/ - let mut trace_cfg = trace::Config::default(); trace_cfg.sampler = Box::new(Sampler::AlwaysOn); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); @@ -164,6 +162,26 @@ async fn main() -> anyhow::Result<()> { .with_attributes(None) .build(); let tracer = provider.tracer_with_scope(scope); + tracer.in_span("foo", |cx| { + let span = cx.span(); + span.set_attribute(KeyValue::new( + Key::new("span.type"), + Value::String("web".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("http.url"), + Value::String("http://localhost:8080/foo".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("http.method"), + Value::String("GET".into()), + )); + span.set_attribute(KeyValue::new(Key::new("http.status_code"), Value::I64(200))); + + std::thread::sleep(Duration::from_millis(6)); + bar(); + std::thread::sleep(Duration::from_millis(6)); + }); let trace_filter = Targets::new() .with_default(LevelFilter::OFF) @@ -248,23 +266,85 @@ fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result (Layered, Targets, BoxMakeWriter) { + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let registry: Layered = + tracing_subscriber::registry().with(global_filter); + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let writer = BoxMakeWriter::new(std::io::stdout); + + (registry, log_filter, writer) + } + + fn build_provider() -> SdkTracerProvider { + let mut trace_cfg = trace::Config::default(); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = new_pipeline() + .with_service_name(FILTER_NAME) + .with_api_version(ApiVersion::Version05) + .with_trace_config(trace_cfg) + .with_agent_endpoint("http://localhost:4317") + .install_simple() + .expect("Failed to build provider"); + provider + } #[tokio::test] async fn test_build_provider() { let handle = std::thread::spawn(|| { - let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(Sampler::AlwaysOn); - trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); - - let _provider = new_pipeline() - .with_service_name("tips-ingress-rpc") - .with_api_version(ApiVersion::Version05) - .with_trace_config(trace_cfg) - .with_agent_endpoint("http://localhost:4317") - .install_simple() - .expect("Failed to build provider"); + build_provider(); }); handle.join().unwrap(); } + + #[tokio::test] + async fn test_build_tracer() { + let (registry, log_filter, writer) = setup(); + + let handle = std::thread::spawn(move || { + let provider = build_provider(); + global::set_tracer_provider(provider.clone()); + let scope = InstrumentationScope::builder(FILTER_NAME.to_string()) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + let tracer = provider.tracer_with_scope(scope); + + let trace_filter = Targets::new() + .with_default(LevelFilter::OFF) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); + + tracing::subscriber::set_global_default( + registry.with( + tracing_subscriber::fmt::layer() + .json() + .with_ansi(false) + .with_writer(writer) + .with_filter(log_filter.clone()), + ), + ) + .expect("Failed to set global default"); + }); + handle.join().expect("Failed to join thread"); + } } From a99d07b3c760b296fce26df01827fdf43a476330 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 12:01:22 -0400 Subject: [PATCH 18/20] add agent sampler --- crates/ingress-rpc/Cargo.toml | 2 +- crates/ingress-rpc/src/main.rs | 44 +++++++++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 1a5fa8a..ec52da9 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -40,5 +40,5 @@ opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true reqwest.workspace = true -opentelemetry-datadog = { workspace = true, features = ["reqwest-client"]} +opentelemetry-datadog = { workspace = true, features = ["reqwest-client", "agent-sampling"]} opentelemetry-semantic-conventions.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 470b941..eb14714 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -3,7 +3,7 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; -use opentelemetry::trace::{Span, TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::trace::{SamplingResult, Span, TraceContextExt, Tracer, TracerProvider}; use opentelemetry::{Key, KeyValue, Value, global}; //use opentelemetry_otlp::WithExportConfig; //use opentelemetry_sdk::Resource; @@ -25,8 +25,8 @@ use url::Url; // InstrumentationScope, Key, KeyValue, Value, //}; use opentelemetry::InstrumentationScope; -use opentelemetry_datadog::{ApiVersion, new_pipeline}; -use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler}; +use opentelemetry_datadog::{ApiVersion, DatadogTraceStateBuilder, new_pipeline}; +use opentelemetry_sdk::trace::{self, RandomIdGenerator, ShouldSample}; use opentelemetry_semantic_conventions as semcov; mod queue; @@ -101,6 +101,38 @@ fn bar() { span.end() } +#[derive(Debug, Clone)] +struct AgentBasedSampler; + +impl ShouldSample for AgentBasedSampler { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + _trace_id: opentelemetry::trace::TraceId, + _name: &str, + _span_kind: &opentelemetry::trace::SpanKind, + _attributes: &[opentelemetry::KeyValue], + _links: &[opentelemetry::trace::Link], + ) -> opentelemetry::trace::SamplingResult { + let trace_state = parent_context + .map( + |parent_context| parent_context.span().span_context().trace_state().clone(), // inherit sample decision from parent span + ) + .unwrap_or_else(|| { + DatadogTraceStateBuilder::default() + .with_priority_sampling(true) // always sample root span(span without remote or local parent) + .with_measuring(true) // datadog-agent will create metric for this span for APM + .build() + }); + + SamplingResult { + decision: opentelemetry::trace::SamplingDecision::RecordAndSample, // send all spans to datadog-agent + attributes: vec![], + trace_state, + } + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -126,7 +158,7 @@ async fn main() -> anyhow::Result<()> { let otlp_endpoint = format!("http://{}:{}", &dd_host, &config.tracing_otlp_port); // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 - let filter_name = "tips-ingress-rpc".to_string(); + let filter_name = "tips_ingress_rpc".to_string(); let global_filter = Targets::new() .with_default(LevelFilter::INFO) @@ -142,7 +174,7 @@ async fn main() -> anyhow::Result<()> { global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.sampler = Box::new(AgentBasedSampler); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); // `with_agent_endpoint` or `with_http_client`? @@ -292,7 +324,7 @@ mod tests { fn build_provider() -> SdkTracerProvider { let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.sampler = Box::new(AgentBasedSampler); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); let provider = new_pipeline() From 2aad6f2ebd87da9191600fa8a2929e6a9361d283 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 12:26:45 -0400 Subject: [PATCH 19/20] move everything inside thread --- crates/ingress-rpc/src/main.rs | 37 +++++++++++++++++----------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index eb14714..c589f9a 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -157,28 +157,29 @@ async fn main() -> anyhow::Result<()> { let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); let otlp_endpoint = format!("http://{}:{}", &dd_host, &config.tracing_otlp_port); - // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 - let filter_name = "tips_ingress_rpc".to_string(); + let handle = std::thread::spawn(move || { + // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 + let filter_name = "tips_ingress_rpc".to_string(); - let global_filter = Targets::new() - .with_default(LevelFilter::INFO) - .with_target(&filter_name, LevelFilter::TRACE); + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, LevelFilter::TRACE); - let registry = tracing_subscriber::registry().with(global_filter); + let registry = tracing_subscriber::registry().with(global_filter); - let log_filter = Targets::new() - .with_default(LevelFilter::INFO) - .with_target(&filter_name, log_level); + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, log_level); - let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); + let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); - global::set_text_map_propagator(opentelemetry_sdk::propagation::TraceContextPropagator::new()); - let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(AgentBasedSampler); - trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + let mut trace_cfg = trace::Config::default(); + trace_cfg.sampler = Box::new(AgentBasedSampler); + trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); - // `with_agent_endpoint` or `with_http_client`? - let handle = std::thread::spawn(move || { let provider = new_pipeline() .with_service_name(&filter_name) .with_api_version(ApiVersion::Version05) @@ -348,9 +349,9 @@ mod tests { #[tokio::test] async fn test_build_tracer() { - let (registry, log_filter, writer) = setup(); - let handle = std::thread::spawn(move || { + let (registry, log_filter, writer) = setup(); + let provider = build_provider(); global::set_tracer_provider(provider.clone()); let scope = InstrumentationScope::builder(FILTER_NAME.to_string()) From c3ae58b2540acb7859bc6124cad463cd462cec36 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 14:12:21 -0400 Subject: [PATCH 20/20] try DatadogPropagator --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + crates/ingress-rpc/Cargo.toml | 1 + crates/ingress-rpc/src/main.rs | 23 +++++++++++------------ crates/ingress-rpc/src/service.rs | 14 +++++++++----- 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b348ad9..833a5a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6591,6 +6591,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +dependencies = [ + "opentelemetry 0.31.0", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.20", +] + [[package]] name = "opentelemetry-datadog" version = "0.19.0" @@ -12673,6 +12685,7 @@ dependencies = [ "op-alloy-network", "op-revm", "opentelemetry 0.31.0", + "opentelemetry-appender-tracing", "opentelemetry-datadog", "opentelemetry-otlp 0.31.0", "opentelemetry-semantic-conventions", diff --git a/Cargo.toml b/Cargo.toml index f115d59..2946cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,3 +86,4 @@ tracing-opentelemetry = "0.32.0" reqwest = { version = "0.12", features = ["json"] } opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-blocking-client", "reqwest-client"] } opentelemetry-semantic-conventions = { version = "0.31.0" } +opentelemetry-appender-tracing = "0.31.1" diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index ec52da9..d034071 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -42,3 +42,4 @@ tracing-opentelemetry.workspace = true reqwest.workspace = true opentelemetry-datadog = { workspace = true, features = ["reqwest-client", "agent-sampling"]} opentelemetry-semantic-conventions.workspace = true +opentelemetry-appender-tracing.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index c589f9a..7e01ad1 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -3,7 +3,7 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; -use opentelemetry::trace::{SamplingResult, Span, TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::trace::{Span, TraceContextExt, Tracer, TracerProvider}; use opentelemetry::{Key, KeyValue, Value, global}; //use opentelemetry_otlp::WithExportConfig; //use opentelemetry_sdk::Resource; @@ -25,8 +25,9 @@ use url::Url; // InstrumentationScope, Key, KeyValue, Value, //}; use opentelemetry::InstrumentationScope; -use opentelemetry_datadog::{ApiVersion, DatadogTraceStateBuilder, new_pipeline}; -use opentelemetry_sdk::trace::{self, RandomIdGenerator, ShouldSample}; +//use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_datadog::{ApiVersion, new_pipeline}; //DatadogTraceStateBuilder, +use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler}; //ShouldSample use opentelemetry_semantic_conventions as semcov; mod queue; @@ -101,7 +102,7 @@ fn bar() { span.end() } -#[derive(Debug, Clone)] +/*#[derive(Debug, Clone)] struct AgentBasedSampler; impl ShouldSample for AgentBasedSampler { @@ -131,7 +132,7 @@ impl ShouldSample for AgentBasedSampler { trace_state, } } -} +}*/ #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -159,7 +160,7 @@ async fn main() -> anyhow::Result<()> { let handle = std::thread::spawn(move || { // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 - let filter_name = "tips_ingress_rpc".to_string(); + let filter_name = "tips-ingress-rpc".to_string(); let global_filter = Targets::new() .with_default(LevelFilter::INFO) @@ -173,11 +174,9 @@ async fn main() -> anyhow::Result<()> { let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); - global::set_text_map_propagator( - opentelemetry_sdk::propagation::TraceContextPropagator::new(), - ); + global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(AgentBasedSampler); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); let provider = new_pipeline() @@ -186,7 +185,7 @@ async fn main() -> anyhow::Result<()> { .with_trace_config(trace_cfg) //.with_http_client(reqwest::Client::new()) .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? - .install_simple() + .install_batch() .expect("Failed to build provider"); // TODO: use batch exporter later global::set_tracer_provider(provider.clone()); let scope = InstrumentationScope::builder(filter_name.clone()) @@ -325,7 +324,7 @@ mod tests { fn build_provider() -> SdkTracerProvider { let mut trace_cfg = trace::Config::default(); - trace_cfg.sampler = Box::new(AgentBasedSampler); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); let provider = new_pipeline() diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8eb8ae1..c783c7e 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -9,6 +9,7 @@ use jsonrpsee::{ }; use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; +use opentelemetry::{global, trace::Tracer}; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{Instrument, info, span, warn}; @@ -118,11 +119,14 @@ where // queue the bundle let sender = transaction.signer(); - let span = - span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); - if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { - warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); - } + let t = global::tracer("queue_publish"); + t.in_span("queue_publish_span", async |_| { + let span = + span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); + if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { + warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); + } + }).await; info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());