From 8d481c631de9583183180afa7cfa91e3135c461d Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 31 Oct 2025 11:02:20 -0400 Subject: [PATCH 01/20] spike --- crates/bundle-pool/src/lib.rs | 2 +- crates/core/src/lib.rs | 3 ++- crates/core/src/types.rs | 38 ++++++++++++++++++++++++------- crates/ingress-rpc/src/queue.rs | 4 +++- crates/ingress-rpc/src/service.rs | 3 ++- 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/crates/bundle-pool/src/lib.rs b/crates/bundle-pool/src/lib.rs index 6aeb248..ccc5cde 100644 --- a/crates/bundle-pool/src/lib.rs +++ b/crates/bundle-pool/src/lib.rs @@ -8,7 +8,7 @@ use tracing::error; pub use pool::{Action, BundleStore, InMemoryBundlePool, ProcessedBundle}; pub use source::KafkaBundleSource; -pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle}; +pub use tips_core::{Bundle, BundleWithMetadata, CancelBundle}; pub fn connect_sources_to_pool( sources: Vec, diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5b7906c..d1419d3 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -6,5 +6,6 @@ pub mod types; pub mod test_utils; pub use types::{ - BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse, + BLOCK_TIME, Bundle, BundleHash, BundleProperties, BundleWithMetadata, CancelBundle, + MeterBundleResponse, }; diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index a38abf4..dce0a5a 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -10,6 +10,10 @@ use uuid::Uuid; /// Block time in microseconds pub const BLOCK_TIME: u128 = 2_000_000; +pub trait BundleProperties { + fn bundle_hash(&self) -> B256; +} + #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct Bundle { @@ -56,6 +60,26 @@ pub struct Bundle { pub dropping_tx_hashes: Vec, } +impl BundleProperties for Bundle { + fn bundle_hash(&self) -> B256 { + let transactions: Vec = self + .txs + .iter() + .map(|b| { + OpTxEnvelope::decode_2718_exact(b) + .map_err(|e| format!("failed to decode transaction: {e}")) + }) + .collect::, _>>() + .expect("failed to decode transactions"); + + let mut concatenated = Vec::new(); + for tx in transactions { + concatenated.extend_from_slice(tx.tx_hash().as_slice()); + } + keccak256(&concatenated) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BundleHash { @@ -115,14 +139,6 @@ impl BundleWithMetadata { &self.uuid } - pub fn bundle_hash(&self) -> B256 { - let mut concatenated = Vec::new(); - for tx in self.transactions() { - concatenated.extend_from_slice(tx.tx_hash().as_slice()); - } - keccak256(&concatenated) - } - pub fn txn_hashes(&self) -> Vec { self.transactions().iter().map(|t| t.tx_hash()).collect() } @@ -150,6 +166,12 @@ impl BundleWithMetadata { } } +impl BundleProperties for BundleWithMetadata { + fn bundle_hash(&self) -> B256 { + self.bundle.bundle_hash() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TransactionResult { diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 0f60520..fc91b50 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -75,7 +75,9 @@ impl QueuePublisher for KafkaQueuePublisher { mod tests { use super::*; use rdkafka::config::ClientConfig; - use tips_core::{Bundle, BundleWithMetadata, test_utils::create_test_meter_bundle_response}; + use tips_core::{ + Bundle, BundleProperties, BundleWithMetadata, test_utils::create_test_meter_bundle_response, + }; use tokio::time::{Duration, Instant}; fn create_test_bundle() -> Bundle { diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index fe3ce65..c847d60 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,7 +12,8 @@ use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; use tips_core::{ - BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse, + BLOCK_TIME, Bundle, BundleHash, BundleProperties, BundleWithMetadata, CancelBundle, + MeterBundleResponse, }; use tracing::{info, warn}; From c26a3f0d6e36302de06aa9edf07de7d1fb8e8d3a Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 31 Oct 2025 14:02:25 -0400 Subject: [PATCH 02/20] spike from --- crates/core/src/lib.rs | 2 +- crates/core/src/types.rs | 140 ++++++++++++++++-------------- crates/ingress-rpc/src/queue.rs | 6 +- crates/ingress-rpc/src/service.rs | 11 +-- 4 files changed, 85 insertions(+), 74 deletions(-) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d1419d3..4209de5 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -6,6 +6,6 @@ pub mod types; pub mod test_utils; pub use types::{ - BLOCK_TIME, Bundle, BundleHash, BundleProperties, BundleWithMetadata, CancelBundle, + BLOCK_TIME, Bundle, BundleHash, BundleTransactions, BundleWithMetadata, CancelBundle, MeterBundleResponse, }; diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index dce0a5a..de8ab29 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -10,8 +10,66 @@ use uuid::Uuid; /// Block time in microseconds pub const BLOCK_TIME: u128 = 2_000_000; -pub trait BundleProperties { - fn bundle_hash(&self) -> B256; +pub struct BundleTransactions(Vec); + +impl From> for BundleTransactions { + fn from(txs: Vec) -> Self { + BundleTransactions(txs) + } +} + +impl BundleTransactions { + pub fn bundle_hash(&self) -> B256 { + let mut concatenated = Vec::new(); + for tx in self.0.iter() { + concatenated.extend_from_slice(tx); + } + keccak256(&concatenated) + } + + /// Get transaction hashes for all transactions in the bundle + pub fn txn_hashes(&self) -> Result, String> { + self.transactions()? + .iter() + .map(|t| Ok(t.tx_hash())) + .collect() + } + + /// Get sender addresses for all transactions in the bundle + pub fn senders(&self) -> Result, String> { + self.transactions()? + .iter() + .map(|t| { + t.recover_signer() + .map_err(|e| format!("failed to recover signer: {e}")) + }) + .collect() + } + + /// Get total gas limit for all transactions in the bundle + pub fn gas_limit(&self) -> Result { + Ok(self.transactions()?.iter().map(|t| t.gas_limit()).sum()) + } + + /// Get total data availability size for all transactions in the bundle + pub fn da_size(&self) -> Result { + Ok(self + .transactions()? + .iter() + .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) + .sum()) + } + + /// Decode all transactions from bytes to OpTxEnvelope + pub fn transactions(&self) -> Result, String> { + self.0 + .iter() + .map(|b| { + OpTxEnvelope::decode_2718_exact(b) + .map_err(|e| format!("failed to decode transaction: {e}")) + }) + .collect() + } } #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -60,26 +118,6 @@ pub struct Bundle { pub dropping_tx_hashes: Vec, } -impl BundleProperties for Bundle { - fn bundle_hash(&self) -> B256 { - let transactions: Vec = self - .txs - .iter() - .map(|b| { - OpTxEnvelope::decode_2718_exact(b) - .map_err(|e| format!("failed to decode transaction: {e}")) - }) - .collect::, _>>() - .expect("failed to decode transactions"); - - let mut concatenated = Vec::new(); - for tx in transactions { - concatenated.extend_from_slice(tx.tx_hash().as_slice()); - } - keccak256(&concatenated) - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BundleHash { @@ -131,45 +169,13 @@ impl BundleWithMetadata { }) } - pub fn transactions(&self) -> &[OpTxEnvelope] { - self.transactions.as_slice() - } - pub fn uuid(&self) -> &Uuid { &self.uuid } - pub fn txn_hashes(&self) -> Vec { - self.transactions().iter().map(|t| t.tx_hash()).collect() - } - pub fn bundle(&self) -> &Bundle { &self.bundle } - - pub fn senders(&self) -> Vec
{ - self.transactions() - .iter() - .map(|t| t.recover_signer().unwrap()) - .collect() - } - - pub fn gas_limit(&self) -> u64 { - self.transactions.iter().map(|t| t.gas_limit()).sum() - } - - pub fn da_size(&self) -> u64 { - self.transactions - .iter() - .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) - .sum() - } -} - -impl BundleProperties for BundleWithMetadata { - fn bundle_hash(&self) -> B256 { - self.bundle.bundle_hash() - } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -242,10 +248,11 @@ mod tests { bundle.bundle.replacement_uuid, Some(bundle.uuid().to_string()) ); - assert_eq!(bundle.txn_hashes().len(), 1); - assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle.senders().len(), 1); - assert_eq!(bundle.senders()[0], alice.address()); + let bundle_txs: BundleTransactions = bundle.bundle().txs.clone().into(); + assert_eq!(bundle_txs.txn_hashes().unwrap().len(), 1); + assert_eq!(bundle_txs.txn_hashes().unwrap()[0], tx1.tx_hash()); + assert_eq!(bundle_txs.senders().unwrap().len(), 1); + assert_eq!(bundle_txs.senders().unwrap()[0], alice.address()); // Bundle hashes are keccack256(...txnHashes) let expected_bundle_hash_single = { @@ -254,7 +261,7 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single); + assert_eq!(bundle_txs.bundle_hash(), expected_bundle_hash_single); let uuid = Uuid::new_v4(); let bundle = BundleWithMetadata::load( @@ -270,12 +277,13 @@ mod tests { assert_eq!(*bundle.uuid(), uuid); assert_eq!(bundle.bundle.replacement_uuid, Some(uuid.to_string())); - assert_eq!(bundle.txn_hashes().len(), 2); - assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle.txn_hashes()[1], tx2.tx_hash()); - assert_eq!(bundle.senders().len(), 2); - assert_eq!(bundle.senders()[0], alice.address()); - assert_eq!(bundle.senders()[1], alice.address()); + let bundle_txs2: BundleTransactions = bundle.bundle().txs.clone().into(); + assert_eq!(bundle_txs2.txn_hashes().unwrap().len(), 2); + assert_eq!(bundle_txs2.txn_hashes().unwrap()[0], tx1.tx_hash()); + assert_eq!(bundle_txs2.txn_hashes().unwrap()[1], tx2.tx_hash()); + assert_eq!(bundle_txs2.senders().unwrap().len(), 2); + assert_eq!(bundle_txs2.senders().unwrap()[0], alice.address()); + assert_eq!(bundle_txs2.senders().unwrap()[1], alice.address()); let expected_bundle_hash_double = { let mut hasher = Keccak256::default(); @@ -284,7 +292,7 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle.bundle_hash(), expected_bundle_hash_double); + assert_eq!(bundle_txs2.bundle_hash(), expected_bundle_hash_double); } #[test] diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index fc91b50..165a416 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -76,7 +76,8 @@ mod tests { use super::*; use rdkafka::config::ClientConfig; use tips_core::{ - Bundle, BundleProperties, BundleWithMetadata, test_utils::create_test_meter_bundle_response, + Bundle, BundleTransactions, BundleWithMetadata, + test_utils::create_test_meter_bundle_response, }; use tokio::time::{Duration, Instant}; @@ -97,7 +98,8 @@ mod tests { let bundle = create_test_bundle(); let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_hash = bundle_with_metadata.bundle_hash(); + let bundle_txs: BundleTransactions = bundle.txs.into(); + let bundle_hash = bundle_txs.bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c847d60..51ec860 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,7 +12,7 @@ use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; use tips_core::{ - BLOCK_TIME, Bundle, BundleHash, BundleProperties, BundleWithMetadata, CancelBundle, + BLOCK_TIME, Bundle, BundleHash, BundleTransactions, BundleWithMetadata, CancelBundle, MeterBundleResponse, }; use tracing::{info, warn}; @@ -76,7 +76,8 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.bundle_hash(); + let bundle_txs: BundleTransactions = bundle_with_metadata.bundle().txs.clone().into(); + let bundle_hash = bundle_txs.bundle_hash(); if let Err(e) = self .bundle_queue .publish(&bundle_with_metadata, &bundle_hash) @@ -89,7 +90,6 @@ where info!( message = "queued bundle", bundle_hash = %bundle_hash, - tx_count = bundle_with_metadata.transactions().len(), ); let audit_event = BundleEvent::Received { @@ -128,9 +128,10 @@ where }; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) + let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.bundle_hash(); + let bundle_txs: BundleTransactions = bundle.txs.into(); + let bundle_hash = bundle_txs.bundle_hash(); if let Err(e) = self .bundle_queue From 29a8f0143a20964febfb268ac952ac9b5ce05e8b Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 31 Oct 2025 14:03:05 -0400 Subject: [PATCH 03/20] remove transactions --- crates/core/src/types.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index de8ab29..c2d4452 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -134,7 +134,6 @@ pub struct CancelBundle { pub struct BundleWithMetadata { bundle: Bundle, uuid: Uuid, - transactions: Vec, meter_bundle_response: MeterBundleResponse, } @@ -152,18 +151,8 @@ impl BundleWithMetadata { bundle.replacement_uuid = Some(uuid.to_string()); - let transactions: Vec = bundle - .txs - .iter() - .map(|b| { - OpTxEnvelope::decode_2718_exact(b) - .map_err(|e| format!("failed to decode transaction: {e}")) - }) - .collect::, _>>()?; - Ok(BundleWithMetadata { bundle, - transactions, uuid, meter_bundle_response, }) From 498b243c92879d73b484cf9a7fa9aacb4070e8b2 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 31 Oct 2025 17:01:12 -0400 Subject: [PATCH 04/20] fix test --- crates/core/src/types.rs | 19 +++++++++++++------ crates/ingress-rpc/src/queue.rs | 2 +- crates/ingress-rpc/src/service.rs | 8 ++++++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index c2d4452..f358c62 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -19,12 +19,13 @@ impl From> for BundleTransactions { } impl BundleTransactions { - pub fn bundle_hash(&self) -> B256 { + pub fn bundle_hash(&self) -> Result { let mut concatenated = Vec::new(); - for tx in self.0.iter() { - concatenated.extend_from_slice(tx); + let txs = self.transactions()?; + for tx in txs.iter() { + concatenated.extend_from_slice(tx.tx_hash().as_slice()); } - keccak256(&concatenated) + Ok(keccak256(&concatenated)) } /// Get transaction hashes for all transactions in the bundle @@ -250,7 +251,10 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle_txs.bundle_hash(), expected_bundle_hash_single); + assert_eq!( + bundle_txs.bundle_hash().unwrap(), + expected_bundle_hash_single + ); let uuid = Uuid::new_v4(); let bundle = BundleWithMetadata::load( @@ -281,7 +285,10 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle_txs2.bundle_hash(), expected_bundle_hash_double); + assert_eq!( + bundle_txs2.bundle_hash().unwrap(), + expected_bundle_hash_double + ); } #[test] diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 165a416..3538c42 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -99,7 +99,7 @@ mod tests { let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); let bundle_txs: BundleTransactions = bundle.txs.into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_txs.bundle_hash().unwrap(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 51ec860..2de7241 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -77,7 +77,9 @@ where .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; let bundle_txs: BundleTransactions = bundle_with_metadata.bundle().txs.clone().into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_txs + .bundle_hash() + .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; if let Err(e) = self .bundle_queue .publish(&bundle_with_metadata, &bundle_hash) @@ -131,7 +133,9 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; let bundle_txs: BundleTransactions = bundle.txs.into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_txs + .bundle_hash() + .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; if let Err(e) = self .bundle_queue From 58370f89896d9153c86d389ee77a6263a67033e4 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 3 Nov 2025 10:33:40 -0500 Subject: [PATCH 05/20] bundle params --- crates/audit/src/storage.rs | 2 +- crates/core/src/lib.rs | 4 +- crates/core/src/test_utils.rs | 12 +- crates/core/src/types.rs | 166 +++++++++++++++------------ crates/ingress-rpc/src/queue.rs | 4 +- crates/ingress-rpc/src/service.rs | 28 +++-- crates/ingress-rpc/src/validation.rs | 55 +++++---- 7 files changed, 152 insertions(+), 119 deletions(-) diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index 59d9ae0..fa2f53f 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -405,7 +405,7 @@ mod tests { } => { assert_eq!(key, "test-key"); assert_eq!(*ts, 1234567890); - assert_eq!(b.block_number, bundle.block_number); + assert_eq!(b.params.block_number, bundle.params.block_number); } _ => panic!("Expected Created event"), } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 4209de5..32cb7e6 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -6,6 +6,6 @@ pub mod types; pub mod test_utils; pub use types::{ - BLOCK_TIME, Bundle, BundleHash, BundleTransactions, BundleWithMetadata, CancelBundle, - MeterBundleResponse, + BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleTransactions, BundleWithMetadata, + CancelBundle, MeterBundleResponse, }; diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index d3f6f8d..1a9102a 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{Bundle, BundleWithMetadata, MeterBundleResponse}; +use crate::{Bundle, BundleParams, BundleWithMetadata, MeterBundleResponse}; use alloy_consensus::SignableTransaction; use alloy_primitives::{Address, B256, U256}; use alloy_provider::network::TxSignerSync; @@ -33,10 +33,12 @@ pub fn create_test_bundle( let bundle = Bundle { txs, - block_number: block_number.unwrap_or(0), - min_timestamp, - max_timestamp, - ..Default::default() + params: BundleParams { + block_number: block_number.unwrap_or(0), + min_timestamp, + max_timestamp, + ..Default::default() + }, }; let meter_bundle_response = create_test_meter_bundle_response(); diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index f358c62..22fa2ed 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -10,66 +10,70 @@ use uuid::Uuid; /// Block time in microseconds pub const BLOCK_TIME: u128 = 2_000_000; -pub struct BundleTransactions(Vec); +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct BundleTransactions(Vec); -impl From> for BundleTransactions { - fn from(txs: Vec) -> Self { +impl From> for BundleTransactions { + fn from(txs: Vec) -> Self { BundleTransactions(txs) } } +impl TryFrom> for BundleTransactions { + type Error = &'static str; + fn try_from(txs: Vec) -> Result { + let transactions = txs + .iter() + .map(|b| { + OpTxEnvelope::decode_2718_exact(b) + .map_err(|e| format!("failed to decode transaction: {e}")) + }) + .collect::, _>>(); + + match transactions { + Ok(transactions) => Ok(BundleTransactions(transactions)), + Err(_) => Err("failed to decode transactions"), + } + } +} + +impl From for Vec { + fn from(txs: BundleTransactions) -> Self { + txs.0.iter().map(|t| t.encoded_2718().into()).collect() + } +} + impl BundleTransactions { - pub fn bundle_hash(&self) -> Result { + pub fn bundle_hash(&self) -> B256 { let mut concatenated = Vec::new(); - let txs = self.transactions()?; - for tx in txs.iter() { + for tx in self.0.iter() { concatenated.extend_from_slice(tx.tx_hash().as_slice()); } - Ok(keccak256(&concatenated)) + keccak256(&concatenated) } /// Get transaction hashes for all transactions in the bundle - pub fn txn_hashes(&self) -> Result, String> { - self.transactions()? - .iter() - .map(|t| Ok(t.tx_hash())) - .collect() + pub fn txn_hashes(&self) -> Vec { + self.0.iter().map(|t| t.tx_hash()).collect() } /// Get sender addresses for all transactions in the bundle - pub fn senders(&self) -> Result, String> { - self.transactions()? - .iter() - .map(|t| { - t.recover_signer() - .map_err(|e| format!("failed to recover signer: {e}")) - }) - .collect() + pub fn senders(&self) -> Vec
{ + self.0.iter().map(|t| t.recover_signer().unwrap()).collect() } /// Get total gas limit for all transactions in the bundle - pub fn gas_limit(&self) -> Result { - Ok(self.transactions()?.iter().map(|t| t.gas_limit()).sum()) + pub fn gas_limit(&self) -> u64 { + self.0.iter().map(|t| t.gas_limit()).sum() } /// Get total data availability size for all transactions in the bundle - pub fn da_size(&self) -> Result { - Ok(self - .transactions()? - .iter() - .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) - .sum()) - } - - /// Decode all transactions from bytes to OpTxEnvelope - pub fn transactions(&self) -> Result, String> { + pub fn da_size(&self) -> u64 { self.0 .iter() - .map(|b| { - OpTxEnvelope::decode_2718_exact(b) - .map_err(|e| format!("failed to decode transaction: {e}")) - }) - .collect() + .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) + .sum() } } @@ -78,6 +82,13 @@ impl BundleTransactions { pub struct Bundle { pub txs: Vec, + #[serde(flatten)] + pub params: BundleParams, +} + +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct BundleParams { #[serde(with = "alloy_serde::quantity")] pub block_number: u64, @@ -133,9 +144,20 @@ pub struct CancelBundle { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BundleWithMetadata { - bundle: Bundle, uuid: Uuid, + params: BundleParams, meter_bundle_response: MeterBundleResponse, + + pub txs: BundleTransactions, +} + +impl From for Bundle { + fn from(bundle_with_metadata: BundleWithMetadata) -> Self { + Bundle { + txs: bundle_with_metadata.txs.into(), + params: bundle_with_metadata.params.clone(), + } + } } impl BundleWithMetadata { @@ -144,17 +166,23 @@ impl BundleWithMetadata { meter_bundle_response: MeterBundleResponse, ) -> Result { let uuid = bundle + .params .replacement_uuid .clone() .unwrap_or_else(|| Uuid::new_v4().to_string()); let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?; - bundle.replacement_uuid = Some(uuid.to_string()); + bundle.params.replacement_uuid = Some(uuid.to_string()); + let txs: BundleTransactions = bundle + .txs + .try_into() + .map_err(|e| format!("failed to convert transactions: {e}"))?; Ok(BundleWithMetadata { - bundle, uuid, + params: bundle.params.clone(), + txs, meter_bundle_response, }) } @@ -162,10 +190,6 @@ impl BundleWithMetadata { pub fn uuid(&self) -> &Uuid { &self.uuid } - - pub fn bundle(&self) -> &Bundle { - &self.bundle - } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -224,10 +248,12 @@ mod tests { let bundle = BundleWithMetadata::load( Bundle { - replacement_uuid: None, txs: vec![tx1_bytes.clone().into()], - block_number: 1, - ..Default::default() + params: BundleParams { + replacement_uuid: None, + block_number: 1, + ..Default::default() + }, }, create_test_meter_bundle_response(), ) @@ -235,14 +261,14 @@ mod tests { assert!(!bundle.uuid().is_nil()); assert_eq!( - bundle.bundle.replacement_uuid, + bundle.params.replacement_uuid, Some(bundle.uuid().to_string()) ); - let bundle_txs: BundleTransactions = bundle.bundle().txs.clone().into(); - assert_eq!(bundle_txs.txn_hashes().unwrap().len(), 1); - assert_eq!(bundle_txs.txn_hashes().unwrap()[0], tx1.tx_hash()); - assert_eq!(bundle_txs.senders().unwrap().len(), 1); - assert_eq!(bundle_txs.senders().unwrap()[0], alice.address()); + let bundle_txs: BundleTransactions = bundle.txs.into(); + assert_eq!(bundle_txs.txn_hashes().len(), 1); + assert_eq!(bundle_txs.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs.senders().len(), 1); + assert_eq!(bundle_txs.senders()[0], alice.address()); // Bundle hashes are keccack256(...txnHashes) let expected_bundle_hash_single = { @@ -251,32 +277,31 @@ mod tests { hasher.finalize() }; - assert_eq!( - bundle_txs.bundle_hash().unwrap(), - expected_bundle_hash_single - ); + assert_eq!(bundle_txs.bundle_hash(), expected_bundle_hash_single); let uuid = Uuid::new_v4(); let bundle = BundleWithMetadata::load( Bundle { - replacement_uuid: Some(uuid.to_string()), txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()], - block_number: 1, - ..Default::default() + params: BundleParams { + replacement_uuid: Some(uuid.to_string()), + block_number: 1, + ..Default::default() + }, }, create_test_meter_bundle_response(), ) .unwrap(); assert_eq!(*bundle.uuid(), uuid); - assert_eq!(bundle.bundle.replacement_uuid, Some(uuid.to_string())); - let bundle_txs2: BundleTransactions = bundle.bundle().txs.clone().into(); - assert_eq!(bundle_txs2.txn_hashes().unwrap().len(), 2); - assert_eq!(bundle_txs2.txn_hashes().unwrap()[0], tx1.tx_hash()); - assert_eq!(bundle_txs2.txn_hashes().unwrap()[1], tx2.tx_hash()); - assert_eq!(bundle_txs2.senders().unwrap().len(), 2); - assert_eq!(bundle_txs2.senders().unwrap()[0], alice.address()); - assert_eq!(bundle_txs2.senders().unwrap()[1], alice.address()); + assert_eq!(bundle.params.replacement_uuid, Some(uuid.to_string())); + let bundle_txs2: BundleTransactions = bundle.txs.into(); + assert_eq!(bundle_txs2.txn_hashes().len(), 2); + assert_eq!(bundle_txs2.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs2.txn_hashes()[1], tx2.tx_hash()); + assert_eq!(bundle_txs2.senders().len(), 2); + assert_eq!(bundle_txs2.senders()[0], alice.address()); + assert_eq!(bundle_txs2.senders()[1], alice.address()); let expected_bundle_hash_double = { let mut hasher = Keccak256::default(); @@ -285,10 +310,7 @@ mod tests { hasher.finalize() }; - assert_eq!( - bundle_txs2.bundle_hash().unwrap(), - expected_bundle_hash_double - ); + assert_eq!(bundle_txs2.bundle_hash(), expected_bundle_hash_double); } #[test] diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 3538c42..b80100a 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -98,8 +98,8 @@ mod tests { let bundle = create_test_bundle(); let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_txs: BundleTransactions = bundle.txs.into(); - let bundle_hash = bundle_txs.bundle_hash().unwrap(); + let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); + let bundle_hash = bundle_txs.bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 2de7241..257cc47 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,8 +12,8 @@ use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; use tips_core::{ - BLOCK_TIME, Bundle, BundleHash, BundleTransactions, BundleWithMetadata, CancelBundle, - MeterBundleResponse, + BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleTransactions, BundleWithMetadata, + CancelBundle, MeterBundleResponse, }; use tracing::{info, warn}; @@ -76,10 +76,8 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_txs: BundleTransactions = bundle_with_metadata.bundle().txs.clone().into(); - let bundle_hash = bundle_txs - .bundle_hash() - .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; + let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); + let bundle_hash = bundle_txs.bundle_hash(); if let Err(e) = self .bundle_queue .publish(&bundle_with_metadata, &bundle_hash) @@ -96,7 +94,7 @@ where let audit_event = BundleEvent::Received { bundle_id: *bundle_with_metadata.uuid(), - bundle: bundle_with_metadata.bundle().clone(), + bundle: bundle_with_metadata.clone().into(), }; if let Err(e) = self.audit_publisher.publish(audit_event).await { warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); @@ -124,18 +122,18 @@ where let bundle = Bundle { txs: vec![data.clone()], - max_timestamp: Some(expiry_timestamp), - reverting_tx_hashes: vec![transaction.tx_hash()], - ..Default::default() + params: BundleParams { + max_timestamp: Some(expiry_timestamp), + reverting_tx_hashes: vec![transaction.tx_hash()], + ..Default::default() + }, }; let meter_bundle_response = self.meter_bundle(&bundle).await?; let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_txs: BundleTransactions = bundle.txs.into(); - let bundle_hash = bundle_txs - .bundle_hash() - .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; + let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); + let bundle_hash = bundle_txs.bundle_hash(); if let Err(e) = self .bundle_queue @@ -168,7 +166,7 @@ where let audit_event = BundleEvent::Received { bundle_id: *bundle_with_metadata.uuid(), - bundle: bundle_with_metadata.bundle().clone(), + bundle: bundle_with_metadata.clone().into(), }; if let Err(e) = self.audit_publisher.publish(audit_event).await { warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index 4ff4946..c9f2559 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -177,7 +177,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - .unwrap() .as_secs() + Duration::from_secs(3600).as_secs(); - if let Some(max_timestamp) = bundle.max_timestamp + if let Some(max_timestamp) = bundle.params.max_timestamp && max_timestamp > valid_timestamp_window { return Err(EthApiError::InvalidParams( @@ -203,7 +203,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - } // Partial transaction dropping is not supported, `dropping_tx_hashes` must be empty - if !bundle.dropping_tx_hashes.is_empty() { + if !bundle.params.dropping_tx_hashes.is_empty() { return Err(EthApiError::InvalidParams( "Partial transaction dropping is not supported".into(), ) @@ -211,7 +211,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - } // revert protection: all transaction hashes must be in `reverting_tx_hashes` - let reverting_tx_hashes_set: HashSet<_> = bundle.reverting_tx_hashes.iter().collect(); + let reverting_tx_hashes_set: HashSet<_> = bundle.params.reverting_tx_hashes.iter().collect(); let tx_hashes_set: HashSet<_> = tx_hashes.iter().collect(); if reverting_tx_hashes_set != tx_hashes_set { return Err(EthApiError::InvalidParams( @@ -238,6 +238,7 @@ mod tests { use op_alloy_network::eip2718::Encodable2718; use revm_context_interface::transaction::{AccessList, AccessListItem}; use std::time::{SystemTime, UNIX_EPOCH}; + use tips_core::BundleParams; fn create_account(nonce: u64, balance: U256) -> AccountInfo { AccountInfo { @@ -531,8 +532,10 @@ mod tests { let too_far_in_the_future = current_time + 3601; let bundle = Bundle { txs: vec![], - max_timestamp: Some(too_far_in_the_future), - ..Default::default() + params: BundleParams { + max_timestamp: Some(too_far_in_the_future), + ..Default::default() + }, }; assert_eq!( validate_bundle(&bundle, 0, vec![]), @@ -580,11 +583,13 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: vec![], - ..Default::default() + params: BundleParams { + block_number: 0, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + ..Default::default() + }, }; // Test should fail due to exceeding gas limit @@ -631,11 +636,13 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: vec![], - ..Default::default() + params: BundleParams { + block_number: 0, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + ..Default::default() + }, }; // Test should fail due to exceeding gas limit @@ -651,8 +658,10 @@ mod tests { async fn test_err_bundle_partial_transaction_dropping_not_supported() { let bundle = Bundle { txs: vec![], - dropping_tx_hashes: vec![B256::random()], - ..Default::default() + params: BundleParams { + dropping_tx_hashes: vec![B256::random()], + ..Default::default() + }, }; assert_eq!( validate_bundle(&bundle, 0, vec![]), @@ -698,11 +707,13 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: tx_hashes[..2].to_vec(), - ..Default::default() + params: BundleParams { + block_number: 0, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: tx_hashes[..2].to_vec(), + ..Default::default() + }, }; // Test should fail due to exceeding gas limit From 323cb21680ee6b57495b862ebf78e0f676d78642 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 3 Nov 2025 10:37:16 -0500 Subject: [PATCH 06/20] fmt --- crates/ingress-rpc/src/queue.rs | 8 ++------ crates/ingress-rpc/src/service.rs | 10 ++++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index b80100a..bb8849f 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -75,10 +75,7 @@ impl QueuePublisher for KafkaQueuePublisher { mod tests { use super::*; use rdkafka::config::ClientConfig; - use tips_core::{ - Bundle, BundleTransactions, BundleWithMetadata, - test_utils::create_test_meter_bundle_response, - }; + use tips_core::{Bundle, BundleWithMetadata, test_utils::create_test_meter_bundle_response}; use tokio::time::{Duration, Instant}; fn create_test_bundle() -> Bundle { @@ -98,8 +95,7 @@ mod tests { let bundle = create_test_bundle(); let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs.bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 257cc47..8f0876f 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,8 +12,8 @@ use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; use tips_core::{ - BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleTransactions, BundleWithMetadata, - CancelBundle, MeterBundleResponse, + BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleWithMetadata, CancelBundle, + MeterBundleResponse, }; use tracing::{info, warn}; @@ -76,8 +76,7 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs.bundle_hash(); if let Err(e) = self .bundle_queue .publish(&bundle_with_metadata, &bundle_hash) @@ -132,8 +131,7 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_txs: BundleTransactions = bundle_with_metadata.txs.clone().into(); - let bundle_hash = bundle_txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs.bundle_hash(); if let Err(e) = self .bundle_queue From 180aca1c354696f12fa73b386f41269d26465aa3 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 3 Nov 2025 10:45:11 -0500 Subject: [PATCH 07/20] txs helper --- crates/core/src/types.rs | 7 +++++-- crates/ingress-rpc/src/queue.rs | 2 +- crates/ingress-rpc/src/service.rs | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 22fa2ed..45e8053 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -146,9 +146,8 @@ pub struct CancelBundle { pub struct BundleWithMetadata { uuid: Uuid, params: BundleParams, + txs: BundleTransactions, meter_bundle_response: MeterBundleResponse, - - pub txs: BundleTransactions, } impl From for Bundle { @@ -190,6 +189,10 @@ impl BundleWithMetadata { pub fn uuid(&self) -> &Uuid { &self.uuid } + + pub fn txs(&self) -> &BundleTransactions { + &self.txs + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index bb8849f..7c1b5b3 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -95,7 +95,7 @@ mod tests { let bundle = create_test_bundle(); let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_hash = bundle_with_metadata.txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs().bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 8f0876f..1c6afb6 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -76,7 +76,7 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs().bundle_hash(); if let Err(e) = self .bundle_queue .publish(&bundle_with_metadata, &bundle_hash) @@ -131,7 +131,7 @@ where let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.txs.bundle_hash(); + let bundle_hash = bundle_with_metadata.txs().bundle_hash(); if let Err(e) = self .bundle_queue From 23c60ebb16aa546f800af26d3efa413dd33c2472 Mon Sep 17 00:00:00 2001 From: William Law Date: Mon, 3 Nov 2025 10:48:51 -0500 Subject: [PATCH 08/20] err type --- crates/core/src/types.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 45e8053..07f214a 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -21,19 +21,16 @@ impl From> for BundleTransactions { } impl TryFrom> for BundleTransactions { - type Error = &'static str; + type Error = String; fn try_from(txs: Vec) -> Result { let transactions = txs .iter() - .map(|b| { - OpTxEnvelope::decode_2718_exact(b) - .map_err(|e| format!("failed to decode transaction: {e}")) - }) + .map(|b| OpTxEnvelope::decode_2718_exact(b)) .collect::, _>>(); match transactions { Ok(transactions) => Ok(BundleTransactions(transactions)), - Err(_) => Err("failed to decode transactions"), + Err(e) => Err(format!("failed to decode transactions: {e}")), } } } From 55f336e90684f00c98e9854b06bedca87e0e09e4 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:02:21 -0500 Subject: [PATCH 09/20] spike new types --- Cargo.toml | 2 +- crates/audit/Cargo.toml | 2 +- crates/audit/src/storage.rs | 29 +- crates/audit/src/types.rs | 27 +- crates/audit/tests/integration_tests.rs | 4 +- crates/audit/tests/s3_test.rs | 41 +-- crates/bundle-pool/src/lib.rs | 4 +- crates/bundle-pool/src/pool.rs | 12 +- crates/bundle-pool/src/source.rs | 22 +- crates/bundle-pool/tests/integration_tests.rs | 4 +- crates/core/Cargo.toml | 2 +- crates/core/src/lib.rs | 4 +- crates/core/src/test_utils.rs | 41 ++- crates/core/src/types.rs | 315 +++++++++++------- crates/ingress-rpc/src/queue.rs | 14 +- crates/ingress-rpc/src/service.rs | 27 +- crates/ingress-rpc/src/validation.rs | 52 ++- 17 files changed, 334 insertions(+), 268 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d2029ef..040cf38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ alloy-serde = "1.0.41" # op-alloy op-alloy-network = { version = "0.20.0", default-features = false } -op-alloy-consensus = { version = "0.20.0", features = ["k256"] } +op-alloy-consensus = { version = "0.20.0", features = ["k256", "serde"] } op-alloy-rpc-types = { version = "0.20.0", default-features = true} op-alloy-flz = { version = "0.13.1" } diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index 5fd0ef2..25cf7e3 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -12,7 +12,7 @@ name = "tips-audit" path = "src/bin/main.rs" [dependencies] -tips-core = { workspace = true } +tips-core = { workspace = true, features = ["test-utils"] } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index fa2f53f..c8b619f 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -10,7 +10,7 @@ use aws_sdk_s3::primitives::ByteStream; use serde::{Deserialize, Serialize}; use std::fmt; use std::fmt::Debug; -use tips_core::Bundle; +use tips_core::AcceptedBundle; use tracing::info; #[derive(Debug)] @@ -39,7 +39,7 @@ pub enum BundleHistoryEvent { Received { key: String, timestamp: i64, - bundle: Bundle, + bundle: Box, }, Cancelled { key: String, @@ -365,13 +365,9 @@ mod tests { use crate::reader::Event; use crate::types::{BundleEvent, DropReason}; use alloy_primitives::TxHash; - use tips_core::Bundle; + use tips_core::test_utils::create_bundle_from_txn_data; use uuid::Uuid; - fn create_test_bundle() -> Bundle { - Bundle::default() - } - fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event { Event { key: key.to_string(), @@ -383,11 +379,11 @@ mod tests { #[test] fn test_update_bundle_history_transform_adds_new_event() { let bundle_history = BundleHistory { history: vec![] }; - let bundle = create_test_bundle(); + let bundle = create_bundle_from_txn_data(); let bundle_id = Uuid::new_v4(); let bundle_event = BundleEvent::Received { bundle_id, - bundle: bundle.clone(), + bundle: Box::new(bundle.clone()), }; let event = create_test_event("test-key", 1234567890, bundle_event); @@ -405,7 +401,7 @@ mod tests { } => { assert_eq!(key, "test-key"); assert_eq!(*ts, 1234567890); - assert_eq!(b.params.block_number, bundle.params.block_number); + assert_eq!(b.block_number, bundle.block_number); } _ => panic!("Expected Created event"), } @@ -416,15 +412,18 @@ mod tests { let existing_event = BundleHistoryEvent::Received { key: "duplicate-key".to_string(), timestamp: 1111111111, - bundle: create_test_bundle(), + bundle: Box::new(create_bundle_from_txn_data()), }; let bundle_history = BundleHistory { history: vec![existing_event], }; - let bundle = create_test_bundle(); + let bundle = create_bundle_from_txn_data(); let bundle_id = Uuid::new_v4(); - let bundle_event = BundleEvent::Received { bundle_id, bundle }; + let bundle_event = BundleEvent::Received { + bundle_id, + bundle: Box::new(bundle), + }; let event = create_test_event("duplicate-key", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history, &event); @@ -437,10 +436,10 @@ mod tests { let bundle_history = BundleHistory { history: vec![] }; let bundle_id = Uuid::new_v4(); - let bundle = create_test_bundle(); + let bundle = create_bundle_from_txn_data(); let bundle_event = BundleEvent::Received { bundle_id, - bundle: bundle.clone(), + bundle: Box::new(bundle), }; let event = create_test_event("test-key", 1234567890, bundle_event); let result = update_bundle_history_transform(bundle_history.clone(), &event); diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs index 4208b3b..6b0d7d1 100644 --- a/crates/audit/src/types.rs +++ b/crates/audit/src/types.rs @@ -1,10 +1,8 @@ use alloy_consensus::transaction::{SignerRecoverable, Transaction as ConsensusTransaction}; use alloy_primitives::{Address, TxHash, U256}; -use alloy_provider::network::eip2718::Decodable2718; use bytes::Bytes; -use op_alloy_consensus::OpTxEnvelope; use serde::{Deserialize, Serialize}; -use tips_core::Bundle; +use tips_core::AcceptedBundle; use uuid::Uuid; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -33,7 +31,7 @@ pub struct Transaction { pub enum BundleEvent { Received { bundle_id: BundleId, - bundle: Bundle, + bundle: Box, }, Cancelled { bundle_id: BundleId, @@ -72,19 +70,14 @@ impl BundleEvent { bundle .txs .iter() - .filter_map(|tx_bytes| { - match OpTxEnvelope::decode_2718_exact(tx_bytes.iter().as_slice()) { - Ok(envelope) => { - match envelope.recover_signer() { - Ok(sender) => Some(TransactionId { - sender, - nonce: U256::from(envelope.nonce()), - hash: *envelope.hash(), - }), - Err(_) => None, // Skip invalid transactions - } - } - Err(_) => None, // Skip malformed transactions + .filter_map(|envelope| { + match envelope.recover_signer() { + Ok(sender) => Some(TransactionId { + sender, + nonce: U256::from(envelope.nonce()), + hash: *envelope.hash(), + }), + Err(_) => None, // Skip invalid transactions } }) .collect() diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index d2bb0fe..b906bd7 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -5,7 +5,7 @@ use tips_audit::{ storage::{BundleEventS3Reader, S3EventReaderWriter}, types::{BundleEvent, DropReason}, }; -use tips_core::Bundle; +use tips_core::test_utils::create_test_bundle; use uuid::Uuid; mod common; use common::TestHarness; @@ -23,7 +23,7 @@ async fn test_kafka_publisher_s3_archiver_integration() let test_events = vec![ BundleEvent::Received { bundle_id: test_bundle_id, - bundle: Bundle::default(), + bundle: Box::new(create_test_bundle(vec![], None, None, None)), }, BundleEvent::Dropped { bundle_id: test_bundle_id, diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 55c8f87..1f47ed7 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -1,30 +1,17 @@ -use alloy_primitives::{Bytes, TxHash, b256, bytes}; +use alloy_primitives::TxHash; use std::sync::Arc; use tips_audit::{ reader::Event, storage::{BundleEventS3Reader, EventWriter, S3EventReaderWriter}, types::BundleEvent, }; -use tips_core::Bundle; use tokio::task::JoinSet; use uuid::Uuid; mod common; use common::TestHarness; - -// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e -const TXN_DATA: Bytes = bytes!( - "0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105" -); -const TXN_HASH: TxHash = - b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e"); - -fn create_test_bundle() -> Bundle { - Bundle { - txs: vec![TXN_DATA.clone()], - ..Default::default() - } -} +use tips_core::test_utils::TXN_HASH; +use tips_core::test_utils::create_bundle_from_txn_data; fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event { Event { @@ -40,13 +27,13 @@ async fn test_event_write_and_read() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box( sources: Vec, - bundle_rx: mpsc::UnboundedReceiver, + bundle_rx: mpsc::UnboundedReceiver, pool: Arc>, ) where S: BundleSource + Send + 'static, diff --git a/crates/bundle-pool/src/pool.rs b/crates/bundle-pool/src/pool.rs index fe972e4..4a529c9 100644 --- a/crates/bundle-pool/src/pool.rs +++ b/crates/bundle-pool/src/pool.rs @@ -3,7 +3,7 @@ use alloy_primitives::map::HashMap; use std::fmt::Debug; use std::sync::{Arc, Mutex}; use tips_audit::{BundleEvent, DropReason}; -use tips_core::BundleWithMetadata; +use tips_core::AcceptedBundle; use tokio::sync::mpsc; use tracing::warn; use uuid::Uuid; @@ -31,8 +31,8 @@ impl ProcessedBundle { } pub trait BundleStore { - fn add_bundle(&mut self, bundle: BundleWithMetadata); - fn get_bundles(&self) -> Vec; + fn add_bundle(&mut self, bundle: AcceptedBundle); + fn get_bundles(&self) -> Vec; fn built_flashblock( &mut self, block_number: u64, @@ -44,7 +44,7 @@ pub trait BundleStore { struct BundleData { flashblocks_in_block: HashMap>, - bundles: HashMap, + bundles: HashMap, } #[derive(Clone)] @@ -77,12 +77,12 @@ impl InMemoryBundlePool { } impl BundleStore for InMemoryBundlePool { - fn add_bundle(&mut self, bundle: BundleWithMetadata) { + fn add_bundle(&mut self, bundle: AcceptedBundle) { let mut inner = self.inner.lock().unwrap(); inner.bundles.insert(*bundle.uuid(), bundle); } - fn get_bundles(&self) -> Vec { + fn get_bundles(&self) -> Vec { let inner = self.inner.lock().unwrap(); inner.bundles.values().cloned().collect() } diff --git a/crates/bundle-pool/src/source.rs b/crates/bundle-pool/src/source.rs index db6df07..5f3ef76 100644 --- a/crates/bundle-pool/src/source.rs +++ b/crates/bundle-pool/src/source.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::{ClientConfig, Message}; use std::fmt::Debug; -use tips_core::BundleWithMetadata; +use tips_core::AcceptedBundle; use tokio::sync::mpsc; use tracing::{error, trace}; @@ -14,7 +14,7 @@ pub trait BundleSource { pub struct KafkaBundleSource { queue_consumer: StreamConsumer, - publisher: mpsc::UnboundedSender, + publisher: mpsc::UnboundedSender, } impl Debug for KafkaBundleSource { @@ -27,7 +27,7 @@ impl KafkaBundleSource { pub fn new( client_config: ClientConfig, topic: String, - publisher: mpsc::UnboundedSender, + publisher: mpsc::UnboundedSender, ) -> Result { let queue_consumer: StreamConsumer = client_config.create()?; queue_consumer.subscribe(&[topic.as_str()])?; @@ -52,14 +52,14 @@ impl BundleSource for KafkaBundleSource { } }; - let bundle_with_metadata: BundleWithMetadata = - match serde_json::from_slice(payload) { - Ok(b) => b, - Err(e) => { - error!(error = %e, "Failed to deserialize bundle"); - continue; - } - }; + let bundle_with_metadata: AcceptedBundle = match serde_json::from_slice(payload) + { + Ok(b) => b, + Err(e) => { + error!(error = %e, "Failed to deserialize bundle"); + continue; + } + }; trace!( bundle = ?bundle_with_metadata, diff --git a/crates/bundle-pool/tests/integration_tests.rs b/crates/bundle-pool/tests/integration_tests.rs index 2765e62..b622e31 100644 --- a/crates/bundle-pool/tests/integration_tests.rs +++ b/crates/bundle-pool/tests/integration_tests.rs @@ -11,7 +11,7 @@ use tips_bundle_pool::{ BundleStore, InMemoryBundlePool, KafkaBundleSource, connect_sources_to_pool, }; use tips_core::{ - BundleWithMetadata, + AcceptedBundle, test_utils::{create_test_bundle, create_transaction}, }; use tokio::sync::mpsc; @@ -47,7 +47,7 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box(); + let (bundle_tx, bundle_rx) = mpsc::unbounded_channel::(); let kafka_source = KafkaBundleSource::new(kafka_consumer_config, topic.to_string(), bundle_tx)?; diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3219ef6..ecc135a 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -16,13 +16,13 @@ alloy-primitives.workspace = true alloy-consensus.workspace = true alloy-provider.workspace = true op-alloy-consensus.workspace = true -serde = { version = "1.0.228", default-features = false, features = ["alloc", "derive"] } alloy-serde = { version = "1.0.41", default-features = false } alloy-signer-local = { workspace = true, optional = true } op-alloy-rpc-types = { workspace = true, optional = true } tracing.workspace = true tracing-subscriber.workspace = true op-alloy-flz.workspace = true +serde.workspace = true [dev-dependencies] alloy-signer-local.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 32cb7e6..c8e1515 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -6,6 +6,6 @@ pub mod types; pub mod test_utils; pub use types::{ - BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleTransactions, BundleWithMetadata, - CancelBundle, MeterBundleResponse, + AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, BundleTxs, CancelBundle, + MeterBundleResponse, }; diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index 1a9102a..24fc24a 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -1,12 +1,31 @@ -use crate::{Bundle, BundleParams, BundleWithMetadata, MeterBundleResponse}; +use crate::{AcceptedBundle, Bundle, MeterBundleResponse}; use alloy_consensus::SignableTransaction; -use alloy_primitives::{Address, B256, U256}; +use alloy_primitives::{Address, B256, Bytes, TxHash, U256, b256, bytes}; use alloy_provider::network::TxSignerSync; use alloy_provider::network::eip2718::Encodable2718; use alloy_signer_local::PrivateKeySigner; use op_alloy_consensus::OpTxEnvelope; use op_alloy_rpc_types::OpTransactionRequest; +// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e +pub const TXN_DATA: Bytes = bytes!( + "0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105" +); + +pub const TXN_HASH: TxHash = + b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e"); + +pub fn create_bundle_from_txn_data() -> AcceptedBundle { + AcceptedBundle::load( + Bundle { + txs: vec![TXN_DATA.clone()], + ..Default::default() + }, + create_test_meter_bundle_response(), + ) + .unwrap() +} + pub fn create_transaction(from: PrivateKeySigner, nonce: u64, to: Address) -> OpTxEnvelope { let mut txn = OpTransactionRequest::default() .value(U256::from(10_000)) @@ -28,21 +47,23 @@ pub fn create_test_bundle( block_number: Option, min_timestamp: Option, max_timestamp: Option, -) -> BundleWithMetadata { +) -> AcceptedBundle { let txs = txns.iter().map(|t| t.encoded_2718().into()).collect(); let bundle = Bundle { txs, - params: BundleParams { - block_number: block_number.unwrap_or(0), - min_timestamp, - max_timestamp, - ..Default::default() - }, + block_number: block_number.unwrap_or(0), + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp, + max_timestamp, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], }; let meter_bundle_response = create_test_meter_bundle_response(); - BundleWithMetadata::load(bundle, meter_bundle_response).unwrap() + AcceptedBundle::load(bundle, meter_bundle_response).unwrap() } pub fn create_test_meter_bundle_response() -> MeterBundleResponse { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 07f214a..f279a3a 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,4 +1,5 @@ use alloy_consensus::Transaction; +use alloy_consensus::transaction::Recovered; use alloy_consensus::transaction::SignerRecoverable; use alloy_primitives::{Address, B256, Bytes, TxHash, keccak256}; use alloy_provider::network::eip2718::{Decodable2718, Encodable2718}; @@ -12,80 +13,125 @@ pub const BLOCK_TIME: u128 = 2_000_000; #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct BundleTransactions(Vec); +pub struct Bundle { + pub txs: Vec, -impl From> for BundleTransactions { - fn from(txs: Vec) -> Self { - BundleTransactions(txs) - } -} + #[serde(with = "alloy_serde::quantity")] + pub block_number: u64, -impl TryFrom> for BundleTransactions { - type Error = String; - fn try_from(txs: Vec) -> Result { - let transactions = txs - .iter() - .map(|b| OpTxEnvelope::decode_2718_exact(b)) - .collect::, _>>(); + #[serde( + default, + deserialize_with = "alloy_serde::quantity::opt::deserialize", + skip_serializing_if = "Option::is_none" + )] + pub flashblock_number_min: Option, - match transactions { - Ok(transactions) => Ok(BundleTransactions(transactions)), - Err(e) => Err(format!("failed to decode transactions: {e}")), - } - } -} + #[serde( + default, + deserialize_with = "alloy_serde::quantity::opt::deserialize", + skip_serializing_if = "Option::is_none" + )] + pub flashblock_number_max: Option, -impl From for Vec { - fn from(txs: BundleTransactions) -> Self { - txs.0.iter().map(|t| t.encoded_2718().into()).collect() - } -} + #[serde( + default, + deserialize_with = "alloy_serde::quantity::opt::deserialize", + skip_serializing_if = "Option::is_none" + )] + pub min_timestamp: Option, -impl BundleTransactions { - pub fn bundle_hash(&self) -> B256 { - let mut concatenated = Vec::new(); - for tx in self.0.iter() { - concatenated.extend_from_slice(tx.tx_hash().as_slice()); - } - keccak256(&concatenated) - } + #[serde( + default, + deserialize_with = "alloy_serde::quantity::opt::deserialize", + skip_serializing_if = "Option::is_none" + )] + pub max_timestamp: Option, - /// Get transaction hashes for all transactions in the bundle - pub fn txn_hashes(&self) -> Vec { - self.0.iter().map(|t| t.tx_hash()).collect() - } + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub reverting_tx_hashes: Vec, - /// Get sender addresses for all transactions in the bundle - pub fn senders(&self) -> Vec
{ - self.0.iter().map(|t| t.recover_signer().unwrap()).collect() - } + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replacement_uuid: Option, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub dropping_tx_hashes: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ParsedBundle { + pub txs: Vec>, + pub block_number: u64, + pub flashblock_number_min: Option, + pub flashblock_number_max: Option, + pub min_timestamp: Option, + pub max_timestamp: Option, + pub reverting_tx_hashes: Vec, + pub replacement_uuid: Option, + pub dropping_tx_hashes: Vec, +} - /// Get total gas limit for all transactions in the bundle - pub fn gas_limit(&self) -> u64 { - self.0.iter().map(|t| t.gas_limit()).sum() +impl TryFrom for ParsedBundle { + type Error = String; + fn try_from(bundle: Bundle) -> Result { + // TODO: better error handling + let txs: Vec> = bundle + .txs + .into_iter() + .map(|tx| { + OpTxEnvelope::decode_2718_exact(tx.iter().as_slice()) + .unwrap() + .try_into_recovered() + .unwrap() + }) + .collect(); + Ok(ParsedBundle { + txs, + block_number: bundle.block_number, + flashblock_number_min: bundle.flashblock_number_min, + flashblock_number_max: bundle.flashblock_number_max, + min_timestamp: bundle.min_timestamp, + max_timestamp: bundle.max_timestamp, + reverting_tx_hashes: bundle.reverting_tx_hashes, + replacement_uuid: bundle.replacement_uuid, + dropping_tx_hashes: bundle.dropping_tx_hashes, + }) } +} - /// Get total data availability size for all transactions in the bundle - pub fn da_size(&self) -> u64 { - self.0 - .iter() - .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) - .sum() +impl From for ParsedBundle { + fn from(accepted_bundle: AcceptedBundle) -> Self { + Self { + txs: accepted_bundle.txs, + block_number: accepted_bundle.block_number, + flashblock_number_min: accepted_bundle.flashblock_number_min, + flashblock_number_max: accepted_bundle.flashblock_number_max, + min_timestamp: accepted_bundle.min_timestamp, + max_timestamp: accepted_bundle.max_timestamp, + reverting_tx_hashes: accepted_bundle.reverting_tx_hashes, + replacement_uuid: accepted_bundle.replacement_uuid, + dropping_tx_hashes: accepted_bundle.dropping_tx_hashes, + } } } -#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct Bundle { - pub txs: Vec, - - #[serde(flatten)] - pub params: BundleParams, +pub struct BundleHash { + pub bundle_hash: B256, } -#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct BundleParams { +pub struct CancelBundle { + pub replacement_uuid: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AcceptedBundle { + pub uuid: Uuid, + + pub txs: Vec>, + #[serde(with = "alloy_serde::quantity")] pub block_number: u64, @@ -125,60 +171,100 @@ pub struct BundleParams { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub dropping_tx_hashes: Vec, -} -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct BundleHash { - pub bundle_hash: B256, + pub meter_bundle_response: MeterBundleResponse, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CancelBundle { - pub replacement_uuid: String, +pub trait BundleTxs { + fn transactions(self) -> Vec>; } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BundleWithMetadata { - uuid: Uuid, - params: BundleParams, - txs: BundleTransactions, - meter_bundle_response: MeterBundleResponse, +pub trait BundleExtensions { + fn bundle_hash(self) -> B256; + fn txn_hashes(self) -> Vec; + fn senders(self) -> Vec
; + fn gas_limit(self) -> u64; + fn da_size(self) -> u64; } -impl From for Bundle { - fn from(bundle_with_metadata: BundleWithMetadata) -> Self { - Bundle { - txs: bundle_with_metadata.txs.into(), - params: bundle_with_metadata.params.clone(), +impl BundleExtensions for T { + fn bundle_hash(self) -> B256 { + let parsed = self.transactions(); + let mut concatenated = Vec::new(); + for tx in &parsed { + concatenated.extend_from_slice(tx.tx_hash().as_slice()); } + keccak256(&concatenated) + } + + fn txn_hashes(self) -> Vec { + self.transactions().iter().map(|t| t.tx_hash()).collect() + } + + fn senders(self) -> Vec
{ + self.transactions() + .iter() + .map(|t| t.recover_signer().unwrap()) + .collect() + } + + fn gas_limit(self) -> u64 { + self.transactions().iter().map(|t| t.gas_limit()).sum() + } + + fn da_size(self) -> u64 { + self.transactions() + .iter() + .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) + .sum() } } -impl BundleWithMetadata { +impl BundleTxs for ParsedBundle { + fn transactions(self) -> Vec> { + self.txs + } +} + +impl BundleTxs for AcceptedBundle { + fn transactions(self) -> Vec> { + self.txs + } +} + +impl AcceptedBundle { pub fn load( mut bundle: Bundle, meter_bundle_response: MeterBundleResponse, ) -> Result { let uuid = bundle - .params .replacement_uuid .clone() .unwrap_or_else(|| Uuid::new_v4().to_string()); let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?; - bundle.params.replacement_uuid = Some(uuid.to_string()); + bundle.replacement_uuid = Some(uuid.to_string()); - let txs: BundleTransactions = bundle + // TODO: better error handling + let txs: Vec> = bundle .txs - .try_into() - .map_err(|e| format!("failed to convert transactions: {e}"))?; - Ok(BundleWithMetadata { + .into_iter() + .map(|tx| OpTxEnvelope::decode_2718_exact(tx.iter().as_slice())) + .map(|tx| tx.unwrap().try_into_recovered().unwrap()) + .collect(); + + Ok(AcceptedBundle { uuid, - params: bundle.params.clone(), txs, + block_number: bundle.block_number, + flashblock_number_min: bundle.flashblock_number_min, + flashblock_number_max: bundle.flashblock_number_max, + min_timestamp: bundle.min_timestamp, + max_timestamp: bundle.max_timestamp, + reverting_tx_hashes: bundle.reverting_tx_hashes, + replacement_uuid: bundle.replacement_uuid, + dropping_tx_hashes: bundle.dropping_tx_hashes, meter_bundle_response, }) } @@ -186,13 +272,9 @@ impl BundleWithMetadata { pub fn uuid(&self) -> &Uuid { &self.uuid } - - pub fn txs(&self) -> &BundleTransactions { - &self.txs - } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct TransactionResult { pub coinbase_diff: String, @@ -207,7 +289,7 @@ pub struct TransactionResult { pub execution_time_us: u128, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct MeterBundleResponse { pub bundle_gas_price: String, @@ -246,29 +328,24 @@ mod tests { let tx1_bytes = tx1.encoded_2718(); let tx2_bytes = tx2.encoded_2718(); - let bundle = BundleWithMetadata::load( + let bundle = AcceptedBundle::load( Bundle { txs: vec![tx1_bytes.clone().into()], - params: BundleParams { - replacement_uuid: None, - block_number: 1, - ..Default::default() - }, + block_number: 1, + replacement_uuid: None, + ..Default::default() }, create_test_meter_bundle_response(), ) .unwrap(); assert!(!bundle.uuid().is_nil()); - assert_eq!( - bundle.params.replacement_uuid, - Some(bundle.uuid().to_string()) - ); - let bundle_txs: BundleTransactions = bundle.txs.into(); - assert_eq!(bundle_txs.txn_hashes().len(), 1); - assert_eq!(bundle_txs.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs.senders().len(), 1); - assert_eq!(bundle_txs.senders()[0], alice.address()); + assert_eq!(bundle.replacement_uuid, Some(bundle.uuid().to_string())); + let bundle_txs: ParsedBundle = bundle.into(); + assert_eq!(bundle_txs.clone().txn_hashes().len(), 1); + assert_eq!(bundle_txs.clone().txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs.clone().senders().len(), 1); + assert_eq!(bundle_txs.clone().senders()[0], alice.address()); // Bundle hashes are keccack256(...txnHashes) let expected_bundle_hash_single = { @@ -280,28 +357,26 @@ mod tests { assert_eq!(bundle_txs.bundle_hash(), expected_bundle_hash_single); let uuid = Uuid::new_v4(); - let bundle = BundleWithMetadata::load( + let bundle = AcceptedBundle::load( Bundle { txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()], - params: BundleParams { - replacement_uuid: Some(uuid.to_string()), - block_number: 1, - ..Default::default() - }, + block_number: 1, + replacement_uuid: Some(uuid.to_string()), + ..Default::default() }, create_test_meter_bundle_response(), ) .unwrap(); assert_eq!(*bundle.uuid(), uuid); - assert_eq!(bundle.params.replacement_uuid, Some(uuid.to_string())); - let bundle_txs2: BundleTransactions = bundle.txs.into(); - assert_eq!(bundle_txs2.txn_hashes().len(), 2); - assert_eq!(bundle_txs2.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs2.txn_hashes()[1], tx2.tx_hash()); - assert_eq!(bundle_txs2.senders().len(), 2); - assert_eq!(bundle_txs2.senders()[0], alice.address()); - assert_eq!(bundle_txs2.senders()[1], alice.address()); + assert_eq!(bundle.replacement_uuid, Some(uuid.to_string())); + let bundle_txs2: ParsedBundle = bundle.into(); + assert_eq!(bundle_txs2.clone().txn_hashes().len(), 2); + assert_eq!(bundle_txs2.clone().txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs2.clone().txn_hashes()[1], tx2.tx_hash()); + assert_eq!(bundle_txs2.clone().senders().len(), 2); + assert_eq!(bundle_txs2.clone().senders()[0], alice.address()); + assert_eq!(bundle_txs2.clone().senders()[1], alice.address()); let expected_bundle_hash_double = { let mut hasher = Keccak256::default(); diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 7c1b5b3..6e8459c 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -3,14 +3,14 @@ use anyhow::Result; use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use rdkafka::producer::{FutureProducer, FutureRecord}; -use tips_core::BundleWithMetadata; +use tips_core::AcceptedBundle; use tokio::time::Duration; use tracing::{error, info}; /// A queue to buffer transactions #[async_trait] pub trait QueuePublisher: Send + Sync { - async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()>; + async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()>; } /// A queue to buffer transactions @@ -27,7 +27,7 @@ impl KafkaQueuePublisher { #[async_trait] impl QueuePublisher for KafkaQueuePublisher { - async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()> { + async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> { let key = bundle_hash.to_string(); let payload = serde_json::to_vec(&bundle)?; @@ -75,7 +75,9 @@ impl QueuePublisher for KafkaQueuePublisher { mod tests { use super::*; use rdkafka::config::ClientConfig; - use tips_core::{Bundle, BundleWithMetadata, test_utils::create_test_meter_bundle_response}; + use tips_core::{ + AcceptedBundle, Bundle, BundleExtensions, test_utils::create_test_meter_bundle_response, + }; use tokio::time::{Duration, Instant}; fn create_test_bundle() -> Bundle { @@ -94,8 +96,8 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); let bundle_with_metadata = - BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_hash = bundle_with_metadata.txs().bundle_hash(); + AcceptedBundle::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); + let bundle_hash = bundle_with_metadata.clone().bundle_hash(); let start = Instant::now(); let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 1c6afb6..dfd7af2 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -12,7 +12,7 @@ use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; use tips_core::{ - BLOCK_TIME, Bundle, BundleHash, BundleParams, BundleWithMetadata, CancelBundle, + AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, CancelBundle, MeterBundleResponse, }; use tracing::{info, warn}; @@ -73,13 +73,13 @@ where async fn send_bundle(&self, bundle: Bundle) -> RpcResult { self.validate_bundle(&bundle).await?; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response) + let bundle_with_metadata = AcceptedBundle::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.txs().bundle_hash(); + let bundle_hash = bundle_with_metadata.clone().bundle_hash(); if let Err(e) = self .bundle_queue - .publish(&bundle_with_metadata, &bundle_hash) + .publish(&bundle_with_metadata.clone(), &bundle_hash) .await { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); @@ -93,7 +93,7 @@ where let audit_event = BundleEvent::Received { bundle_id: *bundle_with_metadata.uuid(), - bundle: bundle_with_metadata.clone().into(), + bundle: Box::new(bundle_with_metadata.clone()), }; if let Err(e) = self.audit_publisher.publish(audit_event).await { warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); @@ -121,17 +121,20 @@ where let bundle = Bundle { txs: vec![data.clone()], - params: BundleParams { - max_timestamp: Some(expiry_timestamp), - reverting_tx_hashes: vec![transaction.tx_hash()], - ..Default::default() - }, + block_number: 0, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: Some(expiry_timestamp), + reverting_tx_hashes: vec![transaction.tx_hash()], + replacement_uuid: None, + dropping_tx_hashes: vec![], }; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = BundleWithMetadata::load(bundle.clone(), meter_bundle_response) + let bundle_with_metadata = AcceptedBundle::load(bundle.clone(), meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.txs().bundle_hash(); + let bundle_hash = bundle_with_metadata.clone().bundle_hash(); if let Err(e) = self .bundle_queue diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index c9f2559..fe4c04e 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -177,7 +177,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - .unwrap() .as_secs() + Duration::from_secs(3600).as_secs(); - if let Some(max_timestamp) = bundle.params.max_timestamp + if let Some(max_timestamp) = bundle.max_timestamp && max_timestamp > valid_timestamp_window { return Err(EthApiError::InvalidParams( @@ -203,7 +203,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - } // Partial transaction dropping is not supported, `dropping_tx_hashes` must be empty - if !bundle.params.dropping_tx_hashes.is_empty() { + if !bundle.dropping_tx_hashes.is_empty() { return Err(EthApiError::InvalidParams( "Partial transaction dropping is not supported".into(), ) @@ -211,7 +211,7 @@ pub fn validate_bundle(bundle: &Bundle, bundle_gas: u64, tx_hashes: Vec) - } // revert protection: all transaction hashes must be in `reverting_tx_hashes` - let reverting_tx_hashes_set: HashSet<_> = bundle.params.reverting_tx_hashes.iter().collect(); + let reverting_tx_hashes_set: HashSet<_> = bundle.reverting_tx_hashes.iter().collect(); let tx_hashes_set: HashSet<_> = tx_hashes.iter().collect(); if reverting_tx_hashes_set != tx_hashes_set { return Err(EthApiError::InvalidParams( @@ -238,7 +238,6 @@ mod tests { use op_alloy_network::eip2718::Encodable2718; use revm_context_interface::transaction::{AccessList, AccessListItem}; use std::time::{SystemTime, UNIX_EPOCH}; - use tips_core::BundleParams; fn create_account(nonce: u64, balance: U256) -> AccountInfo { AccountInfo { @@ -532,10 +531,8 @@ mod tests { let too_far_in_the_future = current_time + 3601; let bundle = Bundle { txs: vec![], - params: BundleParams { - max_timestamp: Some(too_far_in_the_future), - ..Default::default() - }, + max_timestamp: Some(too_far_in_the_future), + ..Default::default() }; assert_eq!( validate_bundle(&bundle, 0, vec![]), @@ -583,13 +580,8 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - params: BundleParams { - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: vec![], - ..Default::default() - }, + max_timestamp: None, + ..Default::default() }; // Test should fail due to exceeding gas limit @@ -636,13 +628,14 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - params: BundleParams { - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: vec![], - ..Default::default() - }, + block_number: 0, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], }; // Test should fail due to exceeding gas limit @@ -658,10 +651,8 @@ mod tests { async fn test_err_bundle_partial_transaction_dropping_not_supported() { let bundle = Bundle { txs: vec![], - params: BundleParams { - dropping_tx_hashes: vec![B256::random()], - ..Default::default() - }, + dropping_tx_hashes: vec![B256::random()], + ..Default::default() }; assert_eq!( validate_bundle(&bundle, 0, vec![]), @@ -707,13 +698,8 @@ mod tests { let bundle = Bundle { txs: encoded_txs, - params: BundleParams { - block_number: 0, - min_timestamp: None, - max_timestamp: None, - reverting_tx_hashes: tx_hashes[..2].to_vec(), - ..Default::default() - }, + reverting_tx_hashes: tx_hashes[..2].to_vec(), + ..Default::default() }; // Test should fail due to exceeding gas limit From 9e2131020e1d11c032c1dec4c0f21b2be2e34474 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:13:57 -0500 Subject: [PATCH 10/20] better err handling --- crates/core/src/types.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index f279a3a..26a8879 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -73,17 +73,20 @@ pub struct ParsedBundle { impl TryFrom for ParsedBundle { type Error = String; fn try_from(bundle: Bundle) -> Result { - // TODO: better error handling let txs: Vec> = bundle .txs .into_iter() .map(|tx| { OpTxEnvelope::decode_2718_exact(tx.iter().as_slice()) - .unwrap() - .try_into_recovered() - .unwrap() + .map_err(|e| format!("Failed to decode transaction: {e:?}")) + .and_then(|tx| { + tx.try_into_recovered().map_err(|e| { + format!("Failed to convert transaction to recovered: {e:?}") + }) + }) }) - .collect(); + .collect::>, String>>()?; + Ok(ParsedBundle { txs, block_number: bundle.block_number, @@ -246,13 +249,19 @@ impl AcceptedBundle { bundle.replacement_uuid = Some(uuid.to_string()); - // TODO: better error handling let txs: Vec> = bundle .txs .into_iter() - .map(|tx| OpTxEnvelope::decode_2718_exact(tx.iter().as_slice())) - .map(|tx| tx.unwrap().try_into_recovered().unwrap()) - .collect(); + .map(|tx| { + OpTxEnvelope::decode_2718_exact(tx.iter().as_slice()) + .map_err(|e| format!("Failed to decode transaction: {e:?}")) + .and_then(|tx| { + tx.try_into_recovered().map_err(|e| { + format!("Failed to convert transaction to recovered: {e:?}") + }) + }) + }) + .collect::>, String>>()?; Ok(AcceptedBundle { uuid, From 5b1e81437beade22313f7a9e0a2ceb7f12246894 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:17:17 -0500 Subject: [PATCH 11/20] reduce diff --- crates/ingress-rpc/src/validation.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/validation.rs b/crates/ingress-rpc/src/validation.rs index fe4c04e..4ff4946 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/ingress-rpc/src/validation.rs @@ -580,7 +580,10 @@ mod tests { let bundle = Bundle { txs: encoded_txs, + block_number: 0, + min_timestamp: None, max_timestamp: None, + reverting_tx_hashes: vec![], ..Default::default() }; @@ -629,13 +632,10 @@ mod tests { let bundle = Bundle { txs: encoded_txs, block_number: 0, - flashblock_number_min: None, - flashblock_number_max: None, min_timestamp: None, max_timestamp: None, reverting_tx_hashes: vec![], - replacement_uuid: None, - dropping_tx_hashes: vec![], + ..Default::default() }; // Test should fail due to exceeding gas limit @@ -698,6 +698,9 @@ mod tests { let bundle = Bundle { txs: encoded_txs, + block_number: 0, + min_timestamp: None, + max_timestamp: None, reverting_tx_hashes: tx_hashes[..2].to_vec(), ..Default::default() }; From 83dda87f9209e39afae7d62a1fad67a9ace72d18 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:19:11 -0500 Subject: [PATCH 12/20] more diff reduce --- crates/core/src/test_utils.rs | 6 +----- crates/ingress-rpc/src/service.rs | 7 +------ 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index 24fc24a..a81ad16 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -53,13 +53,9 @@ pub fn create_test_bundle( let bundle = Bundle { txs, block_number: block_number.unwrap_or(0), - flashblock_number_min: None, - flashblock_number_max: None, min_timestamp, max_timestamp, - reverting_tx_hashes: vec![], - replacement_uuid: None, - dropping_tx_hashes: vec![], + ..Default::default() }; let meter_bundle_response = create_test_meter_bundle_response(); diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index dfd7af2..6af745f 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -121,14 +121,9 @@ where let bundle = Bundle { txs: vec![data.clone()], - block_number: 0, - flashblock_number_min: None, - flashblock_number_max: None, - min_timestamp: None, max_timestamp: Some(expiry_timestamp), reverting_tx_hashes: vec![transaction.tx_hash()], - replacement_uuid: None, - dropping_tx_hashes: vec![], + ..Default::default() }; let meter_bundle_response = self.meter_bundle(&bundle).await?; From cd22b9fe9710d2a5ca3281aeceb3978e5c36807a Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:25:10 -0500 Subject: [PATCH 13/20] comments and diff --- crates/audit/tests/integration_tests.rs | 4 ++-- crates/audit/tests/s3_test.rs | 3 +-- crates/core/src/types.rs | 3 +++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index b906bd7..7587493 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -5,7 +5,7 @@ use tips_audit::{ storage::{BundleEventS3Reader, S3EventReaderWriter}, types::{BundleEvent, DropReason}, }; -use tips_core::test_utils::create_test_bundle; +use tips_core::test_utils::create_bundle_from_txn_data; use uuid::Uuid; mod common; use common::TestHarness; @@ -23,7 +23,7 @@ async fn test_kafka_publisher_s3_archiver_integration() let test_events = vec![ BundleEvent::Received { bundle_id: test_bundle_id, - bundle: Box::new(create_test_bundle(vec![], None, None, None)), + bundle: Box::new(create_bundle_from_txn_data()), }, BundleEvent::Dropped { bundle_id: test_bundle_id, diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 1f47ed7..078d513 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -10,8 +10,7 @@ use uuid::Uuid; mod common; use common::TestHarness; -use tips_core::test_utils::TXN_HASH; -use tips_core::test_utils::create_bundle_from_txn_data; +use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data}; fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event { Event { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 26a8879..43f9c17 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -11,6 +11,7 @@ use uuid::Uuid; /// Block time in microseconds pub const BLOCK_TIME: u128 = 2_000_000; +/// `Bundle` is the type that mirrors `EthSendBundle` and is used for the API. #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct Bundle { @@ -57,6 +58,7 @@ pub struct Bundle { pub dropping_tx_hashes: Vec, } +/// `ParsedBundle` is the type that contains utility methods for the `Bundle` type. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ParsedBundle { pub txs: Vec>, @@ -129,6 +131,7 @@ pub struct CancelBundle { pub replacement_uuid: String, } +/// `AcceptedBundle` is the type that is sent over the wire. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AcceptedBundle { pub uuid: Uuid, From 97861cce1f59b29e5f3ac870267be912beed60be Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:35:09 -0500 Subject: [PATCH 14/20] reduce clones --- crates/core/src/types.rs | 52 +++++++++++++++---------------- crates/ingress-rpc/src/queue.rs | 6 ++-- crates/ingress-rpc/src/service.rs | 14 +++++---- 3 files changed, 37 insertions(+), 35 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 43f9c17..7e32812 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -182,43 +182,43 @@ pub struct AcceptedBundle { } pub trait BundleTxs { - fn transactions(self) -> Vec>; + fn transactions(&self) -> &Vec>; } pub trait BundleExtensions { - fn bundle_hash(self) -> B256; - fn txn_hashes(self) -> Vec; - fn senders(self) -> Vec
; - fn gas_limit(self) -> u64; - fn da_size(self) -> u64; + fn bundle_hash(&self) -> B256; + fn txn_hashes(&self) -> Vec; + fn senders(&self) -> Vec
; + fn gas_limit(&self) -> u64; + fn da_size(&self) -> u64; } impl BundleExtensions for T { - fn bundle_hash(self) -> B256 { + fn bundle_hash(&self) -> B256 { let parsed = self.transactions(); let mut concatenated = Vec::new(); - for tx in &parsed { + for tx in parsed { concatenated.extend_from_slice(tx.tx_hash().as_slice()); } keccak256(&concatenated) } - fn txn_hashes(self) -> Vec { + fn txn_hashes(&self) -> Vec { self.transactions().iter().map(|t| t.tx_hash()).collect() } - fn senders(self) -> Vec
{ + fn senders(&self) -> Vec
{ self.transactions() .iter() .map(|t| t.recover_signer().unwrap()) .collect() } - fn gas_limit(self) -> u64 { + fn gas_limit(&self) -> u64 { self.transactions().iter().map(|t| t.gas_limit()).sum() } - fn da_size(self) -> u64 { + fn da_size(&self) -> u64 { self.transactions() .iter() .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718())) @@ -227,14 +227,14 @@ impl BundleExtensions for T { } impl BundleTxs for ParsedBundle { - fn transactions(self) -> Vec> { - self.txs + fn transactions(&self) -> &Vec> { + &self.txs } } impl BundleTxs for AcceptedBundle { - fn transactions(self) -> Vec> { - self.txs + fn transactions(&self) -> &Vec> { + &self.txs } } @@ -354,10 +354,10 @@ mod tests { assert!(!bundle.uuid().is_nil()); assert_eq!(bundle.replacement_uuid, Some(bundle.uuid().to_string())); let bundle_txs: ParsedBundle = bundle.into(); - assert_eq!(bundle_txs.clone().txn_hashes().len(), 1); - assert_eq!(bundle_txs.clone().txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs.clone().senders().len(), 1); - assert_eq!(bundle_txs.clone().senders()[0], alice.address()); + assert_eq!(bundle_txs.txn_hashes().len(), 1); + assert_eq!(bundle_txs.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs.senders().len(), 1); + assert_eq!(bundle_txs.senders()[0], alice.address()); // Bundle hashes are keccack256(...txnHashes) let expected_bundle_hash_single = { @@ -383,12 +383,12 @@ mod tests { assert_eq!(*bundle.uuid(), uuid); assert_eq!(bundle.replacement_uuid, Some(uuid.to_string())); let bundle_txs2: ParsedBundle = bundle.into(); - assert_eq!(bundle_txs2.clone().txn_hashes().len(), 2); - assert_eq!(bundle_txs2.clone().txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs2.clone().txn_hashes()[1], tx2.tx_hash()); - assert_eq!(bundle_txs2.clone().senders().len(), 2); - assert_eq!(bundle_txs2.clone().senders()[0], alice.address()); - assert_eq!(bundle_txs2.clone().senders()[1], alice.address()); + assert_eq!(bundle_txs2.txn_hashes().len(), 2); + assert_eq!(bundle_txs2.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle_txs2.txn_hashes()[1], tx2.tx_hash()); + assert_eq!(bundle_txs2.senders().len(), 2); + assert_eq!(bundle_txs2.senders()[0], alice.address()); + assert_eq!(bundle_txs2.senders()[1], alice.address()); let expected_bundle_hash_double = { let mut hasher = Keccak256::default(); diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 6e8459c..8a51623 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -96,11 +96,11 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); let bundle_with_metadata = - AcceptedBundle::load(bundle.clone(), create_test_meter_bundle_response()).unwrap(); - let bundle_hash = bundle_with_metadata.clone().bundle_hash(); + AcceptedBundle::load(bundle, create_test_meter_bundle_response()).unwrap(); + let bundle_hash = &bundle_with_metadata.bundle_hash(); let start = Instant::now(); - let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await; + let result = publisher.publish(&bundle_with_metadata, bundle_hash).await; let elapsed = start.elapsed(); // the backoff tries at minimum 100ms, so verify we tried at least once diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 6af745f..061577d 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -76,10 +76,10 @@ where let bundle_with_metadata = AcceptedBundle::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.clone().bundle_hash(); + let bundle_hash = &bundle_with_metadata.bundle_hash(); if let Err(e) = self .bundle_queue - .publish(&bundle_with_metadata.clone(), &bundle_hash) + .publish(&bundle_with_metadata, bundle_hash) .await { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); @@ -99,7 +99,9 @@ where warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); } - Ok(BundleHash { bundle_hash }) + Ok(BundleHash { + bundle_hash: *bundle_hash, + }) } async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> { @@ -127,13 +129,13 @@ where }; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = AcceptedBundle::load(bundle.clone(), meter_bundle_response) + let bundle_with_metadata = AcceptedBundle::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = bundle_with_metadata.clone().bundle_hash(); + let bundle_hash = &bundle_with_metadata.bundle_hash(); if let Err(e) = self .bundle_queue - .publish(&bundle_with_metadata, &bundle_hash) + .publish(&bundle_with_metadata, bundle_hash) .await { warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); From e6cc5d90f0b199ab8ed8a53e0360040fc68085eb Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 10:37:56 -0500 Subject: [PATCH 15/20] naming --- crates/bundle-pool/src/source.rs | 7 +++---- crates/ingress-rpc/src/queue.rs | 6 +++--- crates/ingress-rpc/src/service.rs | 24 ++++++++++++------------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/crates/bundle-pool/src/source.rs b/crates/bundle-pool/src/source.rs index 5f3ef76..e81bcbb 100644 --- a/crates/bundle-pool/src/source.rs +++ b/crates/bundle-pool/src/source.rs @@ -52,8 +52,7 @@ impl BundleSource for KafkaBundleSource { } }; - let bundle_with_metadata: AcceptedBundle = match serde_json::from_slice(payload) - { + let accepted_bundle: AcceptedBundle = match serde_json::from_slice(payload) { Ok(b) => b, Err(e) => { error!(error = %e, "Failed to deserialize bundle"); @@ -62,13 +61,13 @@ impl BundleSource for KafkaBundleSource { }; trace!( - bundle = ?bundle_with_metadata, + bundle = ?accepted_bundle, offset = message.offset(), partition = message.partition(), "Received bundle from Kafka" ); - if let Err(e) = self.publisher.send(bundle_with_metadata) { + if let Err(e) = self.publisher.send(accepted_bundle) { error!(error = ?e, "Failed to publish bundle to queue"); } } diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 8a51623..708937b 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -95,12 +95,12 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); - let bundle_with_metadata = + let accepted_bundle = AcceptedBundle::load(bundle, create_test_meter_bundle_response()).unwrap(); - let bundle_hash = &bundle_with_metadata.bundle_hash(); + let bundle_hash = &accepted_bundle.bundle_hash(); let start = Instant::now(); - let result = publisher.publish(&bundle_with_metadata, bundle_hash).await; + let result = publisher.publish(&accepted_bundle, bundle_hash).await; let elapsed = start.elapsed(); // the backoff tries at minimum 100ms, so verify we tried at least once diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 061577d..566932a 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -73,13 +73,13 @@ where async fn send_bundle(&self, bundle: Bundle) -> RpcResult { self.validate_bundle(&bundle).await?; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = AcceptedBundle::load(bundle, meter_bundle_response) + let accepted_bundle = AcceptedBundle::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = &bundle_with_metadata.bundle_hash(); + let bundle_hash = &accepted_bundle.bundle_hash(); if let Err(e) = self .bundle_queue - .publish(&bundle_with_metadata, bundle_hash) + .publish(&accepted_bundle, bundle_hash) .await { warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e); @@ -92,11 +92,11 @@ where ); let audit_event = BundleEvent::Received { - bundle_id: *bundle_with_metadata.uuid(), - bundle: Box::new(bundle_with_metadata.clone()), + bundle_id: *accepted_bundle.uuid(), + bundle: Box::new(accepted_bundle.clone()), }; if let Err(e) = self.audit_publisher.publish(audit_event).await { - warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); + warn!(message = "Failed to publish audit event", bundle_id = %accepted_bundle.uuid(), error = %e); } Ok(BundleHash { @@ -129,13 +129,13 @@ where }; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let bundle_with_metadata = AcceptedBundle::load(bundle, meter_bundle_response) + let accepted_bundle = AcceptedBundle::load(bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; - let bundle_hash = &bundle_with_metadata.bundle_hash(); + let bundle_hash = &accepted_bundle.bundle_hash(); if let Err(e) = self .bundle_queue - .publish(&bundle_with_metadata, bundle_hash) + .publish(&accepted_bundle, bundle_hash) .await { warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e); @@ -163,11 +163,11 @@ where } let audit_event = BundleEvent::Received { - bundle_id: *bundle_with_metadata.uuid(), - bundle: bundle_with_metadata.clone().into(), + bundle_id: *accepted_bundle.uuid(), + bundle: accepted_bundle.clone().into(), }; if let Err(e) = self.audit_publisher.publish(audit_event).await { - warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e); + warn!(message = "Failed to publish audit event", bundle_id = %accepted_bundle.uuid(), error = %e); } Ok(transaction.tx_hash()) From 10f8f25f0bbadca144e23daf576f286dbe7361d7 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 11:16:44 -0500 Subject: [PATCH 16/20] fmt --- crates/audit/tests/integration_tests.rs | 6 ++---- crates/audit/tests/s3_test.rs | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index 7587493..59e4085 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -20,16 +20,14 @@ async fn test_kafka_publisher_s3_archiver_integration() S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone()); let test_bundle_id = Uuid::new_v4(); - let test_events = vec![ - BundleEvent::Received { + let test_events = [BundleEvent::Received { bundle_id: test_bundle_id, bundle: Box::new(create_bundle_from_txn_data()), }, BundleEvent::Dropped { bundle_id: test_bundle_id, reason: DropReason::TimedOut, - }, - ]; + }]; let publisher = KafkaBundleEventPublisher::new(harness.kafka_producer, topic.to_string()); diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 078d513..564bd0e 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -84,8 +84,7 @@ async fn test_events_appended() -> Result<(), Box Result<(), Box Date: Wed, 5 Nov 2025 11:43:21 -0500 Subject: [PATCH 17/20] fmt + parsed + change fn to new --- crates/audit/tests/integration_tests.rs | 6 ++- crates/audit/tests/s3_test.rs | 6 ++- crates/core/src/test_utils.rs | 8 ++-- crates/core/src/types.rs | 58 ++++++++++--------------- crates/ingress-rpc/src/queue.rs | 7 ++- crates/ingress-rpc/src/service.rs | 11 ++++- 6 files changed, 50 insertions(+), 46 deletions(-) diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs index 59e4085..bb79d24 100644 --- a/crates/audit/tests/integration_tests.rs +++ b/crates/audit/tests/integration_tests.rs @@ -20,14 +20,16 @@ async fn test_kafka_publisher_s3_archiver_integration() S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone()); let test_bundle_id = Uuid::new_v4(); - let test_events = [BundleEvent::Received { + let test_events = [ + BundleEvent::Received { bundle_id: test_bundle_id, bundle: Box::new(create_bundle_from_txn_data()), }, BundleEvent::Dropped { bundle_id: test_bundle_id, reason: DropReason::TimedOut, - }]; + }, + ]; let publisher = KafkaBundleEventPublisher::new(harness.kafka_producer, topic.to_string()); diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs index 564bd0e..53b1a72 100644 --- a/crates/audit/tests/s3_test.rs +++ b/crates/audit/tests/s3_test.rs @@ -84,7 +84,8 @@ async fn test_events_appended() -> Result<(), Box Result<(), Box AcceptedBundle { - AcceptedBundle::load( + AcceptedBundle::new( Bundle { txs: vec![TXN_DATA.clone()], ..Default::default() - }, + } + .try_into() + .unwrap(), create_test_meter_bundle_response(), ) .unwrap() @@ -59,7 +61,7 @@ pub fn create_test_bundle( }; let meter_bundle_response = create_test_meter_bundle_response(); - AcceptedBundle::load(bundle, meter_bundle_response).unwrap() + AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response).unwrap() } pub fn create_test_meter_bundle_response() -> MeterBundleResponse { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 7e32812..cd45ed8 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -239,8 +239,8 @@ impl BundleTxs for AcceptedBundle { } impl AcceptedBundle { - pub fn load( - mut bundle: Bundle, + pub fn new( + mut bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse, ) -> Result { let uuid = bundle @@ -252,23 +252,9 @@ impl AcceptedBundle { bundle.replacement_uuid = Some(uuid.to_string()); - let txs: Vec> = bundle - .txs - .into_iter() - .map(|tx| { - OpTxEnvelope::decode_2718_exact(tx.iter().as_slice()) - .map_err(|e| format!("Failed to decode transaction: {e:?}")) - .and_then(|tx| { - tx.try_into_recovered().map_err(|e| { - format!("Failed to convert transaction to recovered: {e:?}") - }) - }) - }) - .collect::>, String>>()?; - Ok(AcceptedBundle { uuid, - txs, + txs: bundle.txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, flashblock_number_max: bundle.flashblock_number_max, @@ -340,24 +326,25 @@ mod tests { let tx1_bytes = tx1.encoded_2718(); let tx2_bytes = tx2.encoded_2718(); - let bundle = AcceptedBundle::load( + let bundle = AcceptedBundle::new( Bundle { txs: vec![tx1_bytes.clone().into()], block_number: 1, replacement_uuid: None, ..Default::default() - }, + } + .try_into() + .unwrap(), create_test_meter_bundle_response(), ) .unwrap(); assert!(!bundle.uuid().is_nil()); assert_eq!(bundle.replacement_uuid, Some(bundle.uuid().to_string())); - let bundle_txs: ParsedBundle = bundle.into(); - assert_eq!(bundle_txs.txn_hashes().len(), 1); - assert_eq!(bundle_txs.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs.senders().len(), 1); - assert_eq!(bundle_txs.senders()[0], alice.address()); + assert_eq!(bundle.txn_hashes().len(), 1); + assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle.senders().len(), 1); + assert_eq!(bundle.senders()[0], alice.address()); // Bundle hashes are keccack256(...txnHashes) let expected_bundle_hash_single = { @@ -366,29 +353,30 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle_txs.bundle_hash(), expected_bundle_hash_single); + assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single); let uuid = Uuid::new_v4(); - let bundle = AcceptedBundle::load( + let bundle = AcceptedBundle::new( Bundle { txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()], block_number: 1, replacement_uuid: Some(uuid.to_string()), ..Default::default() - }, + } + .try_into() + .unwrap(), create_test_meter_bundle_response(), ) .unwrap(); assert_eq!(*bundle.uuid(), uuid); assert_eq!(bundle.replacement_uuid, Some(uuid.to_string())); - let bundle_txs2: ParsedBundle = bundle.into(); - assert_eq!(bundle_txs2.txn_hashes().len(), 2); - assert_eq!(bundle_txs2.txn_hashes()[0], tx1.tx_hash()); - assert_eq!(bundle_txs2.txn_hashes()[1], tx2.tx_hash()); - assert_eq!(bundle_txs2.senders().len(), 2); - assert_eq!(bundle_txs2.senders()[0], alice.address()); - assert_eq!(bundle_txs2.senders()[1], alice.address()); + assert_eq!(bundle.txn_hashes().len(), 2); + assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); + assert_eq!(bundle.txn_hashes()[1], tx2.tx_hash()); + assert_eq!(bundle.senders().len(), 2); + assert_eq!(bundle.senders()[0], alice.address()); + assert_eq!(bundle.senders()[1], alice.address()); let expected_bundle_hash_double = { let mut hasher = Keccak256::default(); @@ -397,7 +385,7 @@ mod tests { hasher.finalize() }; - assert_eq!(bundle_txs2.bundle_hash(), expected_bundle_hash_double); + assert_eq!(bundle.bundle_hash(), expected_bundle_hash_double); } #[test] diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index 708937b..fb6827d 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -95,8 +95,11 @@ mod tests { let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); - let accepted_bundle = - AcceptedBundle::load(bundle, create_test_meter_bundle_response()).unwrap(); + let accepted_bundle = AcceptedBundle::new( + bundle.try_into().unwrap(), + create_test_meter_bundle_response(), + ) + .unwrap(); let bundle_hash = &accepted_bundle.bundle_hash(); let start = Instant::now(); diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 566932a..ef18a67 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,6 +11,7 @@ use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_audit::{BundleEvent, BundleEventPublisher}; +use tips_core::types::ParsedBundle; use tips_core::{ AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, CancelBundle, MeterBundleResponse, @@ -73,7 +74,10 @@ where async fn send_bundle(&self, bundle: Bundle) -> RpcResult { self.validate_bundle(&bundle).await?; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let accepted_bundle = AcceptedBundle::load(bundle, meter_bundle_response) + let parsed_bundle: ParsedBundle = bundle + .try_into() + .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; let bundle_hash = &accepted_bundle.bundle_hash(); @@ -129,7 +133,10 @@ where }; let meter_bundle_response = self.meter_bundle(&bundle).await?; - let accepted_bundle = AcceptedBundle::load(bundle, meter_bundle_response) + let parsed_bundle: ParsedBundle = bundle + .try_into() + .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response) .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; let bundle_hash = &accepted_bundle.bundle_hash(); From aae7f4eaa18d916f28f55395be511bf154578740 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 12:01:17 -0500 Subject: [PATCH 18/20] accept bundle is err free --- crates/core/src/test_utils.rs | 3 +-- crates/core/src/types.rs | 38 ++++++++++++++----------------- crates/ingress-rpc/src/queue.rs | 3 +-- crates/ingress-rpc/src/service.rs | 6 ++--- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs index b6b570e..edf3b8e 100644 --- a/crates/core/src/test_utils.rs +++ b/crates/core/src/test_utils.rs @@ -25,7 +25,6 @@ pub fn create_bundle_from_txn_data() -> AcceptedBundle { .unwrap(), create_test_meter_bundle_response(), ) - .unwrap() } pub fn create_transaction(from: PrivateKeySigner, nonce: u64, to: Address) -> OpTxEnvelope { @@ -61,7 +60,7 @@ pub fn create_test_bundle( }; let meter_bundle_response = create_test_meter_bundle_response(); - AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response).unwrap() + AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response) } pub fn create_test_meter_bundle_response() -> MeterBundleResponse { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index cd45ed8..dc050dc 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -61,6 +61,7 @@ pub struct Bundle { /// `ParsedBundle` is the type that contains utility methods for the `Bundle` type. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ParsedBundle { + pub uuid: Uuid, pub txs: Vec>, pub block_number: u64, pub flashblock_number_min: Option, @@ -89,7 +90,15 @@ impl TryFrom for ParsedBundle { }) .collect::>, String>>()?; + let uuid = bundle + .replacement_uuid + .clone() + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?; + Ok(ParsedBundle { + uuid, txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, @@ -97,7 +106,7 @@ impl TryFrom for ParsedBundle { min_timestamp: bundle.min_timestamp, max_timestamp: bundle.max_timestamp, reverting_tx_hashes: bundle.reverting_tx_hashes, - replacement_uuid: bundle.replacement_uuid, + replacement_uuid: Some(uuid.to_string()), dropping_tx_hashes: bundle.dropping_tx_hashes, }) } @@ -106,6 +115,7 @@ impl TryFrom for ParsedBundle { impl From for ParsedBundle { fn from(accepted_bundle: AcceptedBundle) -> Self { Self { + uuid: accepted_bundle.uuid, txs: accepted_bundle.txs, block_number: accepted_bundle.block_number, flashblock_number_min: accepted_bundle.flashblock_number_min, @@ -239,21 +249,9 @@ impl BundleTxs for AcceptedBundle { } impl AcceptedBundle { - pub fn new( - mut bundle: ParsedBundle, - meter_bundle_response: MeterBundleResponse, - ) -> Result { - let uuid = bundle - .replacement_uuid - .clone() - .unwrap_or_else(|| Uuid::new_v4().to_string()); - - let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?; - - bundle.replacement_uuid = Some(uuid.to_string()); - - Ok(AcceptedBundle { - uuid, + pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self { + AcceptedBundle { + uuid: bundle.uuid, txs: bundle.txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, @@ -264,7 +262,7 @@ impl AcceptedBundle { replacement_uuid: bundle.replacement_uuid, dropping_tx_hashes: bundle.dropping_tx_hashes, meter_bundle_response, - }) + } } pub fn uuid(&self) -> &Uuid { @@ -336,8 +334,7 @@ mod tests { .try_into() .unwrap(), create_test_meter_bundle_response(), - ) - .unwrap(); + ); assert!(!bundle.uuid().is_nil()); assert_eq!(bundle.replacement_uuid, Some(bundle.uuid().to_string())); @@ -366,8 +363,7 @@ mod tests { .try_into() .unwrap(), create_test_meter_bundle_response(), - ) - .unwrap(); + ); assert_eq!(*bundle.uuid(), uuid); assert_eq!(bundle.replacement_uuid, Some(uuid.to_string())); diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index fb6827d..a13ad4f 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -98,8 +98,7 @@ mod tests { let accepted_bundle = AcceptedBundle::new( bundle.try_into().unwrap(), create_test_meter_bundle_response(), - ) - .unwrap(); + ); let bundle_hash = &accepted_bundle.bundle_hash(); let start = Instant::now(); diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index ef18a67..233c2ad 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -77,8 +77,7 @@ where let parsed_bundle: ParsedBundle = bundle .try_into() .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response) - .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response); let bundle_hash = &accepted_bundle.bundle_hash(); if let Err(e) = self @@ -136,8 +135,7 @@ where let parsed_bundle: ParsedBundle = bundle .try_into() .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?; - let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response) - .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?; + let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response); let bundle_hash = &accepted_bundle.bundle_hash(); if let Err(e) = self From 5e01fec3a22b90dfe7c565ecd505f57ce1a67f96 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 12:15:50 -0500 Subject: [PATCH 19/20] fix --- crates/core/src/types.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index dc050dc..686722b 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -61,7 +61,6 @@ pub struct Bundle { /// `ParsedBundle` is the type that contains utility methods for the `Bundle` type. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ParsedBundle { - pub uuid: Uuid, pub txs: Vec>, pub block_number: u64, pub flashblock_number_min: Option, @@ -69,7 +68,7 @@ pub struct ParsedBundle { pub min_timestamp: Option, pub max_timestamp: Option, pub reverting_tx_hashes: Vec, - pub replacement_uuid: Option, + pub replacement_uuid: Option, pub dropping_tx_hashes: Vec, } @@ -98,7 +97,6 @@ impl TryFrom for ParsedBundle { let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?; Ok(ParsedBundle { - uuid, txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, @@ -106,7 +104,7 @@ impl TryFrom for ParsedBundle { min_timestamp: bundle.min_timestamp, max_timestamp: bundle.max_timestamp, reverting_tx_hashes: bundle.reverting_tx_hashes, - replacement_uuid: Some(uuid.to_string()), + replacement_uuid: Some(uuid), dropping_tx_hashes: bundle.dropping_tx_hashes, }) } @@ -115,7 +113,6 @@ impl TryFrom for ParsedBundle { impl From for ParsedBundle { fn from(accepted_bundle: AcceptedBundle) -> Self { Self { - uuid: accepted_bundle.uuid, txs: accepted_bundle.txs, block_number: accepted_bundle.block_number, flashblock_number_min: accepted_bundle.flashblock_number_min, @@ -183,7 +180,7 @@ pub struct AcceptedBundle { pub reverting_tx_hashes: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] - pub replacement_uuid: Option, + pub replacement_uuid: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub dropping_tx_hashes: Vec, @@ -248,10 +245,11 @@ impl BundleTxs for AcceptedBundle { } } +#[allow(clippy::redundant_closure)] impl AcceptedBundle { pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self { AcceptedBundle { - uuid: bundle.uuid, + uuid: bundle.replacement_uuid.unwrap_or_else(|| Uuid::new_v4()), txs: bundle.txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min, @@ -337,7 +335,7 @@ mod tests { ); assert!(!bundle.uuid().is_nil()); - assert_eq!(bundle.replacement_uuid, Some(bundle.uuid().to_string())); + assert_eq!(bundle.replacement_uuid, Some(*bundle.uuid())); assert_eq!(bundle.txn_hashes().len(), 1); assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); assert_eq!(bundle.senders().len(), 1); @@ -366,7 +364,7 @@ mod tests { ); assert_eq!(*bundle.uuid(), uuid); - assert_eq!(bundle.replacement_uuid, Some(uuid.to_string())); + assert_eq!(bundle.replacement_uuid, Some(uuid)); assert_eq!(bundle.txn_hashes().len(), 2); assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash()); assert_eq!(bundle.txn_hashes()[1], tx2.tx_hash()); From c2f60a92a6a2b48d974bfe4c731170e4155dd07e Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 5 Nov 2025 12:19:51 -0500 Subject: [PATCH 20/20] no redudant closure --- crates/core/src/types.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 686722b..7b3572d 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -245,11 +245,10 @@ impl BundleTxs for AcceptedBundle { } } -#[allow(clippy::redundant_closure)] impl AcceptedBundle { pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self { AcceptedBundle { - uuid: bundle.replacement_uuid.unwrap_or_else(|| Uuid::new_v4()), + uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4), txs: bundle.txs, block_number: bundle.block_number, flashblock_number_min: bundle.flashblock_number_min,