diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f..833a5a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,6 +3568,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -6568,6 +6577,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +dependencies = [ + "opentelemetry 0.31.0", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.20", +] + +[[package]] +name = "opentelemetry-datadog" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" +dependencies = [ + "ahash", + "http 1.3.1", + "indexmap 2.11.4", + "itoa", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", + "reqwest", + "rmp", + "ryu", + "thiserror 2.0.17", + "url", +] + [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6577,11 +6633,24 @@ dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry", + "opentelemetry 0.28.0", "reqwest", "tracing", ] +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry 0.31.0", + "reqwest", +] + [[package]] name = "opentelemetry-otlp" version = "0.28.0" @@ -6591,16 +6660,36 @@ dependencies = [ "async-trait", "futures-core", "http 1.3.1", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry-http 0.28.0", + "opentelemetry-proto 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", "reqwest", "serde_json", "thiserror 2.0.17", "tokio", - "tonic", + "tonic 0.12.3", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.3.1", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", + "reqwest", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tonic 0.14.2", "tracing", ] @@ -6612,13 +6701,36 @@ checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" dependencies = [ "base64 0.22.1", "hex", - "opentelemetry", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", "serde", - "tonic", + "tonic 0.12.3", ] +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "base64 0.22.1", + "const-hex", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", + "serde", + "serde_json", + "tonic 0.14.2", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -6630,7 +6742,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry", + "opentelemetry 0.28.0", "percent-encoding", "rand 0.8.5", "serde_json", @@ -6640,6 +6752,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry 0.31.0", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7141,7 +7270,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", ] [[package]] @@ -7157,6 +7296,19 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -7564,9 +7716,11 @@ checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7576,6 +7730,7 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", "native-tls", "percent-encoding", "pin-project-lite", @@ -10894,9 +11049,9 @@ dependencies = [ "metrics-util", "moka", "op-alloy-rpc-types-engine", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry-otlp 0.28.0", + "opentelemetry_sdk 0.28.0", "parking_lot", "paste", "reth-optimism-payload-builder", @@ -10912,7 +11067,7 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.29.0", "tracing-subscriber 0.3.20", "url", "vergen", @@ -12529,13 +12684,21 @@ dependencies = [ "op-alloy-consensus", "op-alloy-network", "op-revm", + "opentelemetry 0.31.0", + "opentelemetry-appender-tracing", + "opentelemetry-datadog", + "opentelemetry-otlp 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", "rdkafka", + "reqwest", "reth-optimism-evm", "reth-rpc-eth-types", "revm-context-interface", "serde_json", "tokio", "tracing", + "tracing-opentelemetry 0.32.0", "tracing-subscriber 0.3.20", "url", ] @@ -12798,7 +12961,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "socket2 0.5.10", "tokio", "tokio-stream", @@ -12808,6 +12971,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost 0.14.1", + "tonic 0.14.2", +] + [[package]] name = "tower" version = "0.4.13" @@ -12988,8 +13188,8 @@ checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", "smallvec", "tracing", "tracing-core", @@ -12998,6 +13198,25 @@ dependencies = [ "web-time", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" +dependencies = [ + "js-sys", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "rustversion", + "smallvec", + "thiserror 2.0.17", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.20", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 9114e9f..2946cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ op-alloy-consensus = { version = "0.20.0", features = ["k256"] } tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } anyhow = "1.0.99" clap = { version = "4.5.47", features = ["derive", "env"] } url = "2.5.7" @@ -72,3 +72,18 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +opentelemetry = { version = "0.31.0", features = ["trace"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "http-proto", + "http-json", + "reqwest-client", + "trace", + "grpc-tonic", +] } +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "rt-tokio-current-thread"] } +tracing-opentelemetry = "0.32.0" +reqwest = { version = "0.12", features = ["json"] } +opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-blocking-client", "reqwest-client"] } +opentelemetry-semantic-conventions = { version = "0.31.0" } +opentelemetry-appender-tracing = "0.31.1" diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b..d034071 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,11 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +opentelemetry.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry_sdk.workspace = true +tracing-opentelemetry.workspace = true +reqwest.workspace = true +opentelemetry-datadog = { workspace = true, features = ["reqwest-client", "agent-sampling"]} +opentelemetry-semantic-conventions.workspace = true +opentelemetry-appender-tracing.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc0..7e01ad1 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -1,15 +1,35 @@ use alloy_provider::{ProviderBuilder, RootProvider}; +//use anyhow::Context; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; +use opentelemetry::trace::{Span, TraceContextExt, Tracer, TracerProvider}; +use opentelemetry::{Key, KeyValue, Value, global}; +//use opentelemetry_otlp::WithExportConfig; +//use opentelemetry_sdk::Resource; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::env; use std::fs; use std::net::IpAddr; +use std::time::Duration; use tracing::{info, warn}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::Layer; +use tracing_subscriber::filter::{LevelFilter, Targets}; +use tracing_subscriber::layer::SubscriberExt; use url::Url; +//use opentelemetry::{ +// trace::{SamplingResult, Span, TraceContextExt, Tracer}, +// InstrumentationScope, Key, KeyValue, Value, +//}; +use opentelemetry::InstrumentationScope; +//use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_datadog::{ApiVersion, new_pipeline}; //DatadogTraceStateBuilder, +use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler}; //ShouldSample +use opentelemetry_semantic_conventions as semcov; + mod queue; mod service; mod validation; @@ -57,8 +77,63 @@ struct Config { default_value = "10800" )] send_transaction_default_lifetime_seconds: u64, + + /// Enable tracing + #[arg(long, env = "TIPS_INGRESS_TRACING_ENABLED", default_value = "false")] + tracing_enabled: bool, + + /// Port for the OTLP endpoint + #[arg(long, env = "TIPS_INGRESS_TRACING_OTLP_PORT", default_value = "4317")] + tracing_otlp_port: u16, } +fn bar() { + let tracer = global::tracer("component-bar"); + let mut span = tracer.start("bar"); + span.set_attribute(KeyValue::new( + Key::new("span.type"), + Value::String("sql".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("sql.query"), + Value::String("SELECT * FROM table".into()), + )); + std::thread::sleep(Duration::from_millis(6)); + span.end() +} + +/*#[derive(Debug, Clone)] +struct AgentBasedSampler; + +impl ShouldSample for AgentBasedSampler { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + _trace_id: opentelemetry::trace::TraceId, + _name: &str, + _span_kind: &opentelemetry::trace::SpanKind, + _attributes: &[opentelemetry::KeyValue], + _links: &[opentelemetry::trace::Link], + ) -> opentelemetry::trace::SamplingResult { + let trace_state = parent_context + .map( + |parent_context| parent_context.span().span_context().trace_state().clone(), // inherit sample decision from parent span + ) + .unwrap_or_else(|| { + DatadogTraceStateBuilder::default() + .with_priority_sampling(true) // always sample root span(span without remote or local parent) + .with_measuring(true) // datadog-agent will create metric for this span for APM + .build() + }); + + SamplingResult { + decision: opentelemetry::trace::SamplingDecision::RecordAndSample, // send all spans to datadog-agent + attributes: vec![], + trace_state, + } + } +}*/ + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -80,21 +155,94 @@ async fn main() -> anyhow::Result<()> { } }; - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())), + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", &dd_host, &config.tracing_otlp_port); + + let handle = std::thread::spawn(move || { + // from: https://github.com/flashbots/rollup-boost/blob/08ebd3e75a8f4c7ebc12db13b042dee04e132c05/crates/rollup-boost/src/tracing.rs#L127 + let filter_name = "tips-ingress-rpc".to_string(); + + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, LevelFilter::TRACE); + + let registry = tracing_subscriber::registry().with(global_filter); + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(&filter_name, log_level); + + let writer = tracing_subscriber::fmt::writer::BoxMakeWriter::new(std::io::stdout); + + global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); + let mut trace_cfg = trace::Config::default(); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = new_pipeline() + .with_service_name(&filter_name) + .with_api_version(ApiVersion::Version05) + .with_trace_config(trace_cfg) + //.with_http_client(reqwest::Client::new()) + .with_agent_endpoint(&otlp_endpoint) // TODO: do we need to configure HTTP client? + .install_batch() + .expect("Failed to build provider"); // TODO: use batch exporter later + global::set_tracer_provider(provider.clone()); + let scope = InstrumentationScope::builder(filter_name.clone()) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("foo", |cx| { + let span = cx.span(); + span.set_attribute(KeyValue::new( + Key::new("span.type"), + Value::String("web".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("http.url"), + Value::String("http://localhost:8080/foo".into()), + )); + span.set_attribute(KeyValue::new( + Key::new("http.method"), + Value::String("GET".into()), + )); + span.set_attribute(KeyValue::new(Key::new("http.status_code"), Value::I64(200))); + + std::thread::sleep(Duration::from_millis(6)); + bar(); + std::thread::sleep(Duration::from_millis(6)); + }); + + let trace_filter = Targets::new() + .with_default(LevelFilter::OFF) + .with_target(&filter_name, LevelFilter::TRACE); + + let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); + + tracing::subscriber::set_global_default( + registry.with( + tracing_subscriber::fmt::layer() + .json() + .with_ansi(false) + .with_writer(writer) + .with_filter(log_filter.clone()), + ), ) - .with(tracing_subscriber::fmt::layer()) - .init(); + .expect("Failed to set global default"); + }); + handle.join().expect("Failed to join thread"); + info!( message = "Starting ingress service", address = %config.address, port = config.port, - mempool_url = %config.mempool_url + mempool_url = %config.mempool_url, + endpoint = %format!("http://{}:{}", &dd_host, &config.tracing_otlp_port) ); - let provider: RootProvider = ProviderBuilder::new() + let op_provider: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() .connect_http(config.mempool_url); @@ -106,7 +254,7 @@ async fn main() -> anyhow::Result<()> { let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); let service = IngressService::new( - provider, + op_provider, config.dual_write_mempool, queue, config.send_transaction_default_lifetime_seconds, @@ -123,6 +271,8 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + // TODO: might need shutdown + // let _ = provider.shutdown(); Ok(()) } @@ -144,3 +294,89 @@ fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result (Layered, Targets, BoxMakeWriter) { + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let registry: Layered = + tracing_subscriber::registry().with(global_filter); + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let writer = BoxMakeWriter::new(std::io::stdout); + + (registry, log_filter, writer) + } + + fn build_provider() -> SdkTracerProvider { + let mut trace_cfg = trace::Config::default(); + trace_cfg.sampler = Box::new(Sampler::AlwaysOn); + trace_cfg.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = new_pipeline() + .with_service_name(FILTER_NAME) + .with_api_version(ApiVersion::Version05) + .with_trace_config(trace_cfg) + .with_agent_endpoint("http://localhost:4317") + .install_simple() + .expect("Failed to build provider"); + provider + } + + #[tokio::test] + async fn test_build_provider() { + let handle = std::thread::spawn(|| { + build_provider(); + }); + + handle.join().unwrap(); + } + + #[tokio::test] + async fn test_build_tracer() { + let handle = std::thread::spawn(move || { + let (registry, log_filter, writer) = setup(); + + let provider = build_provider(); + global::set_tracer_provider(provider.clone()); + let scope = InstrumentationScope::builder(FILTER_NAME.to_string()) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + let tracer = provider.tracer_with_scope(scope); + + let trace_filter = Targets::new() + .with_default(LevelFilter::OFF) + .with_target(FILTER_NAME, LevelFilter::TRACE); + + let registry = registry.with(OpenTelemetryLayer::new(tracer).with_filter(trace_filter)); + + tracing::subscriber::set_global_default( + registry.with( + tracing_subscriber::fmt::layer() + .json() + .with_ansi(false) + .with_writer(writer) + .with_filter(log_filter.clone()), + ), + ) + .expect("Failed to set global default"); + }); + handle.join().expect("Failed to join thread"); + } +} diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d..c783c7e 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -9,9 +9,10 @@ use jsonrpsee::{ }; use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; +use opentelemetry::{global, trace::Tracer}; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::{info, warn}; +use tracing::{Instrument, info, span, warn}; use crate::queue::QueuePublisher; @@ -94,12 +95,18 @@ where .await?; validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + let span = span!(tracing::Level::TRACE, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() + self.send_transaction_default_lifetime_seconds; + drop(_enter); + let span = + span!(tracing::Level::INFO, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let bundle = EthSendBundle { txs: vec![data.clone()], block_number: 0, @@ -108,12 +115,18 @@ where reverting_tx_hashes: vec![transaction.tx_hash()], ..Default::default() }; + drop(_enter); // queue the bundle let sender = transaction.signer(); - if let Err(e) = self.queue.publish(&bundle, sender).await { - warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); - } + let t = global::tracer("queue_publish"); + t.in_span("queue_publish_span", async |_| { + let span = + span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); + if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { + warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); + } + }).await; info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());