diff --git a/.env.example b/.env.example
index 8788158..69b1a01 100644
--- a/.env.example
+++ b/.env.example
@@ -3,8 +3,10 @@ TIPS_INGRESS_ADDRESS=0.0.0.0
 TIPS_INGRESS_PORT=8080
 TIPS_INGRESS_RPC_MEMPOOL=http://localhost:2222
 TIPS_INGRESS_DUAL_WRITE_MEMPOOL=false
-TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-kafka-properties
+TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-bundles-kafka-properties
 TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress
+TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE=/app/docker/ingress-audit-kafka-properties
+TIPS_INGRESS_KAFKA_AUDIT_TOPIC=tips-audit
 
 TIPS_INGRESS_LOG_LEVEL=info
 TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800
diff --git a/Cargo.lock b/Cargo.lock
index 837f78d..b93bed8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -166,7 +166,7 @@ dependencies = [
  "alloy-sol-types",
  "auto_impl",
  "derive_more",
- "op-alloy-consensus 0.20.0",
+ "op-alloy-consensus",
  "op-alloy-rpc-types-engine",
  "op-revm",
  "revm",
@@ -278,7 +278,7 @@ dependencies = [
  "alloy-op-hardforks",
  "alloy-primitives",
  "auto_impl",
- "op-alloy-consensus 0.20.0",
+ "op-alloy-consensus",
  "op-revm",
  "revm",
 ]
@@ -4016,8 +4016,10 @@ checksum = "3a501241474c3118833d6195312ae7eb7cc90bbb0d5f524cbb0b06619e49ff67"
 dependencies = [
  "alloy-consensus",
  "alloy-eips",
+ "alloy-network",
  "alloy-primitives",
  "alloy-rlp",
+ "alloy-rpc-types-eth",
  "alloy-serde",
  "derive_more",
  "serde",
@@ -4025,28 +4027,16 @@ dependencies = [
 ]
 
 [[package]]
-name = "op-alloy-consensus"
-version = "0.21.0"
+name = "op-alloy-flz"
+version = "0.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf1fc8aa0e2f5b136d101630be009e4e6dbdd1f17bc3ce670f431511600d2930"
-dependencies = [
- "alloy-consensus",
- "alloy-eips",
- "alloy-network",
- "alloy-primitives",
- "alloy-rlp",
- "alloy-rpc-types-eth",
- "alloy-serde",
- "derive_more",
- "serde",
- "thiserror",
-]
+checksum = "a79f352fc3893dcd670172e615afef993a41798a1d3fc0db88a3e60ef2e70ecc"
 
 [[package]]
 name = "op-alloy-network"
-version = "0.21.0"
+version = "0.20.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c5cca341184dbfcb49dbc124e5958e6a857499f04782907e5d969abb644e0b6"
+checksum = "f80108e3b36901200a4c5df1db1ee9ef6ce685b59ea79d7be1713c845e3765da"
 dependencies = [
  "alloy-consensus",
  "alloy-network",
@@ -4054,8 +4044,8 @@ dependencies = [
  "alloy-provider",
  "alloy-rpc-types-eth",
  "alloy-signer",
- "op-alloy-consensus 0.21.0",
- "op-alloy-rpc-types 0.21.0",
+ "op-alloy-consensus",
+ "op-alloy-rpc-types",
 ]
 
 [[package]]
@@ -4071,26 +4061,7 @@ dependencies = [
  "alloy-rpc-types-eth",
  "alloy-serde",
  "derive_more",
- "op-alloy-consensus 0.20.0",
- "serde",
- "serde_json",
- "thiserror",
-]
-
-[[package]]
-name = "op-alloy-rpc-types"
-version = "0.21.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "274972c3c5e911b6675f6794ea0476b05e0bc1ea7e464f99ec2dc01b76d2eeb6"
-dependencies = [
- "alloy-consensus",
- "alloy-eips",
- "alloy-network-primitives",
- "alloy-primitives",
- "alloy-rpc-types-eth",
- "alloy-serde",
- "derive_more",
- "op-alloy-consensus 0.21.0",
+ "op-alloy-consensus",
  "serde",
  "serde_json",
  "thiserror",
@@ -4110,7 +4081,7 @@ dependencies = [
  "derive_more",
  "ethereum_ssz",
  "ethereum_ssz_derive",
- "op-alloy-consensus 0.20.0",
+ "op-alloy-consensus",
  "snap",
  "thiserror",
 ]
@@ -4909,7 +4880,7 @@ dependencies = [
  "alloy-trie",
  "bytes",
  "modular-bitfield",
- "op-alloy-consensus 0.20.0",
+ "op-alloy-consensus",
  "reth-codecs-derive",
  "reth-zstd-compressors",
"serde", @@ -5188,8 +5159,8 @@ dependencies = [ "alloy-primitives", "derive_more", "miniz_oxide", - "op-alloy-consensus 0.20.0", - "op-alloy-rpc-types 0.20.0", + "op-alloy-consensus", + "op-alloy-rpc-types", "reth-chainspec", "reth-ethereum-forks", "reth-network-peers", @@ -5236,7 +5207,7 @@ dependencies = [ "alloy-evm", "alloy-op-evm", "alloy-primitives", - "op-alloy-consensus 0.20.0", + "op-alloy-consensus", "op-alloy-rpc-types-engine", "op-revm", "reth-chainspec", @@ -5274,7 +5245,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "bytes", - "op-alloy-consensus 0.20.0", + "op-alloy-consensus", "reth-codecs", "reth-primitives-traits", "reth-zstd-compressors", @@ -5298,7 +5269,7 @@ dependencies = [ "bytes", "derive_more", "once_cell", - "op-alloy-consensus 0.20.0", + "op-alloy-consensus", "reth-codecs", "revm-bytecode", "revm-primitives", @@ -6870,7 +6841,7 @@ dependencies = [ "bytes", "clap", "dotenvy", - "op-alloy-consensus 0.21.0", + "op-alloy-consensus", "rdkafka", "serde", "serde_json", @@ -6894,8 +6865,8 @@ dependencies = [ "alloy-signer-local", "anyhow", "async-trait", - "op-alloy-consensus 0.21.0", - "op-alloy-rpc-types 0.21.0", + "op-alloy-consensus", + "op-alloy-rpc-types", "rdkafka", "serde", "serde_json", @@ -6917,8 +6888,9 @@ dependencies = [ "alloy-provider", "alloy-serde", "alloy-signer-local", - "op-alloy-consensus 0.21.0", - "op-alloy-rpc-types 0.21.0", + "op-alloy-consensus", + "op-alloy-flz", + "op-alloy-rpc-types", "serde", "tracing", "tracing-subscriber 0.3.20", @@ -6939,7 +6911,7 @@ dependencies = [ "clap", "dotenvy", "jsonrpsee", - "op-alloy-consensus 0.21.0", + "op-alloy-consensus", "op-alloy-network", "op-revm", "rdkafka", @@ -6947,6 +6919,7 @@ dependencies = [ "reth-rpc-eth-types", "revm-context-interface", "serde_json", + "tips-audit", "tips-core", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index cb8ff2f..d2029ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,16 +25,15 @@ alloy-primitives = { version = "1.3.1", default-features = false, features = [ "map-foldhash", "serde", ] } -alloy-rpc-types = { version = "1.0.35", default-features = false } -alloy-consensus = { version = "1.0.35" } -alloy-provider = { version = "1.0.35" } -alloy-rpc-types-mev = "1.0.35" +alloy-consensus = { version = "1.0.37" } +alloy-provider = { version = "1.0.37" } alloy-serde = "1.0.41" # op-alloy -op-alloy-network = { version = "0.21.0", default-features = false } -op-alloy-consensus = { version = "0.21.0", features = ["k256"] } -op-alloy-rpc-types = { version = "0.21.0", default-features = true} +op-alloy-network = { version = "0.20.0", default-features = false } +op-alloy-consensus = { version = "0.20.0", features = ["k256"] } +op-alloy-rpc-types = { version = "0.20.0", default-features = true} +op-alloy-flz = { version = "0.13.1" } tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" diff --git a/crates/audit/src/storage.rs b/crates/audit/src/storage.rs index ed5d947..59d9ae0 100644 --- a/crates/audit/src/storage.rs +++ b/crates/audit/src/storage.rs @@ -36,12 +36,7 @@ pub struct TransactionMetadata { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data")] pub enum BundleHistoryEvent { - Created { - key: String, - timestamp: i64, - bundle: Bundle, - }, - Updated { + Received { key: String, timestamp: i64, bundle: Bundle, @@ -73,8 +68,7 @@ pub enum BundleHistoryEvent { impl BundleHistoryEvent { pub fn key(&self) -> &str { match self { - BundleHistoryEvent::Created { key, .. 
-            BundleHistoryEvent::Updated { key, .. } => key,
+            BundleHistoryEvent::Received { key, .. } => key,
             BundleHistoryEvent::Cancelled { key, .. } => key,
             BundleHistoryEvent::BuilderIncluded { key, .. } => key,
             BundleHistoryEvent::BlockIncluded { key, .. } => key,
@@ -106,12 +100,7 @@ fn update_bundle_history_transform(
     }
 
     let history_event = match &event.event {
-        BundleEvent::Created { bundle, .. } => BundleHistoryEvent::Created {
-            key: event.key.clone(),
-            timestamp: event.timestamp,
-            bundle: bundle.clone(),
-        },
-        BundleEvent::Updated { bundle, .. } => BundleHistoryEvent::Updated {
+        BundleEvent::Received { bundle, .. } => BundleHistoryEvent::Received {
             key: event.key.clone(),
             timestamp: event.timestamp,
             bundle: bundle.clone(),
@@ -396,7 +385,7 @@ mod tests {
         let bundle_history = BundleHistory { history: vec![] };
         let bundle = create_test_bundle();
         let bundle_id = Uuid::new_v4();
-        let bundle_event = BundleEvent::Created {
+        let bundle_event = BundleEvent::Received {
             bundle_id,
             bundle: bundle.clone(),
         };
@@ -409,7 +398,7 @@ mod tests {
         assert_eq!(bundle_history.history.len(), 1);
 
         match &bundle_history.history[0] {
-            BundleHistoryEvent::Created {
+            BundleHistoryEvent::Received {
                 key,
                 timestamp: ts,
                 bundle: b,
@@ -424,7 +413,7 @@ mod tests {
 
     #[test]
     fn test_update_bundle_history_transform_skips_duplicate_key() {
-        let existing_event = BundleHistoryEvent::Created {
+        let existing_event = BundleHistoryEvent::Received {
             key: "duplicate-key".to_string(),
             timestamp: 1111111111,
             bundle: create_test_bundle(),
@@ -435,7 +424,7 @@ mod tests {
 
         let bundle = create_test_bundle();
         let bundle_id = Uuid::new_v4();
-        let bundle_event = BundleEvent::Updated { bundle_id, bundle };
+        let bundle_event = BundleEvent::Received { bundle_id, bundle };
         let event = create_test_event("duplicate-key", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history, &event);
 
@@ -449,7 +438,7 @@ mod tests {
         let bundle_id = Uuid::new_v4();
         let bundle = create_test_bundle();
 
-        let bundle_event = BundleEvent::Created {
+        let bundle_event = BundleEvent::Received {
             bundle_id,
             bundle: bundle.clone(),
         };
@@ -457,16 +446,8 @@ mod tests {
         let event = create_test_event("test-key-1", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history.clone(), &event);
         assert!(result.is_some());
-        let bundle_event = BundleEvent::Updated {
-            bundle_id,
-            bundle: bundle.clone(),
-        };
-        let event = create_test_event("test-key-2", 1234567890, bundle_event);
-        let result = update_bundle_history_transform(bundle_history.clone(), &event);
-        assert!(result.is_some());
-
         let bundle_event = BundleEvent::Cancelled { bundle_id };
-        let event = create_test_event("test-key-3", 1234567890, bundle_event);
+        let event = create_test_event("test-key-2", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history.clone(), &event);
         assert!(result.is_some());
 
@@ -476,7 +457,7 @@ mod tests {
             block_number: 12345,
             flashblock_index: 1,
         };
-        let event = create_test_event("test-key-4", 1234567890, bundle_event);
+        let event = create_test_event("test-key-3", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history.clone(), &event);
         assert!(result.is_some());
 
@@ -485,7 +466,7 @@ mod tests {
             block_number: 12345,
             block_hash: TxHash::from([1u8; 32]),
         };
-        let event = create_test_event("test-key-5", 1234567890, bundle_event);
+        let event = create_test_event("test-key-4", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history.clone(), &event);
         assert!(result.is_some());
 
@@ -493,7 +474,7 @@ mod tests {
             bundle_id,
             reason: DropReason::TimedOut,
         };
-        let event = create_test_event("test-key-6", 1234567890, bundle_event);
+        let event = create_test_event("test-key-5", 1234567890, bundle_event);
         let result = update_bundle_history_transform(bundle_history, &event);
         assert!(result.is_some());
     }
diff --git a/crates/audit/src/types.rs b/crates/audit/src/types.rs
index a29ac2f..4208b3b 100644
--- a/crates/audit/src/types.rs
+++ b/crates/audit/src/types.rs
@@ -31,11 +31,7 @@ pub struct Transaction {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "event", content = "data")]
 pub enum BundleEvent {
-    Created {
-        bundle_id: BundleId,
-        bundle: Bundle,
-    },
-    Updated {
+    Received {
         bundle_id: BundleId,
         bundle: Bundle,
     },
@@ -62,8 +58,7 @@ pub enum BundleEvent {
 impl BundleEvent {
     pub fn bundle_id(&self) -> BundleId {
         match self {
-            BundleEvent::Created { bundle_id, .. } => *bundle_id,
-            BundleEvent::Updated { bundle_id, .. } => *bundle_id,
+            BundleEvent::Received { bundle_id, .. } => *bundle_id,
             BundleEvent::Cancelled { bundle_id, .. } => *bundle_id,
             BundleEvent::BuilderIncluded { bundle_id, .. } => *bundle_id,
             BundleEvent::BlockIncluded { bundle_id, .. } => *bundle_id,
@@ -73,7 +68,7 @@ impl BundleEvent {
 
     pub fn transaction_ids(&self) -> Vec {
         match self {
-            BundleEvent::Created { bundle, .. } | BundleEvent::Updated { bundle, .. } => {
+            BundleEvent::Received { bundle, .. } => {
                 bundle
                     .txs
                     .iter()
diff --git a/crates/audit/tests/integration_tests.rs b/crates/audit/tests/integration_tests.rs
index f219677..d2bb0fe 100644
--- a/crates/audit/tests/integration_tests.rs
+++ b/crates/audit/tests/integration_tests.rs
@@ -21,7 +21,7 @@ async fn test_kafka_publisher_s3_archiver_integration()
     let test_bundle_id = Uuid::new_v4();
 
     let test_events = vec![
-        BundleEvent::Created {
+        BundleEvent::Received {
             bundle_id: test_bundle_id,
             bundle: Bundle::default(),
         },
diff --git a/crates/audit/tests/s3_test.rs b/crates/audit/tests/s3_test.rs
index f637d61..55c8f87 100644
--- a/crates/audit/tests/s3_test.rs
+++ b/crates/audit/tests/s3_test.rs
@@ -44,7 +44,7 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/crates/bundle-pool/src/source.rs b/crates/bundle-pool/src/source.rs
index 65592f9..aa0f1e5 100644
--- a/crates/bundle-pool/src/source.rs
+++ b/crates/bundle-pool/src/source.rs
@@ -2,9 +2,10 @@ use anyhow::Result;
 use async_trait::async_trait;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::{ClientConfig, Message};
+use std::fmt::Debug;
 use tips_core::{Bundle, BundleWithMetadata};
 use tokio::sync::mpsc;
-use tracing::{debug, error};
+use tracing::{error, trace};
 
 #[async_trait]
 pub trait BundleSource {
@@ -16,6 +17,12 @@ pub struct KafkaBundleSource {
     publisher: mpsc::UnboundedSender<BundleWithMetadata>,
 }
 
+impl Debug for KafkaBundleSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "KafkaBundleSource")
+    }
+}
+
 impl KafkaBundleSource {
     pub fn new(
         client_config: ClientConfig,
@@ -53,7 +60,7 @@ impl BundleSource for KafkaBundleSource {
                 }
             };
 
-            debug!(
+            trace!(
                 bundle = ?bundle,
                 offset = message.offset(),
                 partition = message.partition(),
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 8774956..cfe431b 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -22,6 +22,7 @@ alloy-signer-local = { workspace = true, optional = true }
 op-alloy-rpc-types = { workspace = true, optional = true }
 tracing.workspace = true
 tracing-subscriber.workspace = true
+op-alloy-flz.workspace = true
 
 [dev-dependencies]
 alloy-signer-local.workspace = true
diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs
index 9e6e645..51a19ca 100644
--- a/crates/core/src/types.rs
+++ b/crates/core/src/types.rs
@@ -1,7 +1,9 @@
+use alloy_consensus::Transaction;
 use alloy_consensus::transaction::SignerRecoverable;
 use alloy_primitives::{Address, B256, Bytes, TxHash, keccak256};
-use alloy_provider::network::eip2718::Decodable2718;
+use alloy_provider::network::eip2718::{Decodable2718, Encodable2718};
 use op_alloy_consensus::OpTxEnvelope;
+use op_alloy_flz::tx_estimated_size_fjord_bytes;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
@@ -127,6 +129,17 @@ impl BundleWithMetadata {
             .map(|t| t.recover_signer().unwrap())
             .collect()
     }
+
+    pub fn gas_limit(&self) -> u64 {
+        self.transactions.iter().map(|t| t.gas_limit()).sum()
+    }
+
+    pub fn da_size(&self) -> u64 {
+        self.transactions
+            .iter()
+            .map(|t| tx_estimated_size_fjord_bytes(&t.encoded_2718()))
+            .sum()
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml
index 120aa1e..5df7ec6 100644
--- a/crates/ingress-rpc/Cargo.toml
+++ b/crates/ingress-rpc/Cargo.toml
@@ -13,6 +13,7 @@ path = "src/bin/main.rs"
 
 [dependencies]
 tips-core.workspace = true
+tips-audit.workspace = true
 jsonrpsee.workspace = true
 alloy-primitives.workspace = true
 op-alloy-network.workspace = true
diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs
index 64d6fae..0ea590b 100644
--- a/crates/ingress-rpc/src/bin/main.rs
+++ b/crates/ingress-rpc/src/bin/main.rs
@@ -5,6 +5,7 @@ use op_alloy_network::Optimism;
 use rdkafka::ClientConfig;
 use rdkafka::producer::FutureProducer;
 use std::net::IpAddr;
+use tips_audit::KafkaBundleEventPublisher;
 use tips_core::kafka::load_kafka_config_from_file;
 use tips_core::logger::init_logger;
 use tips_ingress_rpc::queue::KafkaQueuePublisher;
@@ -43,6 +44,18 @@ struct Config {
     )]
     ingress_topic: String,
 
+    /// Kafka properties file for audit events
+    #[arg(long, env = "TIPS_INGRESS_KAFKA_AUDIT_PROPERTIES_FILE")]
+    audit_kafka_properties: String,
+
+    /// Kafka topic for audit events
+    #[arg(
+        long,
+        env = "TIPS_INGRESS_KAFKA_AUDIT_TOPIC",
+        default_value = "tips-audit"
+    )]
+    audit_topic: String,
+
     #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
     log_level: String,
 
@@ -75,18 +88,26 @@ async fn main() -> anyhow::Result<()> {
         .network::<Optimism>()
         .connect_http(config.mempool_url);
 
-    let client_config = ClientConfig::from_iter(load_kafka_config_from_file(
+    let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file(
         &config.ingress_kafka_properties,
     )?);
 
-    let queue_producer: FutureProducer = client_config.create()?;
+    let queue_producer: FutureProducer = ingress_client_config.create()?;
     let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic);
 
+    let audit_client_config =
+        ClientConfig::from_iter(load_kafka_config_from_file(&config.audit_kafka_properties)?);
+
+    let audit_producer: FutureProducer = audit_client_config.create()?;
+
+    let audit_publisher = KafkaBundleEventPublisher::new(audit_producer, config.audit_topic);
+
     let service = IngressService::new(
         provider,
         config.dual_write_mempool,
         queue,
+        audit_publisher,
         config.send_transaction_default_lifetime_seconds,
     );
 
     let bind_addr = format!("{}:{}", config.address, config.port);
diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs
index dafd085..811788a 100644
--- a/crates/ingress-rpc/src/service.rs
+++ b/crates/ingress-rpc/src/service.rs
@@ -10,6 +10,7 @@ use op_alloy_consensus::OpTxEnvelope;
 use op_alloy_network::Optimism;
 use reth_rpc_eth_types::EthApiError;
 use std::time::{SystemTime, UNIX_EPOCH};
+use tips_audit::{BundleEvent, BundleEventPublisher};
 use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
 use tracing::{info, warn};
 
@@ -31,40 +32,44 @@
     async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<TxHash>;
 }
 
-pub struct IngressService<Queue> {
+pub struct IngressService<Queue, Audit> {
     provider: RootProvider<Optimism>,
     dual_write_mempool: bool,
-    queue: Queue,
+    bundle_queue: Queue,
+    audit_publisher: Audit,
     send_transaction_default_lifetime_seconds: u64,
 }
 
-impl<Queue> IngressService<Queue> {
+impl<Queue, Audit> IngressService<Queue, Audit> {
     pub fn new(
         provider: RootProvider<Optimism>,
         dual_write_mempool: bool,
         queue: Queue,
+        audit_publisher: Audit,
         send_transaction_default_lifetime_seconds: u64,
     ) -> Self {
         Self {
             provider,
             dual_write_mempool,
-            queue,
+            bundle_queue: queue,
+            audit_publisher,
             send_transaction_default_lifetime_seconds,
         }
     }
 }
 
 #[async_trait]
-impl<Queue> IngressApiServer for IngressService<Queue>
+impl<Queue, Audit> IngressApiServer for IngressService<Queue, Audit>
 where
     Queue: QueuePublisher + Sync + Send + 'static,
+    Audit: BundleEventPublisher + Sync + Send + 'static,
 {
     async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
         let bundle_with_metadata = self.validate_bundle(bundle).await?;
         let bundle_hash = bundle_with_metadata.bundle_hash();
 
         if let Err(e) = self
-            .queue
+            .bundle_queue
             .publish(bundle_with_metadata.bundle(), &bundle_hash)
             .await
         {
@@ -78,6 +83,14 @@ where
             tx_count = bundle_with_metadata.transactions().len(),
         );
 
+        let audit_event = BundleEvent::Received {
+            bundle_id: *bundle_with_metadata.uuid(),
+            bundle: bundle_with_metadata.bundle().clone(),
+        };
+        if let Err(e) = self.audit_publisher.publish(audit_event).await {
+            warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e);
+        }
+
         Ok(BundleHash { bundle_hash })
     }
 
@@ -108,8 +121,9 @@ where
         let bundle_with_metadata = BundleWithMetadata::load(bundle)
             .map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
         let bundle_hash = bundle_with_metadata.bundle_hash();
+
         if let Err(e) = self
-            .queue
+            .bundle_queue
             .publish(bundle_with_metadata.bundle(), &bundle_hash)
             .await
         {
@@ -137,13 +151,22 @@ where
             }
         }
 
+        let audit_event = BundleEvent::Received {
+            bundle_id: *bundle_with_metadata.uuid(),
+            bundle: bundle_with_metadata.bundle().clone(),
+        };
+        if let Err(e) = self.audit_publisher.publish(audit_event).await {
+            warn!(message = "Failed to publish audit event", bundle_id = %bundle_with_metadata.uuid(), error = %e);
+        }
+
         Ok(transaction.tx_hash())
     }
 }
 
-impl<Queue> IngressService<Queue>
+impl<Queue, Audit> IngressService<Queue, Audit>
 where
     Queue: QueuePublisher + Sync + Send + 'static,
+    Audit: BundleEventPublisher + Sync + Send + 'static,
 {
     async fn validate_tx(&self, data: &Bytes) -> RpcResult {
         if data.is_empty() {
diff --git a/docker-compose.tips.yml b/docker-compose.tips.yml
index 6b5d01c..e4613f7 100644
--- a/docker-compose.tips.yml
+++ b/docker-compose.tips.yml
@@ -11,7 +11,8 @@ services:
     env_file:
       - .env.docker
     volumes:
-      - ./docker/ingress-kafka-properties:/app/docker/ingress-kafka-properties:ro
+      - ./docker/ingress-bundles-kafka-properties:/app/docker/ingress-bundles-kafka-properties:ro
+      - ./docker/ingress-audit-kafka-properties:/app/docker/ingress-audit-kafka-properties:ro
     restart: unless-stopped
 
   audit:
diff --git a/docker/ingress-audit-kafka-properties b/docker/ingress-audit-kafka-properties
new file mode 100644
index 0000000..d9606ff
--- /dev/null
+++ b/docker/ingress-audit-kafka-properties
@@ -0,0 +1,3 @@
+# Kafka configuration properties for ingress audit events
+bootstrap.servers=host.docker.internal:9094
+message.timeout.ms=5000
diff --git a/docker/ingress-kafka-properties b/docker/ingress-bundles-kafka-properties
similarity index 100%
rename from docker/ingress-kafka-properties
rename to docker/ingress-bundles-kafka-properties
diff --git a/justfile b/justfile
index 8dbeaf5..ebac294 100644
--- a/justfile
+++ b/justfile
@@ -16,9 +16,6 @@ fix:
     # UI
     cd ui && npx biome check --write --unsafe
 
-create-migration name:
-    touch crates/datastore/migrations/$(date +%s)_{{ name }}.sql
-
 sync: deps-reset
     ### ENV ###
     just sync-env
@@ -84,4 +81,27 @@ ingress-writer:
     cargo run --bin tips-ingress-writer
 
 ui:
-    cd ui && yarn dev
\ No newline at end of file
+    cd ui && yarn dev
+
+sequencer_url := "http://localhost:8547"
+builder_url := "http://localhost:2222"
+ingress_url := "http://localhost:8080"
+
+get-blocks:
+    echo "Sequencer"
+    cast bn -r {{ sequencer_url }}
+    echo "Builder"
+    cast bn -r {{ builder_url }}
+
+sender := "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"
+sender_key := "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"
+
+send-txn:
+    #!/usr/bin/env bash
+    set -euxo pipefail
+    echo "sending txn"
+    nonce=$(cast nonce {{ sender }} -r {{ builder_url }})
+    txn=$(cast mktx --private-key {{ sender_key }} 0x0000000000000000000000000000000000000000 --value 0.01ether --nonce $nonce --chain-id 13 -r {{ builder_url }})
+    hash=$(curl -s {{ ingress_url }} -X POST -H "Content-Type: application/json" --data "{\"method\":\"eth_sendRawTransaction\",\"params\":[\"$txn\"],\"id\":1,\"jsonrpc\":\"2.0\"}" | jq -r ".result")
+    cast receipt $hash -r {{ sequencer_url }} | grep status
+    cast receipt $hash -r {{ builder_url }} | grep status
\ No newline at end of file
diff --git a/ui/src/app/bundles/[uuid]/page.tsx b/ui/src/app/bundles/[uuid]/page.tsx
index 38b4286..83e1df2 100644
--- a/ui/src/app/bundles/[uuid]/page.tsx
+++ b/ui/src/app/bundles/[uuid]/page.tsx
@@ -7,40 +7,6 @@ interface PageProps {
   params: Promise<{ uuid: string }>;
 }
 
-function formatEventType(eventType: string): string {
-  switch (eventType) {
-    case "ReceivedBundle":
-      return "Bundle Received";
-    case "CancelledBundle":
-      return "Bundle Cancelled";
-    case "BuilderMined":
-      return "Builder Mined";
-    case "FlashblockInclusion":
-      return "Flashblock Inclusion";
-    case "BlockInclusion":
-      return "Block Inclusion";
-    default:
-      return eventType;
-  }
-}
-
-function getEventStatus(eventType: string): { color: string; bgColor: string } {
-  switch (eventType) {
-    case "ReceivedBundle":
-      return { color: "text-blue-600", bgColor: "bg-blue-100" };
-    case "CancelledBundle":
-      return { color: "text-red-600", bgColor: "bg-red-100" };
-    case "BuilderMined":
-      return { color: "text-yellow-600", bgColor: "bg-yellow-100" };
-    case "FlashblockInclusion":
-      return { color: "text-purple-600", bgColor: "bg-purple-100" };
-    case "BlockInclusion":
-      return { color: "text-green-600", bgColor: "bg-green-100" };
-    default:
-      return { color: "text-gray-600", bgColor: "bg-gray-100" };
-  }
-}
-
 export default function BundlePage({ params }: PageProps) {
   const [uuid, setUuid] = useState("");
   const [data, setData] = useState(null);
@@ -111,23 +77,19 @@ export default function BundlePage({ params }: PageProps) {
       {data && (
           {(() => {
-            const allTransactions = new Set();
+            const transactions = new Set();
             data.history.forEach((event) => {
-              if (event.event === "Created") {
-                event.data?.bundle?.revertingTxHashes?.forEach((tx) => {
-                  allTransactions.add(tx);
-                });
-              }
+              event.data?.bundle?.revertingTxHashes?.forEach((tx) => {
+                transactions.add(tx);
+              });
             });
 
-            const uniqueTransactions = Array.from(allTransactions.values());
-
-            return uniqueTransactions.length > 0 ? (
+            return transactions.size > 0 ? (
                 Transactions
-                {uniqueTransactions.map((tx) => (
+                {Array.from(transactions).map((tx) => (
                     {tx}
                   ))}
@@ -141,7 +103,6 @@ export default function BundlePage({ params }: PageProps) {
           {data.history.length > 0 ? (
             {data.history.map((event, index) => {
-              const { color, bgColor } = getEventStatus(event.event);
               return (
-                  {formatEventType(event.event)}
+                  {event.event}
                   {event.data?.timestamp