diff --git a/.env.example b/.env.example index 0ff9039..66c83c5 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,7 @@ TIPS_INGRESS_BLOCK_TIME_MILLISECONDS=2000 TIPS_INGRESS_METER_BUNDLE_TIMEOUT_MS=2000 TIPS_INGRESS_MAX_BUFFERED_METER_BUNDLE_RESPONSES=100 TIPS_INGRESS_BUILDER_RPCS=http://localhost:2222,http://localhost:2222,http://localhost:2222 +TIPS_INGRESS_BACKRUN_ENABLED=true # Audit service configuration TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 4d0f2b2..d5fd543 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -5,9 +5,9 @@ use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use tips_audit::{BundleEvent, KafkaBundleEventPublisher, connect_audit_to_publisher}; -use tips_core::MeterBundleResponse; use tips_core::kafka::load_kafka_config_from_file; use tips_core::logger::init_logger_with_format; +use tips_core::{Bundle, MeterBundleResponse}; use tips_ingress_rpc::Config; use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::health::bind_health_server; @@ -68,9 +68,11 @@ async fn main() -> anyhow::Result<()> { let (builder_tx, _) = broadcast::channel::(config.max_buffered_meter_bundle_responses); + let (builder_backrun_tx, _) = broadcast::channel::(config.max_buffered_backrun_bundles); config.builder_rpcs.iter().for_each(|builder_rpc| { - let builder_rx = builder_tx.subscribe(); - connect_ingress_to_builder(builder_rx, builder_rpc.clone()); + let metering_rx = builder_tx.subscribe(); + let backrun_rx = builder_backrun_tx.subscribe(); + connect_ingress_to_builder(metering_rx, backrun_rx, builder_rpc.clone()); }); let health_check_addr = config.health_check_addr; @@ -86,6 +88,7 @@ async fn main() -> anyhow::Result<()> { queue, audit_tx, builder_tx, + builder_backrun_tx, cfg, ); let bind_addr = format!("{}:{}", config.address, config.port); diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index f84fa63..8115f28 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -138,6 +138,14 @@ pub struct Config { )] pub max_buffered_meter_bundle_responses: usize, + /// Maximum number of backrun bundles to buffer in memory + #[arg( + long, + env = "TIPS_INGRESS_MAX_BUFFERED_BACKRUN_BUNDLES", + default_value = "100" + )] + pub max_buffered_backrun_bundles: usize, + /// Address to bind the health check server to #[arg( long, @@ -145,23 +153,29 @@ pub struct Config { default_value = "0.0.0.0:8081" )] pub health_check_addr: SocketAddr, + + /// Enable backrun bundle submission to op-rbuilder + #[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")] + pub backrun_enabled: bool, } pub fn connect_ingress_to_builder( - event_rx: broadcast::Receiver, + metering_rx: broadcast::Receiver, + backrun_rx: broadcast::Receiver, builder_rpc: Url, ) { - tokio::spawn(async move { - let builder: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(builder_rpc); + let builder: RootProvider = ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(builder_rpc); - let mut event_rx = event_rx; + let metering_builder = builder.clone(); + tokio::spawn(async move { + let mut event_rx = metering_rx; while let Ok(event) = event_rx.recv().await { // we only support one transaction per bundle for now let tx_hash = event.results[0].tx_hash; - if let Err(e) = builder + if let Err(e) = metering_builder .client() .request::<(TxHash, MeterBundleResponse), ()>( "base_setMeteringInformation", @@ -173,4 +187,17 @@ pub fn connect_ingress_to_builder( } } }); + + tokio::spawn(async move { + let mut event_rx = backrun_rx; + while let Ok(bundle) = event_rx.recv().await { + if let Err(e) = builder + .client() + .request::<(tips_core::Bundle,), ()>("base_sendBackrunBundle", (bundle,)) + .await + { + error!(error = %e, "Failed to send backrun bundle to builder"); + } + } + }); } diff --git a/crates/ingress-rpc/src/metrics.rs b/crates/ingress-rpc/src/metrics.rs index eb2e34c..b41eb01 100644 --- a/crates/ingress-rpc/src/metrics.rs +++ b/crates/ingress-rpc/src/metrics.rs @@ -1,4 +1,4 @@ -use metrics::Histogram; +use metrics::{Counter, Histogram}; use metrics_derive::Metrics; use metrics_exporter_prometheus::PrometheusBuilder; use std::net::SocketAddr; @@ -29,6 +29,12 @@ pub struct Metrics { #[metric(describe = "Duration of send_raw_transaction")] pub send_raw_transaction_duration: Histogram, + + #[metric(describe = "Total backrun bundles received")] + pub backrun_bundles_received_total: Counter, + + #[metric(describe = "Duration to send backrun bundle to op-rbuilder")] + pub backrun_bundles_sent_duration: Histogram, } /// Initialize Prometheus metrics exporter diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index b93eaa7..c26d971 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -31,6 +31,9 @@ pub trait IngressApi { #[method(name = "sendBundle")] async fn send_bundle(&self, bundle: Bundle) -> RpcResult; + #[method(name = "sendBackrunBundle")] + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult; + /// `eth_cancelBundle` is used to prevent a submitted bundle from being included on-chain. #[method(name = "cancelBundle")] async fn cancel_bundle(&self, request: CancelBundle) -> RpcResult<()>; @@ -58,6 +61,8 @@ pub struct IngressService { block_time_milliseconds: u64, meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, + backrun_enabled: bool, + builder_backrun_tx: broadcast::Sender, } impl IngressService { @@ -67,6 +72,7 @@ impl IngressService { queue: Queue, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, + builder_backrun_tx: broadcast::Sender, config: Config, ) -> Self { Self { @@ -81,6 +87,8 @@ impl IngressService { block_time_milliseconds: config.block_time_milliseconds, meter_bundle_timeout_ms: config.meter_bundle_timeout_ms, builder_tx, + backrun_enabled: config.backrun_enabled, + builder_backrun_tx, } } } @@ -90,16 +98,45 @@ impl IngressApiServer for IngressService where Queue: QueuePublisher + Sync + Send + 'static, { + async fn send_backrun_bundle(&self, bundle: Bundle) -> RpcResult { + if !self.backrun_enabled { + info!( + message = "Backrun bundle submission is disabled", + backrun_enabled = self.backrun_enabled + ); + return Err( + EthApiError::InvalidParams("Backrun bundle submission is disabled".into()) + .into_rpc_err(), + ); + } + + let start = Instant::now(); + let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + + self.metrics.backrun_bundles_received_total.increment(1); + + if let Err(e) = self.builder_backrun_tx.send(bundle) { + warn!( + message = "Failed to send backrun bundle to builders", + bundle_hash = %bundle_hash, + error = %e + ); + } + + self.send_audit_event(&accepted_bundle, bundle_hash); + + self.metrics + .backrun_bundles_sent_duration + .record(start.elapsed().as_secs_f64()); + + Ok(BundleHash { bundle_hash }) + } + async fn send_bundle(&self, bundle: Bundle) -> RpcResult { - // validate the bundle and consume the `bundle` to get an `AcceptedBundle` - self.validate_bundle(&bundle).await?; - let parsed_bundle: ParsedBundle = bundle - .clone() - .try_into() - .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let bundle_hash = &parsed_bundle.bundle_hash(); - let meter_bundle_response = self.meter_bundle(&bundle, bundle_hash).await?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); + let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?; + + // Get meter_bundle_response for builder broadcast + let meter_bundle_response = accepted_bundle.meter_bundle_response.clone(); // asynchronously send the meter bundle response to the builder self.builder_tx @@ -109,7 +146,7 @@ where // publish the bundle to the queue if let Err(e) = self .bundle_queue - .publish(&accepted_bundle, bundle_hash) + .publish(&accepted_bundle, &bundle_hash) .await { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); @@ -122,20 +159,9 @@ where ); // asynchronously send the audit event to the audit channel - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: Box::new(accepted_bundle.clone()), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - return Err( - EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(), - ); - } + self.send_audit_event(&accepted_bundle, bundle_hash); - Ok(BundleHash { - bundle_hash: *bundle_hash, - }) + Ok(BundleHash { bundle_hash }) } async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> { @@ -212,13 +238,7 @@ where } } - let audit_event = BundleEvent::Received { - bundle_id: *accepted_bundle.uuid(), - bundle: accepted_bundle.clone().into(), - }; - if let Err(e) = self.audit_channel.send(audit_event) { - warn!(message = "Failed to send audit event", error = %e); - } + self.send_audit_event(&accepted_bundle, transaction.tx_hash()); self.metrics .send_raw_transaction_duration @@ -327,6 +347,37 @@ where } Ok(res) } + + /// Helper method to validate, parse, and meter a bundle + async fn validate_parse_and_meter_bundle( + &self, + bundle: &Bundle, + ) -> RpcResult<(AcceptedBundle, B256)> { + self.validate_bundle(bundle).await?; + let parsed_bundle: ParsedBundle = bundle + .clone() + .try_into() + .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; + let bundle_hash = parsed_bundle.bundle_hash(); + let meter_bundle_response = self.meter_bundle(bundle, &bundle_hash).await?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone()); + Ok((accepted_bundle, bundle_hash)) + } + + /// Helper method to send audit event for a bundle + fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) { + let audit_event = BundleEvent::Received { + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), + }; + if let Err(e) = self.audit_channel.send(audit_event) { + warn!( + message = "Failed to send audit event", + bundle_hash = %bundle_hash, + error = %e + ); + } + } } #[cfg(test)] diff --git a/justfile b/justfile index 2d4398e..4c30a66 100644 --- a/justfile +++ b/justfile @@ -99,6 +99,9 @@ get-blocks: sender := "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" sender_key := "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" +backrunner := "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" +backrunner_key := "0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a" + send-txn: #!/usr/bin/env bash set -euxo pipefail @@ -108,3 +111,67 @@ send-txn: hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result") cast receipt $hash -r {{ sequencer_url }} | grep status cast receipt $hash -r {{ builder_url }} | grep status + +send-txn-with-backrun: + #!/usr/bin/env bash + set -euxo pipefail + + # 1. Get nonce and send target transaction from sender account + nonce=$(cast nonce {{ sender }} -r {{ builder_url }}) + echo "Sending target transaction from sender (nonce=$nonce)..." + target_txn=$(cast mktx --private-key {{ sender_key }} \ + 0x0000000000000000000000000000000000000000 \ + --value 0.01ether \ + --nonce $nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + target_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$target_txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Target tx sent: $target_hash" + + # 2. Build backrun transaction from backrunner account (different account!) + backrun_nonce=$(cast nonce {{ backrunner }} -r {{ builder_url }}) + echo "Building backrun transaction from backrunner (nonce=$backrun_nonce)..." + backrun_txn=$(cast mktx --private-key {{ backrunner_key }} \ + 0x0000000000000000000000000000000000000001 \ + --value 0.001ether \ + --nonce $backrun_nonce \ + --chain-id 13 \ + -r {{ builder_url }}) + + # 3. Compute tx hashes for reverting_tx_hashes + backrun_hash_computed=$(cast keccak $backrun_txn) + echo "Target tx hash: $target_hash" + echo "Backrun tx hash: $backrun_hash_computed" + + # 4. Construct and send bundle with reverting_tx_hashes + echo "Sending backrun bundle..." + bundle_json=$(jq -n \ + --arg target "$target_txn" \ + --arg backrun "$backrun_txn" \ + --arg target_hash "$target_hash" \ + --arg backrun_hash "$backrun_hash_computed" \ + '{ + txs: [$target, $backrun], + blockNumber: 0, + revertingTxHashes: [$target_hash, $backrun_hash] + }') + + bundle_hash=$(curl -s {{ ingress_url }} -X POST \ + -H "Content-Type: application/json" \ + --data "{\"method\":\"eth_sendBackrunBundle\",\"params\":[$bundle_json],\"id\":1,\"jsonrpc\":\"2.0\"}" \ + | jq -r ".result") + echo "Bundle sent: $bundle_hash" + + # 5. Wait and verify both transactions + echo "Waiting for transactions to land..." + sleep 5 + + echo "=== Target transaction (from sender) ===" + cast receipt $target_hash -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" + + echo "=== Backrun transaction (from backrunner) ===" + cast receipt $backrun_hash_computed -r {{ sequencer_url }} | grep -E "(status|blockNumber|transactionIndex)" || echo "Backrun tx not found yet"