diff --git a/Cargo.toml b/Cargo.toml
index d2029ef..040cf38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,7 +31,7 @@ alloy-serde = "1.0.41"
 
 # op-alloy
 op-alloy-network = { version = "0.20.0", default-features = false }
-op-alloy-consensus = { version = "0.20.0", features = ["k256"] }
+op-alloy-consensus = { version = "0.20.0", features = ["k256", "serde"] }
 op-alloy-rpc-types = { version = "0.20.0", default-features = true}
 op-alloy-flz = { version = "0.13.1" }
 
diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml
index 5fd0ef2..25cf7e3 100644
--- a/crates/audit/Cargo.toml
+++ b/crates/audit/Cargo.toml
@@ -12,7 +12,7 @@ name = "tips-audit"
 path = "src/bin/main.rs"
 
 [dependencies]
-tips-core = { workspace = true }
+tips-core = { workspace = true, features = ["test-utils"] }
 tokio = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs
index 59d9ae0..c8b619f 100644
--- a/crates/audit/src/storage.rs
+++ b/crates/audit/src/storage.rs
@@ -10,7 +10,7 @@ use aws_sdk_s3::primitives::ByteStream;
 use serde::{Deserialize, Serialize};
 use std::fmt;
 use std::fmt::Debug;
-use tips_core::Bundle;
+use tips_core::AcceptedBundle;
 use tracing::info;
 
 #[derive(Debug)]
@@ -39,7 +39,7 @@ pub enum BundleHistoryEvent {
     Received {
         key: String,
         timestamp: i64,
-        bundle: Bundle,
+        bundle: Box<AcceptedBundle>,
     },
     Cancelled {
         key: String,
@@ -365,13 +365,9 @@ mod tests {
     use crate::reader::Event;
     use crate::types::{BundleEvent, DropReason};
     use alloy_primitives::TxHash;
-    use tips_core::Bundle;
+    use tips_core::test_utils::create_bundle_from_txn_data;
     use uuid::Uuid;
 
-    fn create_test_bundle() -> Bundle {
-        Bundle::default()
-    }
-
     fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
         Event {
             key: key.to_string(),
@@ -383,11 +379,11 @@
     #[test]
     fn test_update_bundle_history_transform_adds_new_event() {
         let bundle_history = BundleHistory { history: vec![] };
-        let bundle = create_test_bundle();
+        let bundle = create_bundle_from_txn_data();
         let bundle_id = Uuid::new_v4();
         let bundle_event = BundleEvent::Received {
             bundle_id,
-            bundle: bundle.clone(),
+            bundle: Box::new(bundle.clone()),
         };
         let event = create_test_event("test-key", 1234567890, bundle_event);
@@ -416,15 +412,18 @@
         let existing_event = BundleHistoryEvent::Received {
             key: "duplicate-key".to_string(),
             timestamp: 1111111111,
-            bundle: create_test_bundle(),
+            bundle: Box::new(create_bundle_from_txn_data()),
         };
         let bundle_history = BundleHistory {
             history: vec![existing_event],
         };
 
-        let bundle = create_test_bundle();
+        let bundle = create_bundle_from_txn_data();
         let bundle_id = Uuid::new_v4();
-        let bundle_event = BundleEvent::Received { bundle_id, bundle };
+        let bundle_event = BundleEvent::Received {
+            bundle_id,
+            bundle: Box::new(bundle),
+        };
         let event = create_test_event("duplicate-key", 1234567890, bundle_event);
 
         let result = update_bundle_history_transform(bundle_history, &event);
@@ -437,10 +436,10 @@
         let bundle_history = BundleHistory { history: vec![] };
 
         let bundle_id = Uuid::new_v4();
-        let bundle = create_test_bundle();
+        let bundle = create_bundle_from_txn_data();
         let bundle_event = BundleEvent::Received {
             bundle_id,
-            bundle: bundle.clone(),
+            bundle: Box::new(bundle),
         };
         let event = create_test_event("test-key", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history.clone(), &event);
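A note on the `Box<AcceptedBundle>` introduced in `BundleHistoryEvent::Received` above: a Rust enum is as large as its widest variant, so embedding the multi-field `AcceptedBundle` inline would bloat every event value. A standalone sketch (the `WideBundle` type is hypothetical, standing in for `AcceptedBundle`):

```rust
// An enum pays for its largest variant; boxing the wide payload keeps the
// whole enum at pointer size plus a discriminant.
#[allow(dead_code)]
struct WideBundle {
    _bytes: [u8; 1024], // stand-in for AcceptedBundle's many fields
}

#[allow(dead_code)]
enum Inline {
    Received(WideBundle),
    Cancelled(u64),
}

#[allow(dead_code)]
enum Boxed {
    Received(Box<WideBundle>),
    Cancelled(u64),
}

fn main() {
    // Every Inline value, even Cancelled, occupies more than 1 KiB.
    assert!(std::mem::size_of::<Inline>() > 1024);
    // The boxed variant keeps the enum small.
    assert!(std::mem::size_of::<Boxed>() <= 16);
}
```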
diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs
index 4208b3b..6b0d7d1 100644
--- a/crates/audit/src/types.rs
+++ b/crates/audit/src/types.rs
@@ -1,10 +1,8 @@
 use alloy_consensus::transaction::{SignerRecoverable, Transaction as ConsensusTransaction};
 use alloy_primitives::{Address, TxHash, U256};
-use alloy_provider::network::eip2718::Decodable2718;
 use bytes::Bytes;
-use op_alloy_consensus::OpTxEnvelope;
 use serde::{Deserialize, Serialize};
-use tips_core::Bundle;
+use tips_core::AcceptedBundle;
 use uuid::Uuid;
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -33,7 +31,7 @@ pub struct Transaction {
 pub enum BundleEvent {
     Received {
         bundle_id: BundleId,
-        bundle: Bundle,
+        bundle: Box<AcceptedBundle>,
     },
     Cancelled {
         bundle_id: BundleId,
@@ -72,19 +70,14 @@ impl BundleEvent {
                 bundle
                     .txs
                     .iter()
-                    .filter_map(|tx_bytes| {
-                        match OpTxEnvelope::decode_2718_exact(tx_bytes.iter().as_slice()) {
-                            Ok(envelope) => {
-                                match envelope.recover_signer() {
-                                    Ok(sender) => Some(TransactionId {
-                                        sender,
-                                        nonce: U256::from(envelope.nonce()),
-                                        hash: *envelope.hash(),
-                                    }),
-                                    Err(_) => None, // Skip invalid transactions
-                                }
-                            }
-                            Err(_) => None, // Skip malformed transactions
+                    .filter_map(|envelope| {
+                        match envelope.recover_signer() {
+                            Ok(sender) => Some(TransactionId {
+                                sender,
+                                nonce: U256::from(envelope.nonce()),
+                                hash: *envelope.hash(),
+                            }),
+                            Err(_) => None, // Skip invalid transactions
                         }
                     })
                     .collect()
diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs
index d2bb0fe..bb79d24 100644
--- a/crates/audit/tests/integration_tests.rs
+++ b/crates/audit/tests/integration_tests.rs
@@ -5,7 +5,7 @@ use tips_audit::{
     storage::{BundleEventS3Reader, S3EventReaderWriter},
     types::{BundleEvent, DropReason},
 };
-use tips_core::Bundle;
+use tips_core::test_utils::create_bundle_from_txn_data;
 use uuid::Uuid;
 mod common;
 use common::TestHarness;
@@ -20,10 +20,10 @@ async fn test_kafka_publisher_s3_archiver_integration()
         S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
 
     let test_bundle_id = Uuid::new_v4();
-    let test_events = vec![
+    let test_events = [
         BundleEvent::Received {
             bundle_id: test_bundle_id,
-            bundle: Bundle::default(),
+            bundle: Box::new(create_bundle_from_txn_data()),
         },
         BundleEvent::Dropped {
             bundle_id: test_bundle_id,
diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs
index 55c8f87..53b1a72 100644
--- a/crates/audit/tests/s3_test.rs
+++ b/crates/audit/tests/s3_test.rs
@@ -1,30 +1,16 @@
-use alloy_primitives::{Bytes, TxHash, b256, bytes};
+use alloy_primitives::TxHash;
 use std::sync::Arc;
 use tips_audit::{
     reader::Event,
     storage::{BundleEventS3Reader, EventWriter, S3EventReaderWriter},
     types::BundleEvent,
 };
-use tips_core::Bundle;
 use tokio::task::JoinSet;
 use uuid::Uuid;
 mod common;
 use common::TestHarness;
-
-// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e
-const TXN_DATA: Bytes = bytes!(
-    "0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105"
-);
-const TXN_HASH: TxHash =
-    b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e");
-
-fn create_test_bundle() -> Bundle {
-    Bundle {
-        txs: vec![TXN_DATA.clone()],
-        ..Default::default()
-    }
-}
+use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data};
 
 fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
     Event {
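The pinned Base mainnet transaction now lives in `tips_core::test_utils` (see the test_utils.rs diff below) instead of being redeclared per test file. A sketch of what the shared fixture guarantees, assuming tips-core's `test-utils` feature is enabled (the test name is illustrative):

```rust
use alloy_provider::network::eip2718::Decodable2718;
use op_alloy_consensus::OpTxEnvelope;
use tips_core::test_utils::{TXN_DATA, TXN_HASH, create_bundle_from_txn_data};

#[test]
fn fixture_round_trips() {
    // The pinned bytes decode to a single OP envelope...
    let envelope = OpTxEnvelope::decode_2718_exact(TXN_DATA.as_ref())
        .expect("fixture bytes must stay decodable");
    // ...whose hash matches the basescan transaction referenced above.
    assert_eq!(*envelope.hash(), TXN_HASH);

    // The helper bakes the same bytes into a ready-made AcceptedBundle.
    let bundle = create_bundle_from_txn_data();
    assert_eq!(bundle.txs.len(), 1);
}
```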
@@ -40,13 +26,13 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error>> {
[the bodies of this and the remaining s3_test.rs hunks did not survive extraction; per the import hunk above they appear to swap create_test_bundle() for create_bundle_from_txn_data() and the local constants for the shared TXN_HASH in each async test]
diff --git a/crates/bundle-pool/src/lib.rs b/crates/bundle-pool/src/lib.rs
@@ ... @@ pub fn connect_sources_to_pool<S, P>(
     sources: Vec<S>,
-    bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
+    bundle_rx: mpsc::UnboundedReceiver<AcceptedBundle>,
     pool: Arc<Mutex<P>>,
 ) where
     S: BundleSource + Send + 'static,
diff --git a/crates/bundle-pool/src/pool.rs b/crates/bundle-pool/src/pool.rs
index fe972e4..4a529c9 100644
--- a/crates/bundle-pool/src/pool.rs
+++ b/crates/bundle-pool/src/pool.rs
@@ -3,7 +3,7 @@ use alloy_primitives::map::HashMap;
 use std::fmt::Debug;
 use std::sync::{Arc, Mutex};
 use tips_audit::{BundleEvent, DropReason};
-use tips_core::BundleWithMetadata;
+use tips_core::AcceptedBundle;
 use tokio::sync::mpsc;
 use tracing::warn;
 use uuid::Uuid;
@@ -31,8 +31,8 @@ impl ProcessedBundle {
 }
 
 pub trait BundleStore {
-    fn add_bundle(&mut self, bundle: BundleWithMetadata);
-    fn get_bundles(&self) -> Vec<BundleWithMetadata>;
+    fn add_bundle(&mut self, bundle: AcceptedBundle);
+    fn get_bundles(&self) -> Vec<AcceptedBundle>;
     fn built_flashblock(
         &mut self,
         block_number: u64,
@@ -44,7 +44,7 @@ pub trait BundleStore {
 
 struct BundleData {
     flashblocks_in_block: HashMap<u64, Vec<ProcessedBundle>>,
-    bundles: HashMap<Uuid, BundleWithMetadata>,
+    bundles: HashMap<Uuid, AcceptedBundle>,
 }
 
 #[derive(Clone)]
@@ -77,12 +77,12 @@ impl InMemoryBundlePool {
 }
 
 impl BundleStore for InMemoryBundlePool {
-    fn add_bundle(&mut self, bundle: BundleWithMetadata) {
+    fn add_bundle(&mut self, bundle: AcceptedBundle) {
         let mut inner = self.inner.lock().unwrap();
         inner.bundles.insert(*bundle.uuid(), bundle);
     }
 
-    fn get_bundles(&self) -> Vec<BundleWithMetadata> {
+    fn get_bundles(&self) -> Vec<AcceptedBundle> {
         let inner = self.inner.lock().unwrap();
         inner.bundles.values().cloned().collect()
     }
diff --git a/crates/bundle-pool/src/source.rs b/crates/bundle-pool/src/source.rs
index db6df07..e81bcbb 100644
--- a/crates/bundle-pool/src/source.rs
+++ b/crates/bundle-pool/src/source.rs
@@ -3,7 +3,7 @@ use async_trait::async_trait;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::{ClientConfig, Message};
 use std::fmt::Debug;
-use tips_core::BundleWithMetadata;
+use tips_core::AcceptedBundle;
 use tokio::sync::mpsc;
 use tracing::{error, trace};
@@ -14,7 +14,7 @@ pub trait BundleSource {
 
 pub struct KafkaBundleSource {
     queue_consumer: StreamConsumer,
-    publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+    publisher: mpsc::UnboundedSender<AcceptedBundle>,
 }
 
 impl Debug for KafkaBundleSource {
@@ -27,7 +27,7 @@ impl KafkaBundleSource {
     pub fn new(
         client_config: ClientConfig,
         topic: String,
-        publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+        publisher: mpsc::UnboundedSender<AcceptedBundle>,
     ) -> Result<Self, rdkafka::error::KafkaError> {
         let queue_consumer: StreamConsumer = client_config.create()?;
         queue_consumer.subscribe(&[topic.as_str()])?;
@@ -52,23 +52,22 @@ impl BundleSource for KafkaBundleSource {
                 }
             };
 
-            let bundle_with_metadata: BundleWithMetadata =
-                match serde_json::from_slice(payload) {
-                    Ok(b) => b,
-                    Err(e) => {
-                        error!(error = %e, "Failed to deserialize bundle");
-                        continue;
-                    }
-                };
+            let accepted_bundle: AcceptedBundle = match serde_json::from_slice(payload) {
+                Ok(b) => b,
+                Err(e) => {
+                    error!(error = %e, "Failed to deserialize bundle");
+                    continue;
+                }
+            };
 
             trace!(
-                bundle = ?bundle_with_metadata,
+                bundle = ?accepted_bundle,
                 offset = message.offset(),
                 partition = message.partition(),
                 "Received bundle from Kafka"
             );
 
-            if let Err(e) = self.publisher.send(bundle_with_metadata) {
+            if let Err(e) = self.publisher.send(accepted_bundle) {
                 error!(error = ?e, "Failed to publish bundle to queue");
             }
         }
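With `AcceptedBundle` now the type sent over the wire, the Kafka consumer above and the ingress publisher (queue.rs, further down) share one serde_json contract; this is also why the workspace Cargo.toml enables op-alloy-consensus's `serde` feature. A sketch of that round trip, assuming tips-core's `test-utils` feature for the fixture (hypothetical `main`):

```rust
use tips_core::{AcceptedBundle, test_utils::create_bundle_from_txn_data};

fn main() -> Result<(), serde_json::Error> {
    let sent = create_bundle_from_txn_data();
    // What KafkaQueuePublisher::publish puts on the topic...
    let payload = serde_json::to_vec(&sent)?;

    // ...and what KafkaBundleSource does with each message payload.
    let received: AcceptedBundle = serde_json::from_slice(&payload)?;
    assert_eq!(received.uuid, sent.uuid);
    assert_eq!(received.block_number, sent.block_number);
    Ok(())
}
```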
diff --git a/crates/bundle-pool/tests/integration_tests.rs b/crates/bundle-pool/tests/integration_tests.rs
index 2765e62..b622e31 100644
--- a/crates/bundle-pool/tests/integration_tests.rs
+++ b/crates/bundle-pool/tests/integration_tests.rs
@@ -11,7 +11,7 @@ use tips_bundle_pool::{
     BundleStore, InMemoryBundlePool, KafkaBundleSource, connect_sources_to_pool,
 };
 use tips_core::{
-    BundleWithMetadata,
+    AcceptedBundle,
     test_utils::{create_test_bundle, create_transaction},
 };
 use tokio::sync::mpsc;
@@ -47,7 +47,7 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn std::error::Error>> {
 
-    let (bundle_tx, bundle_rx) = mpsc::unbounded_channel::<BundleWithMetadata>();
+    let (bundle_tx, bundle_rx) = mpsc::unbounded_channel::<AcceptedBundle>();
 
     let kafka_source = KafkaBundleSource::new(kafka_consumer_config, topic.to_string(), bundle_tx)?;
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 3219ef6..ecc135a 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -16,13 +16,13 @@ alloy-primitives.workspace = true
 alloy-consensus.workspace = true
 alloy-provider.workspace = true
 op-alloy-consensus.workspace = true
-serde = { version = "1.0.228", default-features = false, features = ["alloc", "derive"] }
 alloy-serde = { version = "1.0.41", default-features = false }
 alloy-signer-local = { workspace = true, optional = true }
 op-alloy-rpc-types = { workspace = true, optional = true }
 tracing.workspace = true
 tracing-subscriber.workspace = true
 op-alloy-flz.workspace = true
+serde.workspace = true
 
 [dev-dependencies]
 alloy-signer-local.workspace = true
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 5b7906c..c8e1515 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -6,5 +6,6 @@ pub mod types;
 pub mod test_utils;
 
 pub use types::{
-    BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse,
+    AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, BundleTxs, CancelBundle,
+    MeterBundleResponse,
 };
diff --git a/crates/core/src/test_utils.rs b/crates/core/src/test_utils.rs
index d3f6f8d..edf3b8e 100644
--- a/crates/core/src/test_utils.rs
+++ b/crates/core/src/test_utils.rs
@@ -1,12 +1,32 @@
-use crate::{Bundle, BundleWithMetadata, MeterBundleResponse};
+use crate::{AcceptedBundle, Bundle, MeterBundleResponse};
 use alloy_consensus::SignableTransaction;
-use alloy_primitives::{Address, B256, U256};
+use alloy_primitives::{Address, B256, Bytes, TxHash, U256, b256, bytes};
 use alloy_provider::network::TxSignerSync;
 use alloy_provider::network::eip2718::Encodable2718;
 use alloy_signer_local::PrivateKeySigner;
 use op_alloy_consensus::OpTxEnvelope;
 use op_alloy_rpc_types::OpTransactionRequest;
 
+// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e
+pub const TXN_DATA: Bytes = bytes!(
+    "0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105"
+);
+
+pub const TXN_HASH: TxHash =
+    b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e");
+
+pub fn create_bundle_from_txn_data() -> AcceptedBundle {
+    AcceptedBundle::new(
+        Bundle {
+            txs: vec![TXN_DATA.clone()],
+            ..Default::default()
+        }
+        .try_into()
+        .unwrap(),
+        create_test_meter_bundle_response(),
+    )
+}
+
 pub fn create_transaction(from: PrivateKeySigner, nonce: u64, to: Address) -> OpTxEnvelope {
     let mut txn = OpTransactionRequest::default()
         .value(U256::from(10_000))
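Why `Bundle::default()` disappears from the tests: every `AcceptedBundle` is now built via `ParsedBundle`'s `TryFrom<Bundle>` (see the types.rs hunks below), which decodes and signer-recovers each transaction up front. A sketch of that boundary check (hypothetical `main`):

```rust
use alloy_primitives::bytes;
use tips_core::{Bundle, types::ParsedBundle};

fn main() {
    // Undecodable bytes are rejected at the parse step...
    let bad = Bundle {
        txs: vec![bytes!("0xdeadbeef")],
        ..Default::default()
    };
    assert!(ParsedBundle::try_from(bad)
        .unwrap_err()
        .contains("Failed to decode transaction"));

    // ...so tests that need transactions must carry real signed bytes,
    // which is what the pinned fixture above provides.
    let parsed = ParsedBundle::try_from(Bundle::default()).unwrap();
    assert!(parsed.txs.is_empty());
}
```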
@@ -28,7 +48,7 @@ pub fn create_test_bundle(
     block_number: Option<u64>,
     min_timestamp: Option<u64>,
     max_timestamp: Option<u64>,
-) -> BundleWithMetadata {
+) -> AcceptedBundle {
     let txs = txns.iter().map(|t| t.encoded_2718().into()).collect();
 
     let bundle = Bundle {
@@ -40,7 +60,7 @@ pub fn create_test_bundle(
     };
 
     let meter_bundle_response = create_test_meter_bundle_response();
-    BundleWithMetadata::load(bundle, meter_bundle_response).unwrap()
+    AcceptedBundle::new(bundle.try_into().unwrap(), meter_bundle_response)
 }
 
 pub fn create_test_meter_bundle_response() -> MeterBundleResponse {
diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs
index a38abf4..7b3572d 100644
--- a/crates/core/src/types.rs
+++ b/crates/core/src/types.rs
@@ -1,4 +1,5 @@
 use alloy_consensus::Transaction;
+use alloy_consensus::transaction::Recovered;
 use alloy_consensus::transaction::SignerRecoverable;
 use alloy_primitives::{Address, B256, Bytes, TxHash, keccak256};
 use alloy_provider::network::eip2718::{Decodable2718, Encodable2718};
@@ -10,6 +11,7 @@ use uuid::Uuid;
 /// Block time in microseconds
 pub const BLOCK_TIME: u128 = 2_000_000;
 
+/// `Bundle` is the type that mirrors `EthSendBundle` and is used for the API.
 #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
 pub struct Bundle {
@@ -56,6 +58,74 @@ pub struct Bundle {
     pub dropping_tx_hashes: Vec<TxHash>,
 }
 
+/// `ParsedBundle` is the type that contains utility methods for the `Bundle` type.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct ParsedBundle {
+    pub txs: Vec<Recovered<OpTxEnvelope>>,
+    pub block_number: u64,
+    pub flashblock_number_min: Option<u64>,
+    pub flashblock_number_max: Option<u64>,
+    pub min_timestamp: Option<u64>,
+    pub max_timestamp: Option<u64>,
+    pub reverting_tx_hashes: Vec<TxHash>,
+    pub replacement_uuid: Option<Uuid>,
+    pub dropping_tx_hashes: Vec<TxHash>,
+}
+
+impl TryFrom<Bundle> for ParsedBundle {
+    type Error = String;
+    fn try_from(bundle: Bundle) -> Result<Self, Self::Error> {
+        let txs: Vec<Recovered<OpTxEnvelope>> = bundle
+            .txs
+            .into_iter()
+            .map(|tx| {
+                OpTxEnvelope::decode_2718_exact(tx.iter().as_slice())
+                    .map_err(|e| format!("Failed to decode transaction: {e:?}"))
+                    .and_then(|tx| {
+                        tx.try_into_recovered().map_err(|e| {
+                            format!("Failed to convert transaction to recovered: {e:?}")
+                        })
+                    })
+            })
+            .collect::<Result<Vec<Recovered<OpTxEnvelope>>, String>>()?;
+
+        let uuid = bundle
+            .replacement_uuid
+            .clone()
+            .unwrap_or_else(|| Uuid::new_v4().to_string());
+
+        let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?;
+
+        Ok(ParsedBundle {
+            txs,
+            block_number: bundle.block_number,
+            flashblock_number_min: bundle.flashblock_number_min,
+            flashblock_number_max: bundle.flashblock_number_max,
+            min_timestamp: bundle.min_timestamp,
+            max_timestamp: bundle.max_timestamp,
+            reverting_tx_hashes: bundle.reverting_tx_hashes,
+            replacement_uuid: Some(uuid),
+            dropping_tx_hashes: bundle.dropping_tx_hashes,
+        })
+    }
+}
+
+impl From<AcceptedBundle> for ParsedBundle {
+    fn from(accepted_bundle: AcceptedBundle) -> Self {
+        Self {
+            txs: accepted_bundle.txs,
+            block_number: accepted_bundle.block_number,
+            flashblock_number_min: accepted_bundle.flashblock_number_min,
+            flashblock_number_max: accepted_bundle.flashblock_number_max,
+            min_timestamp: accepted_bundle.min_timestamp,
+            max_timestamp: accepted_bundle.max_timestamp,
+            reverting_tx_hashes: accepted_bundle.reverting_tx_hashes,
+            replacement_uuid: accepted_bundle.replacement_uuid,
+            dropping_tx_hashes: accepted_bundle.dropping_tx_hashes,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct BundleHash {
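The construction pipeline these types establish, end to end, is the one service.rs uses below: API `Bundle` → `ParsedBundle` (the fallible parse) → `AcceptedBundle` (infallible, just attaches metering). A sketch, assuming the test-utils helpers shown above (hypothetical `main`):

```rust
use tips_core::{
    AcceptedBundle, Bundle,
    test_utils::{TXN_DATA, create_test_meter_bundle_response},
    types::ParsedBundle,
};

fn main() -> Result<(), String> {
    let api_bundle = Bundle {
        txs: vec![TXN_DATA.clone()],
        block_number: 1,
        ..Default::default()
    };

    // Decode the EIP-2718 bytes, recover signers, normalize the replacement UUID.
    let parsed: ParsedBundle = api_bundle.try_into()?;

    // All validation already happened, so new() cannot fail.
    let accepted = AcceptedBundle::new(parsed, create_test_meter_bundle_response());
    assert!(!accepted.uuid().is_nil());
    Ok(())
}
```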
@@ -68,89 +138,136 @@ pub struct CancelBundle {
     pub replacement_uuid: String,
 }
 
+/// `AcceptedBundle` is the type that is sent over the wire.
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BundleWithMetadata {
-    bundle: Bundle,
-    uuid: Uuid,
-    transactions: Vec<OpTxEnvelope>,
-    meter_bundle_response: MeterBundleResponse,
-}
+pub struct AcceptedBundle {
+    pub uuid: Uuid,
 
-impl BundleWithMetadata {
-    pub fn load(
-        mut bundle: Bundle,
-        meter_bundle_response: MeterBundleResponse,
-    ) -> Result<Self, String> {
-        let uuid = bundle
-            .replacement_uuid
-            .clone()
-            .unwrap_or_else(|| Uuid::new_v4().to_string());
+    pub txs: Vec<Recovered<OpTxEnvelope>>,
 
-        let uuid = Uuid::parse_str(uuid.as_str()).map_err(|_| format!("Invalid UUID: {uuid}"))?;
+    #[serde(with = "alloy_serde::quantity")]
+    pub block_number: u64,
 
-        bundle.replacement_uuid = Some(uuid.to_string());
+    #[serde(
+        default,
+        deserialize_with = "alloy_serde::quantity::opt::deserialize",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub flashblock_number_min: Option<u64>,
 
-        let transactions: Vec<OpTxEnvelope> = bundle
-            .txs
-            .iter()
-            .map(|b| {
-                OpTxEnvelope::decode_2718_exact(b)
-                    .map_err(|e| format!("failed to decode transaction: {e}"))
-            })
-            .collect::<Result<Vec<_>, _>>()?;
+    #[serde(
+        default,
+        deserialize_with = "alloy_serde::quantity::opt::deserialize",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub flashblock_number_max: Option<u64>,
 
-        Ok(BundleWithMetadata {
-            bundle,
-            transactions,
-            uuid,
-            meter_bundle_response,
-        })
-    }
+    #[serde(
+        default,
+        deserialize_with = "alloy_serde::quantity::opt::deserialize",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub min_timestamp: Option<u64>,
 
-    pub fn transactions(&self) -> &[OpTxEnvelope] {
-        self.transactions.as_slice()
-    }
+    #[serde(
+        default,
+        deserialize_with = "alloy_serde::quantity::opt::deserialize",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub max_timestamp: Option<u64>,
 
-    pub fn uuid(&self) -> &Uuid {
-        &self.uuid
-    }
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub reverting_tx_hashes: Vec<TxHash>,
+
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub replacement_uuid: Option<Uuid>,
+
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub dropping_tx_hashes: Vec<TxHash>,
+
+    pub meter_bundle_response: MeterBundleResponse,
+}
+
+pub trait BundleTxs {
+    fn transactions(&self) -> &Vec<Recovered<OpTxEnvelope>>;
+}
+
+pub trait BundleExtensions {
+    fn bundle_hash(&self) -> B256;
+    fn txn_hashes(&self) -> Vec<TxHash>;
+    fn senders(&self) -> Vec<Address>;
+    fn gas_limit(&self) -> u64;
+    fn da_size(&self) -> u64;
+}
 
-    pub fn bundle_hash(&self) -> B256 {
+impl<T: BundleTxs> BundleExtensions for T {
+    fn bundle_hash(&self) -> B256 {
+        let parsed = self.transactions();
         let mut concatenated = Vec::new();
-        for tx in self.transactions() {
+        for tx in parsed {
             concatenated.extend_from_slice(tx.tx_hash().as_slice());
         }
         keccak256(&concatenated)
     }
 
-    pub fn txn_hashes(&self) -> Vec<TxHash> {
+    fn txn_hashes(&self) -> Vec<TxHash> {
         self.transactions().iter().map(|t| t.tx_hash()).collect()
     }
 
-    pub fn bundle(&self) -> &Bundle {
-        &self.bundle
-    }
-
-    pub fn senders(&self) -> Vec<Address> {
+    fn senders(&self) -> Vec<Address> {
         self.transactions()
             .iter()
             .map(|t| t.recover_signer().unwrap())
             .collect()
     }
 
-    pub fn gas_limit(&self) -> u64 {
-        self.transactions.iter().map(|t| t.gas_limit()).sum()
+    fn gas_limit(&self) -> u64 {
+        self.transactions().iter().map(|t| t.gas_limit()).sum()
     }
 
-    pub fn da_size(&self) -> u64 {
-        self.transactions
+    fn da_size(&self) -> u64 {
+        self.transactions()
             .iter()
             .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718()))
             .sum()
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+impl BundleTxs for ParsedBundle {
+    fn transactions(&self) -> &Vec<Recovered<OpTxEnvelope>> {
+        &self.txs
+    }
+}
+
+impl BundleTxs for AcceptedBundle {
+    fn transactions(&self) -> &Vec<Recovered<OpTxEnvelope>> {
+        &self.txs
+    }
+}
+
+impl AcceptedBundle {
+    pub fn new(bundle: ParsedBundle, meter_bundle_response: MeterBundleResponse) -> Self {
+        AcceptedBundle {
+            uuid: bundle.replacement_uuid.unwrap_or_else(Uuid::new_v4),
+            txs: bundle.txs,
+            block_number: bundle.block_number,
+            flashblock_number_min: bundle.flashblock_number_min,
+            flashblock_number_max: bundle.flashblock_number_max,
+            min_timestamp: bundle.min_timestamp,
+            max_timestamp: bundle.max_timestamp,
+            reverting_tx_hashes: bundle.reverting_tx_hashes,
+            replacement_uuid: bundle.replacement_uuid,
+            dropping_tx_hashes: bundle.dropping_tx_hashes,
+            meter_bundle_response,
+        }
+    }
+
+    pub fn uuid(&self) -> &Uuid {
+        &self.uuid
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
 pub struct TransactionResult {
     pub coinbase_diff: String,
@@ -165,7 +282,7 @@ pub struct TransactionResult {
     pub execution_time_us: u128,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
 pub struct MeterBundleResponse {
     pub bundle_gas_price: String,
@@ -204,22 +321,20 @@ mod tests {
         let tx1_bytes = tx1.encoded_2718();
         let tx2_bytes = tx2.encoded_2718();
 
-        let bundle = BundleWithMetadata::load(
+        let bundle = AcceptedBundle::new(
             Bundle {
-                replacement_uuid: None,
                 txs: vec![tx1_bytes.clone().into()],
                 block_number: 1,
+                replacement_uuid: None,
                 ..Default::default()
-            },
+            }
+            .try_into()
+            .unwrap(),
             create_test_meter_bundle_response(),
-        )
-        .unwrap();
+        );
 
         assert!(!bundle.uuid().is_nil());
-        assert_eq!(
-            bundle.bundle.replacement_uuid,
-            Some(bundle.uuid().to_string())
-        );
+        assert_eq!(bundle.replacement_uuid, Some(*bundle.uuid()));
         assert_eq!(bundle.txn_hashes().len(), 1);
         assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash());
         assert_eq!(bundle.senders().len(), 1);
@@ -235,19 +350,20 @@ mod tests {
         assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single);
 
         let uuid = Uuid::new_v4();
-        let bundle = BundleWithMetadata::load(
+        let bundle = AcceptedBundle::new(
             Bundle {
-                replacement_uuid: Some(uuid.to_string()),
                 txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()],
                 block_number: 1,
+                replacement_uuid: Some(uuid.to_string()),
                 ..Default::default()
-            },
+            }
+            .try_into()
+            .unwrap(),
             create_test_meter_bundle_response(),
-        )
-        .unwrap();
+        );
 
         assert_eq!(*bundle.uuid(), uuid);
-        assert_eq!(bundle.bundle.replacement_uuid, Some(uuid.to_string()));
+        assert_eq!(bundle.replacement_uuid, Some(uuid));
         assert_eq!(bundle.txn_hashes().len(), 2);
         assert_eq!(bundle.txn_hashes()[0], tx1.tx_hash());
         assert_eq!(bundle.txn_hashes()[1], tx2.tx_hash());
diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs
index 0f60520..a13ad4f 100644
--- a/crates/ingress-rpc/src/queue.rs
+++ b/crates/ingress-rpc/src/queue.rs
@@ -3,14 +3,14 @@ use anyhow::Result;
 use async_trait::async_trait;
 use backon::{ExponentialBuilder, Retryable};
 use rdkafka::producer::{FutureProducer, FutureRecord};
-use tips_core::BundleWithMetadata;
+use tips_core::AcceptedBundle;
 use tokio::time::Duration;
 use tracing::{error, info};
 
 /// A queue to buffer transactions
 #[async_trait]
 pub trait QueuePublisher: Send + Sync {
-    async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()>;
+    async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()>;
 }
 
 /// A queue to buffer transactions
@@ -27,7 +27,7 @@ impl KafkaQueuePublisher {
 
 #[async_trait]
 impl QueuePublisher for KafkaQueuePublisher {
-    async fn publish(&self, bundle: &BundleWithMetadata, bundle_hash: &B256) -> Result<()> {
+    async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> {
         let key = bundle_hash.to_string();
         let payload = serde_json::to_vec(&bundle)?;
@@ -75,7 +75,9 @@ impl QueuePublisher for KafkaQueuePublisher {
 mod tests {
     use super::*;
     use rdkafka::config::ClientConfig;
-    use tips_core::{Bundle, BundleWithMetadata, test_utils::create_test_meter_bundle_response};
+    use tips_core::{
+        AcceptedBundle, Bundle, BundleExtensions, test_utils::create_test_meter_bundle_response,
+    };
    use tokio::time::{Duration, Instant};
 
     fn create_test_bundle() -> Bundle {
@@ -93,12 +95,14 @@ mod tests {
         let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
 
         let bundle = create_test_bundle();
-        let bundle_with_metadata =
-            BundleWithMetadata::load(bundle.clone(), create_test_meter_bundle_response()).unwrap();
-        let bundle_hash = bundle_with_metadata.bundle_hash();
+        let accepted_bundle = AcceptedBundle::new(
+            bundle.try_into().unwrap(),
+            create_test_meter_bundle_response(),
+        );
+        let bundle_hash = &accepted_bundle.bundle_hash();
 
         let start = Instant::now();
-        let result = publisher.publish(&bundle_with_metadata, &bundle_hash).await;
+        let result = publisher.publish(&accepted_bundle, bundle_hash).await;
         let elapsed = start.elapsed();
 
         // the backoff tries at minimum 100ms, so verify we tried at least once
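What `publish` keys Kafka messages by: `BundleExtensions::bundle_hash` is the keccak256 of the concatenated transaction hashes, available on any `BundleTxs` implementor through the blanket impl in types.rs above. A sketch reproducing it by hand, assuming the shared fixture (hypothetical `main`):

```rust
use alloy_primitives::keccak256;
use tips_core::{BundleExtensions, BundleTxs, test_utils::create_bundle_from_txn_data};

fn main() {
    let bundle = create_bundle_from_txn_data();

    // Same computation as the blanket bundle_hash() impl:
    let mut concatenated = Vec::new();
    for tx in bundle.transactions() {
        concatenated.extend_from_slice(tx.tx_hash().as_slice());
    }
    assert_eq!(bundle.bundle_hash(), keccak256(&concatenated));
}
```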
diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs
index fe3ce65..233c2ad 100644
--- a/crates/ingress-rpc/src/service.rs
+++ b/crates/ingress-rpc/src/service.rs
@@ -11,8 +11,10 @@ use op_alloy_network::Optimism;
 use reth_rpc_eth_types::EthApiError;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tips_audit::{BundleEvent, BundleEventPublisher};
+use tips_core::types::ParsedBundle;
 use tips_core::{
-    BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse,
+    AcceptedBundle, BLOCK_TIME, Bundle, BundleExtensions, BundleHash, CancelBundle,
+    MeterBundleResponse,
 };
 use tracing::{info, warn};
@@ -72,13 +74,15 @@ where
     async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
         self.validate_bundle(&bundle).await?;
         let meter_bundle_response = self.meter_bundle(&bundle).await?;
-        let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response)
-            .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
+        let parsed_bundle: ParsedBundle = bundle
+            .try_into()
+            .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;
+        let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response);
 
-        let bundle_hash = bundle_with_metadata.bundle_hash();
+        let bundle_hash = &accepted_bundle.bundle_hash();
         if let Err(e) = self
             .bundle_queue
-            .publish(&bundle_with_metadata, &bundle_hash)
+            .publish(&accepted_bundle, bundle_hash)
             .await
         {
             warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
@@ -88,18 +92,19 @@ where
         info!(
             message = "queued bundle",
             bundle_hash = %bundle_hash,
-            tx_count = bundle_with_metadata.transactions().len(),
         );
 
         let audit_event = BundleEvent::Received {
-            bundle_id: *bundle_with_metadata.uuid(),
-            bundle: bundle_with_metadata.bundle().clone(),
+            bundle_id: *accepted_bundle.uuid(),
+            bundle: Box::new(accepted_bundle.clone()),
         };
         if let Err(e) = self.audit_publisher.publish(audit_event).await {
-            warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e);
+            warn!(message = "Failed to publish audit event", bundle_id = %accepted_bundle.uuid(), error = %e);
         }
 
-        Ok(BundleHash { bundle_hash })
+        Ok(BundleHash {
+            bundle_hash: *bundle_hash,
+        })
     }
 
     async fn cancel_bundle(&self, _request: CancelBundle) -> RpcResult<()> {
@@ -127,13 +132,15 @@ where
         };
 
         let meter_bundle_response = self.meter_bundle(&bundle).await?;
-        let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response)
-            .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
-        let bundle_hash = bundle_with_metadata.bundle_hash();
+        let parsed_bundle: ParsedBundle = bundle
+            .try_into()
+            .map_err(|e: String| EthApiError::InvalidParams(e).into_rpc_err())?;
+        let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response);
+        let bundle_hash = &accepted_bundle.bundle_hash();
 
         if let Err(e) = self
             .bundle_queue
-            .publish(&bundle_with_metadata, &bundle_hash)
+            .publish(&accepted_bundle, bundle_hash)
             .await
         {
             warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
@@ -161,11 +168,11 @@ where
         }
 
         let audit_event = BundleEvent::Received {
-            bundle_id: *bundle_with_metadata.uuid(),
-            bundle: bundle_with_metadata.bundle().clone(),
+            bundle_id: *accepted_bundle.uuid(),
+            bundle: accepted_bundle.clone().into(),
         };
         if let Err(e) = self.audit_publisher.publish(audit_event).await {
-            warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e);
+            warn!(message = "Failed to publish audit event", bundle_id = %accepted_bundle.uuid(), error = %e);
         }
 
         Ok(transaction.tx_hash())
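The two `BundleEvent::Received` constructions above are equivalent: `Box::new(x)` and `x.into()` both produce the `Box<AcceptedBundle>` payload, the latter via std's blanket `impl From<T> for Box<T>`. A sketch comparing the serialized forms, since `AcceptedBundle` does not derive `PartialEq` so the events cannot be compared directly (assumes tips-audit and the tips-core fixture are in scope; hypothetical `main`):

```rust
use tips_audit::BundleEvent;
use tips_core::test_utils::create_bundle_from_txn_data;

fn main() -> Result<(), serde_json::Error> {
    let accepted = create_bundle_from_txn_data();

    let explicit = BundleEvent::Received {
        bundle_id: *accepted.uuid(),
        bundle: Box::new(accepted.clone()),
    };
    let via_into = BundleEvent::Received {
        bundle_id: *accepted.uuid(),
        bundle: accepted.into(), // From<T> for Box<T>
    };

    // Identical JSON on the audit topic either way.
    assert_eq!(
        serde_json::to_string(&explicit)?,
        serde_json::to_string(&via_into)?
    );
    Ok(())
}
```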