diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml index 35dee21f9..9df920ad6 100644 --- a/.github/workflows/checks.yaml +++ b/.github/workflows/checks.yaml @@ -36,15 +36,18 @@ jobs: # https://github.com/swatinem/rust-cache - name: Run Swatinem/rust-cache@v2 + if: ${{ !env.ACT }} uses: Swatinem/rust-cache@v2 with: cache-on-failure: true # https://github.com/Mozilla-Actions/sccache-action - name: Run sccache-action + if: ${{ !env.ACT }} uses: mozilla-actions/sccache-action@v0.0.9 - name: Set sccache env vars + if: ${{ !env.ACT }} run: | echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV @@ -55,7 +58,18 @@ jobs: version: nightly - name: Install native dependencies - run: sudo apt-get update && sudo apt-get install -y libsqlite3-dev + run: sudo apt-get update && sudo apt-get install -y libsqlite3-dev clang libclang-dev llvm build-essential pkg-config + + - name: Compile tester + run: make tester + + - name: Compile op-rbuilder + run: cargo build -p op-rbuilder --bin op-rbuilder + + - name: Download op-reth + run: | + ./scripts/ci/download-op-reth.sh + echo "$(pwd)" >> $GITHUB_PATH - name: Lint run: make lint @@ -84,15 +98,18 @@ jobs: # https://github.com/swatinem/rust-cache - name: Run Swatinem/rust-cache@v2 + if: ${{ !env.ACT }} uses: Swatinem/rust-cache@v2 with: cache-on-failure: true # https://github.com/Mozilla-Actions/sccache-action - name: Run sccache-action + if: ${{ !env.ACT }} uses: mozilla-actions/sccache-action@v0.0.9 - name: Set sccache env vars + if: ${{ !env.ACT }} run: | echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV @@ -102,20 +119,27 @@ jobs: ./scripts/ci/download-op-reth.sh echo "$(pwd)" >> $GITHUB_PATH + - name: Install native dependencies + if: ${{ env.ACT }} + run: sudo apt-get update && sudo apt-get install -y libsqlite3-dev clang libclang-dev llvm build-essential pkg-config + + - name: Compile tester + run: make tester + - name: Build the rbuilder run: cargo build -p op-rbuilder --bin op-rbuilder - name: Generate test genesis file - run: cargo run -p op-rbuilder --bin tester -- genesis --output genesis.json + run: cargo run -p op-rbuilder --bin tester --features=testing -- genesis --output genesis.json - name: Run integration tests - run: cargo test --package op-rbuilder --lib --features integration -- integration::integration_test::tests + run: cargo test --package op-rbuilder --lib - name: Build flashblocks rbuilder run: cargo build -p op-rbuilder --bin op-rbuilder --features flashblocks - name: Run flashblocks builder integration tests - run: cargo test --package op-rbuilder --lib --features integration,flashblocks -- integration::integration_test::tests + run: cargo test --package op-rbuilder --lib --features flashblocks - name: Aggregate playground logs # This steps fails if the test fails early and the playground logs dir has not been created diff --git a/Cargo.lock b/Cargo.lock index 0e432bd6b..6096a5953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5342,6 +5342,7 @@ dependencies = [ "clap_builder", "derive_more", "eyre", + "futures", "futures-util", "jsonrpsee", "metrics", diff --git a/Makefile b/Makefile index 5303afb77..8fd76b92f 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,14 @@ clean: ## Clean up build: ## Build (debug version) cargo build --features "$(FEATURES)" +.PHONY: op-rbuilder +op-rbuilder: ## Build op-rbuilder (debug version) + cargo build -p op-rbuilder --bin op-rbuilder --features "$(FEATURES)" + +.PHONY: tester +tester: ## Build tester 
(debug version) + cargo build -p op-rbuilder --bin tester --features "testing,$(FEATURES)" + .PHONY: docker-image-rbuilder docker-image-rbuilder: ## Build a rbuilder Docker image docker build --platform linux/amd64 --target rbuilder-runtime --build-arg FEATURES="$(FEATURES)" . -t rbuilder diff --git a/README.md b/README.md index c0d1f1230..b44218079 100644 --- a/README.md +++ b/README.md @@ -145,4 +145,18 @@ This will automatically try to detect all settings and ports from the currently ``` rm -rf ~/.local/share/reth sudo rm -rf ~/.playground -``` \ No newline at end of file +``` + +## Running GitHub Actions locally + +To verify that CI will allow your PR to be merged, make sure that our GitHub `checks.yaml` workflow passes locally before sending it by running: + +``` +act -W .github/workflows/checks.yaml +``` + +More instructions on installing and configuring `act` can be found on [their website](https://nektosact.com). + +### Known issues +- Running actions locally requires a GitHub token. You can generate one by following the instructions in the [GitHub Docs](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens). After generating a token, you will need to pass it to `act`, either on the command line using `-s GITHUB_TOKEN=` or by adding it to the `~/.config/act/actrc` file. +- You might get an error about a missing or incompatible `warp-ubuntu-latest-x64-32x` platform. This can be mitigated by adding `-P warp-ubuntu-latest-x64-32x=ghcr.io/catthehacker/ubuntu:act-latest` on the command line when calling `act`, or by appending this flag to `~/.config/act/actrc`. \ No newline at end of file diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index 862f2a66c..109551f66 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -67,9 +67,7 @@ revm.workspace = true op-revm.workspace = true tracing.workspace = true -futures-util = "0.3.31" eyre.workspace = true -tower = "0.4" serde_with.workspace = true serde.workspace = true secp256k1.workspace = true @@ -86,6 +84,9 @@ thiserror.workspace = true parking_lot.workspace = true url.workspace = true +tower = "0.4" +futures = "0.3" +futures-util = "0.3.31" time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] } chrono = "0.4" uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] } @@ -127,7 +128,7 @@ min-info-logs = ["tracing/release_max_level_info"] min-debug-logs = ["tracing/release_max_level_debug"] min-trace-logs = ["tracing/release_max_level_trace"] -integration = [] +testing = [] flashblocks = [] [[bin]] @@ -136,4 +137,4 @@ path = "src/main.rs" [[bin]] name = "tester" -path = "src/tester/main.rs" +required-features = ["testing"] diff --git a/crates/op-rbuilder/src/tester/main.rs b/crates/op-rbuilder/src/bin/tester/main.rs similarity index 69% rename from crates/op-rbuilder/src/tester/main.rs rename to crates/op-rbuilder/src/bin/tester/main.rs index c1041d986..b86138087 100644 --- a/crates/op-rbuilder/src/tester/main.rs +++ b/crates/op-rbuilder/src/bin/tester/main.rs @@ -1,6 +1,6 @@ use alloy_primitives::Address; use clap::Parser; -use op_rbuilder::tester::*; +use op_rbuilder::tests::*; /// CLI Commands #[derive(Parser, Debug)] @@ -77,3 +77,38 @@ async fn main() -> eyre::Result<()> { } } } + +#[allow(dead_code)] +pub async fn run_system( + validation: bool, + no_tx_pool: bool, + block_time_secs: u64, + flashblocks_endpoint: Option, + no_sleep: bool, +) -> eyre::Result<()> { + println!("Validation: {validation}"); + +
let engine_api = EngineApi::new("http://localhost:4444").unwrap(); + let validation_api = if validation { + Some(EngineApi::new("http://localhost:5555").unwrap()) + } else { + None + }; + + let mut generator = BlockGenerator::new( + engine_api, + validation_api, + no_tx_pool, + block_time_secs, + flashblocks_endpoint, + ); + + generator.init().await?; + + // Infinite loop generating blocks + loop { + println!("Generating new block..."); + let block_hash = generator.submit_payload(None, 0, no_sleep).await?; + println!("Generated block: {block_hash}"); + } +} diff --git a/crates/op-rbuilder/src/generator.rs b/crates/op-rbuilder/src/generator.rs index 0fa83e4d7..586aa313a 100644 --- a/crates/op-rbuilder/src/generator.rs +++ b/crates/op-rbuilder/src/generator.rs @@ -436,7 +436,7 @@ mod tests { use super::*; use alloy_eips::eip7685::Requests; use alloy_primitives::U256; - use rand::thread_rng; + use rand::rng; use reth::tasks::TokioTaskExecutor; use reth_chain_state::ExecutedBlockWithTrieUpdates; use reth_node_api::NodePrimitives; @@ -624,7 +624,7 @@ mod tests { #[tokio::test] async fn test_payload_generator() -> eyre::Result<()> { - let mut rng = thread_rng(); + let mut rng = rng(); let client = MockEthProvider::default(); let executor = TokioTaskExecutor::default(); diff --git a/crates/op-rbuilder/src/integration/integration_test.rs b/crates/op-rbuilder/src/integration/integration_test.rs deleted file mode 100644 index 1a251d23b..000000000 --- a/crates/op-rbuilder/src/integration/integration_test.rs +++ /dev/null @@ -1,719 +0,0 @@ -#[cfg(all(test, feature = "integration"))] -mod tests { - use crate::{ - integration::{ - op_rbuilder::OpRbuilderConfig, op_reth::OpRethConfig, IntegrationFramework, - TestHarness, TestHarnessBuilder, - }, - tester::{BlockGenerator, EngineApi}, - tx_signer::Signer, - }; - use alloy_consensus::{Transaction, TxEip1559}; - use alloy_eips::{eip1559::MIN_PROTOCOL_BASE_FEE, eip2718::Encodable2718}; - use alloy_primitives::hex; - use alloy_provider::{Identity, Provider, ProviderBuilder}; - use alloy_rpc_types_eth::BlockTransactionsKind; - use futures_util::StreamExt; - use op_alloy_consensus::OpTypedTransaction; - use op_alloy_network::Optimism; - use std::{ - cmp::max, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, - }; - use tokio_tungstenite::connect_async; - use uuid::Uuid; - - const BUILDER_PRIVATE_KEY: &str = - "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_chain_produces_blocks() -> eyre::Result<()> { - // This is a simple test using the integration framework to test that the chain - // produces blocks. 
- let mut framework = - IntegrationFramework::new("integration_test_chain_produces_blocks").unwrap(); - - // we are going to use a genesis file pre-generated before the test - let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - genesis_path.push("../../genesis.json"); - assert!(genesis_path.exists()); - - // create the builder - let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(1234) - .network_port(1235) - .http_port(1238) - .with_builder_private_key(BUILDER_PRIVATE_KEY); - - // create the validation reth node - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(1236) - .network_port(1237); - - framework.start("op-reth", &reth).await.unwrap(); - - let op_rbuilder = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - let engine_api = EngineApi::new("http://localhost:1234").unwrap(); - let validation_api = EngineApi::new("http://localhost:1236").unwrap(); - - let mut generator = BlockGenerator::new(engine_api, Some(validation_api), false, 1, None); - generator.init().await?; - - let provider = ProviderBuilder::::default() - .on_http("http://localhost:1238".parse()?); - - for _ in 0..10 { - let block_hash = generator.generate_block().await?; - - // query the block and the transactions inside the block - let block = provider - .get_block_by_hash(block_hash) - .await? - .expect("block"); - - for hash in block.transactions.hashes() { - let _ = provider - .get_transaction_receipt(hash) - .await? - .expect("receipt"); - } - } - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_monitor_transaction_drops() -> eyre::Result<()> { - // This test ensures that the transactions that get reverted an not included in the block - // are emitted as a log on the builder. - let harness = TestHarnessBuilder::new("integration_test_monitor_transaction_drops") - .with_revert_protection() - .build() - .await?; - - let mut generator = harness.block_generator().await?; - - // send 10 reverting transactions - let mut pending_txn = Vec::new(); - for _ in 0..10 { - let txn = harness.send_revert_transaction().await?; - pending_txn.push(txn); - } - - // generate 10 blocks - for _ in 0..10 { - let block_hash = generator.generate_block().await?; - - // query the block and the transactions inside the block - let block = harness - .provider()? - .get_block_by_hash(block_hash) - .await? 
- .expect("block"); - - // blocks should only include two transactions (deposit + builder) - assert_eq!(block.transactions.len(), 2); - } - - // check that the builder emitted logs for the reverted transactions - // with the monitoring logic - // TODO: this is not ideal, lets find a different way to detect this - // Each time a transaction is dropped, it emits a log like this - // 'Transaction event received target="monitoring" tx_hash="" kind="discarded"' - let builder_logs = std::fs::read_to_string(harness.builder_log_path)?; - - for txn in pending_txn { - let txn_log = format!( - "Transaction event received target=\"monitoring\" tx_hash=\"{}\" kind=\"discarded\"", - txn.tx_hash() - ); - - assert!(builder_logs.contains(txn_log.as_str())); - } - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_revert_protection_disabled() -> eyre::Result<()> { - let harness = TestHarnessBuilder::new("integration_test_revert_protection_disabled") - .build() - .await?; - - let mut generator = harness.block_generator().await?; - - let txn1 = harness.send_valid_transaction().await?; - let txn2 = harness.send_revert_transaction().await?; - let pending_txn = vec![txn1, txn2]; - - let block_hash = generator.generate_block().await?; - - // the transactions should be included in the block now - let pending_txn = { - let mut transaction_hashes = Vec::new(); - for txn in pending_txn { - let txn_hash = txn.with_timeout(None).watch().await?; - transaction_hashes.push(txn_hash); - } - transaction_hashes - }; - - // validate that all the transaction hashes are included in the block - let provider = harness.provider()?; - let block = provider - .get_block_by_hash(block_hash) - .await? - .expect("block"); - - for txn in pending_txn { - assert!(block.transactions.hashes().any(|hash| hash == txn)); - } - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_revert_protection() -> eyre::Result<()> { - // This is a simple test using the integration framework to test that the chain - // produces blocks. 
- let mut framework = - IntegrationFramework::new("integration_test_revert_protection").unwrap(); - - // we are going to use a genesis file pre-generated before the test - let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - genesis_path.push("../../genesis.json"); - assert!(genesis_path.exists()); - - // create the builder - let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(1244) - .network_port(1245) - .http_port(1248) - .with_builder_private_key(BUILDER_PRIVATE_KEY) - .with_revert_protection(true); - - // create the validation reth node - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(1246) - .network_port(1247); - - framework.start("op-reth", &reth).await.unwrap(); - - let _ = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - let engine_api = EngineApi::new("http://localhost:1244").unwrap(); - let validation_api = EngineApi::new("http://localhost:1246").unwrap(); - - let mut generator = BlockGenerator::new(engine_api, Some(validation_api), false, 1, None); - let latest_block = generator.init().await?; - - let provider = ProviderBuilder::::default() - .on_http("http://localhost:1248".parse()?); - - let mut base_fee = max( - latest_block.header.base_fee_per_gas.unwrap(), - MIN_PROTOCOL_BASE_FEE, - ); - for _ in 0..10 { - // Get builder's address - let known_wallet = Signer::try_from_secret(BUILDER_PRIVATE_KEY.parse()?)?; - let builder_address = known_wallet.address; - // Get current nonce from chain - let nonce = provider.get_transaction_count(builder_address).await?; - // Transaction from builder should succeed - let tx_request = OpTypedTransaction::Eip1559(TxEip1559 { - chain_id: 901, - nonce, - gas_limit: 210000, - max_fee_per_gas: base_fee.into(), - ..Default::default() - }); - let signed_tx = known_wallet.sign_tx(tx_request)?; - let known_tx = provider - .send_raw_transaction(signed_tx.encoded_2718().as_slice()) - .await?; - - // Create a reverting transaction - let tx_request = OpTypedTransaction::Eip1559(TxEip1559 { - chain_id: 901, - nonce: nonce + 1, - gas_limit: 300000, - max_fee_per_gas: base_fee.into(), - input: hex!("60006000fd").into(), // PUSH1 0x00 PUSH1 0x00 REVERT - ..Default::default() - }); - let signed_tx = known_wallet.sign_tx(tx_request)?; - let reverting_tx = provider - .send_raw_transaction(signed_tx.encoded_2718().as_slice()) - .await?; - - let block_hash = generator.generate_block().await?; - - // query the block and the transactions inside the block - let block = provider - .get_block_by_hash(block_hash) - .await? - .expect("block"); - - // Verify known transaction is included - assert!( - block - .transactions - .hashes() - .any(|hash| hash == *known_tx.tx_hash()), - "successful transaction missing from block" - ); - - // Verify reverted transaction is NOT included - assert!( - !block - .transactions - .hashes() - .any(|hash| hash == *reverting_tx.tx_hash()), - "reverted transaction unexpectedly included in block" - ); - for hash in block.transactions.hashes() { - let receipt = provider - .get_transaction_receipt(hash) - .await? 
- .expect("receipt"); - let success = receipt.inner.inner.status(); - assert!(success); - } - base_fee = max( - block.header.base_fee_per_gas.unwrap(), - MIN_PROTOCOL_BASE_FEE, - ); - } - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_fee_priority_ordering() -> eyre::Result<()> { - // This test validates that transactions are ordered by fee priority in blocks - let mut framework = - IntegrationFramework::new("integration_test_fee_priority_ordering").unwrap(); - - // we are going to use a genesis file pre-generated before the test - let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - genesis_path.push("../../genesis.json"); - assert!(genesis_path.exists()); - - // create the builder - let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(1264) - .network_port(1265) - .http_port(1268) - .with_builder_private_key(BUILDER_PRIVATE_KEY); - - // create the validation reth node - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(1266) - .network_port(1267); - - framework.start("op-reth", &reth).await.unwrap(); - - let _ = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - let engine_api = EngineApi::new("http://localhost:1264").unwrap(); - let validation_api = EngineApi::new("http://localhost:1266").unwrap(); - - let mut generator = BlockGenerator::new(engine_api, Some(validation_api), false, 1, None); - let latest_block = generator.init().await?; - - let provider = ProviderBuilder::::default() - .on_http("http://localhost:1268".parse()?); - - let base_fee = max( - latest_block.header.base_fee_per_gas.unwrap(), - MIN_PROTOCOL_BASE_FEE, - ); - - // Create transactions with increasing fee values - let priority_fees: [u128; 5] = [1, 3, 5, 2, 4]; // Deliberately not in order - let signers = vec![ - Signer::random(), - Signer::random(), - Signer::random(), - Signer::random(), - Signer::random(), - ]; - let mut txs = Vec::new(); - - // Fund test accounts with deposits - for signer in &signers { - generator - .deposit(signer.address, 1000000000000000000) - .await?; - } - - // Send transactions in non-optimal fee order - for (i, priority_fee) in priority_fees.iter().enumerate() { - let tx_request = OpTypedTransaction::Eip1559(TxEip1559 { - chain_id: 901, - nonce: 1, - gas_limit: 210000, - max_fee_per_gas: base_fee as u128 + *priority_fee, - max_priority_fee_per_gas: *priority_fee, - ..Default::default() - }); - let signed_tx = signers[i].sign_tx(tx_request)?; - let tx = provider - .send_raw_transaction(signed_tx.encoded_2718().as_slice()) - .await?; - txs.push(tx); - } - - // Generate a block that should include these transactions - let block_hash = generator.generate_block().await?; - - // Query the block and check transaction ordering - let block = provider - .get_block_by_hash(block_hash) - .full() - .await? 
- .expect("block"); - - // Verify all transactions are included - for tx in &txs { - assert!( - block - .transactions - .hashes() - .any(|hash| hash == *tx.tx_hash()), - "transaction missing from block" - ); - } - - let tx_fees: Vec<_> = block - .transactions - .into_transactions() - .map(|tx| tx.effective_tip_per_gas(base_fee.into())) - .collect(); - - // Verify transactions are ordered by decreasing fee (highest fee first) - // Skip the first deposit transaction and last builder transaction - for i in 1..tx_fees.len() - 2 { - assert!( - tx_fees[i] >= tx_fees[i + 1], - "Transactions not ordered by decreasing fee: {:?}", - tx_fees - ); - } - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_get_payload_close_to_fcu() -> eyre::Result<()> { - let test_harness = TestHarnessBuilder::new("integration_test_get_payload_close_to_fcu") - .build() - .await?; - let mut block_generator = test_harness.block_generator().await?; - - // add some transactions to the pool so that the builder is busy when we send the fcu/getPayload requests - for _ in 0..10 { - // Note, for this test it is okay if they are not valid - let _ = test_harness.send_valid_transaction().await?; - } - - // TODO: In the fail case scenario, this hangs forever, but it should return an error - // Figure out how to do timeout (i.e. 1s) on the engine api. - block_generator.submit_payload(None, 0, true).await?; - - Ok(()) - } - - #[tokio::test] - #[cfg(not(feature = "flashblocks"))] - async fn integration_test_transaction_flood_no_sleep() -> eyre::Result<()> { - // This test validates that if we flood the builder with many transactions - // and we request short block times, the builder can still eventually resolve all the transactions - - let test_harness = TestHarnessBuilder::new("integration_test_transaction_flood_no_sleep") - .build() - .await?; - - let mut block_generator = test_harness.block_generator().await?; - let provider = test_harness.provider()?; - - // Send 200 valid transactions to the builder - // More than this and there is an issue with the RPC endpoint not being able to handle the load - let mut transactions = vec![]; - for _ in 0..200 { - let tx = test_harness.send_valid_transaction().await?; - let tx_hash = *tx.tx_hash(); - transactions.push(tx_hash); - } - - // After a 10 blocks all the transactions should be included in a block - for _ in 0..10 { - block_generator.submit_payload(None, 0, true).await.unwrap(); - } - - for tx in transactions { - provider.get_transaction_receipt(tx).await?; - } - - Ok(()) - } - - #[tokio::test] - #[cfg(feature = "flashblocks")] - async fn integration_test_chain_produces_blocks() -> eyre::Result<()> { - // This is a simple test using the integration framework to test that the chain - // produces blocks. 
- let mut framework = - IntegrationFramework::new("integration_test_chain_produces_blocks").unwrap(); - - // we are going to use a genesis file pre-generated before the test - let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - genesis_path.push("../../genesis.json"); - assert!(genesis_path.exists()); - - // create the builder - let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(1234) - .network_port(1235) - .http_port(1238) - .with_builder_private_key(BUILDER_PRIVATE_KEY) - .with_flashblocks_ws_url("localhost:1239") - .with_chain_block_time(2000) - .with_flashbots_block_time(200); - - // create the validation reth node - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(1236) - .network_port(1237); - - framework.start("op-reth", &reth).await.unwrap(); - - let op_rbuilder = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - // Create a struct to hold received messages - let received_messages = Arc::new(Mutex::new(Vec::new())); - let messages_clone = received_messages.clone(); - - // Spawn WebSocket listener task - let ws_handle = tokio::spawn(async move { - let (ws_stream, _) = connect_async("ws://localhost:1239").await?; - let (_, mut read) = ws_stream.split(); - - while let Some(Ok(msg)) = read.next().await { - if let Ok(text) = msg.into_text() { - messages_clone.lock().unwrap().push(text); - } - } - Ok::<_, eyre::Error>(()) - }); - - let engine_api = EngineApi::new("http://localhost:1234").unwrap(); - let validation_api = EngineApi::new("http://localhost:1236").unwrap(); - - let mut generator = BlockGenerator::new(engine_api, Some(validation_api), false, 2, None); - generator.init().await?; - - let provider = ProviderBuilder::::default() - .on_http("http://localhost:1238".parse()?); - - for _ in 0..10 { - let block_hash = generator.generate_block().await?; - - // query the block and the transactions inside the block - let block = provider - .get_block_by_hash(block_hash) - .await? - .expect("block"); - - for hash in block.transactions.hashes() { - let _ = provider - .get_transaction_receipt(hash) - .await? 
- .expect("receipt"); - } - } - - // check there's 10 flashblocks log lines (2000ms / 200ms) - op_rbuilder.find_log_line("Building flashblock 9").await?; - - // Process websocket messages - let timeout_duration = Duration::from_secs(10); - tokio::time::timeout(timeout_duration, async { - let mut message_count = 0; - loop { - if message_count >= 10 { - break; - } - let messages = received_messages.lock().unwrap(); - let messages_json: Vec = messages - .iter() - .map(|msg| serde_json::from_str(msg).unwrap()) - .collect(); - for msg in messages_json.iter() { - let metadata = msg.get("metadata"); - assert!(metadata.is_some(), "metadata field missing"); - let metadata = metadata.unwrap(); - assert!( - metadata.get("block_number").is_some(), - "block_number missing" - ); - assert!( - metadata.get("new_account_balances").is_some(), - "new_account_balances missing" - ); - assert!(metadata.get("receipts").is_some(), "receipts missing"); - // also check if the length of the receipts is the same as the number of transactions - assert!( - metadata.get("receipts").unwrap().as_object().unwrap().len() - == msg - .get("diff") - .unwrap() - .get("transactions") - .unwrap() - .as_array() - .unwrap() - .len(), - "receipts length mismatch" - ); - message_count += 1; - } - drop(messages); - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await?; - ws_handle.abort(); - - Ok(()) - } - - #[tokio::test] - #[cfg(feature = "flashblocks")] - async fn integration_test_flashblocks_respects_gas_limit() -> eyre::Result<()> { - // This is a simple test using the integration framework to test that the chain - // produces blocks. - let mut framework = - IntegrationFramework::new("integration_test_flashblocks_respects_gas_limit").unwrap(); - - // we are going to use a genesis file pre-generated before the test - let mut genesis_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - genesis_path.push("../../genesis.json"); - assert!(genesis_path.exists()); - - let block_time_ms = 1000; - let flashblock_time_ms = 100; - - // create the builder - let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(1244) - .network_port(1245) - .http_port(1248) - .with_builder_private_key(BUILDER_PRIVATE_KEY) - .with_flashblocks_ws_url("localhost:1249") - .with_chain_block_time(block_time_ms) - .with_flashbots_block_time(flashblock_time_ms); - - // create the validation reth node - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(1246) - .network_port(1247); - - framework.start("op-reth", &reth).await.unwrap(); - - let op_rbuilder = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - let engine_api = EngineApi::new("http://localhost:1244").unwrap(); - let validation_api = EngineApi::new("http://localhost:1246").unwrap(); - - let mut generator = BlockGenerator::new( - engine_api, - Some(validation_api), - false, - block_time_ms / 1000, - None, - ); - generator.init().await?; - - let provider = ProviderBuilder::::default() - .on_http("http://localhost:1248".parse()?); - - // Delay the payload building by 4s, ensure that the correct number of flashblocks are built - let block_hash = generator.generate_block_with_delay(4).await?; - - // query the block and the transactions inside the block - let block = 
provider - .get_block_by_hash(block_hash) - .await? - .expect("block"); - - for hash in block.transactions.hashes() { - let _ = provider - .get_transaction_receipt(hash) - .await? - .expect("receipt"); - } - - // check there's no more than 10 flashblocks log lines (2000ms / 200ms) - op_rbuilder.find_log_line("Building flashblock 9").await?; - op_rbuilder - .find_log_line("Skipping flashblock reached target=10 idx=10") - .await?; - - Ok(()) - } -} diff --git a/crates/op-rbuilder/src/integration/mod.rs b/crates/op-rbuilder/src/integration/mod.rs deleted file mode 100644 index 14ce32704..000000000 --- a/crates/op-rbuilder/src/integration/mod.rs +++ /dev/null @@ -1,418 +0,0 @@ -use alloy_consensus::TxEip1559; -use alloy_eips::{eip1559::MIN_PROTOCOL_BASE_FEE, eip2718::Encodable2718, BlockNumberOrTag}; -use alloy_primitives::hex; -use alloy_provider::{ - Identity, PendingTransactionBuilder, Provider, ProviderBuilder, RootProvider, -}; -use op_alloy_consensus::OpTypedTransaction; -use op_alloy_network::Optimism; -use op_rbuilder::OpRbuilderConfig; -use op_reth::OpRethConfig; -use parking_lot::Mutex; -use std::{ - cmp::max, - collections::HashSet, - fs::{File, OpenOptions}, - future::Future, - io, - io::prelude::*, - net::TcpListener, - path::{Path, PathBuf}, - process::{Child, Command}, - sync::LazyLock, - time::{Duration, SystemTime}, -}; -use time::{format_description, OffsetDateTime}; -use tokio::time::sleep; -use uuid::Uuid; - -use crate::{ - tester::{BlockGenerator, EngineApi}, - tx_signer::Signer, -}; - -/// Default JWT token for testing purposes -pub const DEFAULT_JWT_TOKEN: &str = - "688f5d737bad920bdfb2fc2f488d6b6209eebda1dae949a8de91398d932c517a"; - -mod integration_test; -pub mod op_rbuilder; -pub mod op_reth; - -#[derive(Debug)] -pub enum IntegrationError { - SpawnError, - BinaryNotFound, - SetupError, - LogError, - ServiceAlreadyRunning, -} - -pub struct ServiceInstance { - process: Option, - pub log_path: PathBuf, -} - -pub struct IntegrationFramework { - test_dir: PathBuf, - services: Vec, -} - -pub trait Service { - /// Configure and return the command to run the service - fn command(&self) -> Command; - - /// Return a future that resolves when the service is ready - fn ready(&self, log_path: &Path) -> impl Future> + Send; -} - -/// Helper function to poll logs periodically -pub async fn poll_logs( - log_path: &Path, - pattern: &str, - interval: Duration, - timeout: Duration, -) -> Result<(), IntegrationError> { - let start = std::time::Instant::now(); - - loop { - if start.elapsed() > timeout { - return Err(IntegrationError::SpawnError); - } - - let mut file = File::open(log_path).map_err(|_| IntegrationError::LogError)?; - let mut contents = String::new(); - file.read_to_string(&mut contents) - .map_err(|_| IntegrationError::LogError)?; - - if contents.contains(pattern) { - return Ok(()); - } - - sleep(interval).await; - } -} - -impl ServiceInstance { - pub fn new(name: String, test_dir: PathBuf) -> Self { - let log_path = test_dir.join(format!("{name}.log")); - Self { - process: None, - log_path, - } - } - - pub fn start(&mut self, command: Command) -> Result<(), IntegrationError> { - if self.process.is_some() { - return Err(IntegrationError::ServiceAlreadyRunning); - } - - let log = open_log_file(&self.log_path)?; - let stdout = log.try_clone().map_err(|_| IntegrationError::LogError)?; - let stderr = log.try_clone().map_err(|_| IntegrationError::LogError)?; - - let mut cmd = command; - cmd.stdout(stdout).stderr(stderr); - - let child = match cmd.spawn() { - Ok(child) => 
Ok(child), - Err(e) => match e.kind() { - io::ErrorKind::NotFound => Err(IntegrationError::BinaryNotFound), - _ => Err(IntegrationError::SpawnError), - }, - }?; - - self.process = Some(child); - Ok(()) - } - - pub fn stop(&mut self) -> Result<(), IntegrationError> { - if let Some(mut process) = self.process.take() { - process.kill().map_err(|_| IntegrationError::SpawnError)?; - } - Ok(()) - } - - /// Start a service using its configuration and wait for it to be ready - pub async fn start_with_config( - &mut self, - config: &T, - ) -> Result<(), IntegrationError> { - self.start(config.command())?; - config.ready(&self.log_path).await?; - Ok(()) - } - - pub async fn find_log_line(&self, pattern: &str) -> eyre::Result<()> { - let mut file = - File::open(&self.log_path).map_err(|_| eyre::eyre!("Failed to open log file"))?; - let mut contents = String::new(); - file.read_to_string(&mut contents) - .map_err(|_| eyre::eyre!("Failed to read log file"))?; - - if contents.contains(pattern) { - Ok(()) - } else { - Err(eyre::eyre!("Pattern not found in log file: {}", pattern)) - } - } -} - -impl IntegrationFramework { - pub fn new(test_name: &str) -> Result { - let dt: OffsetDateTime = SystemTime::now().into(); - let format = format_description::parse("[year]_[month]_[day]_[hour]_[minute]_[second]") - .map_err(|_| IntegrationError::SetupError)?; - - let date_format = dt - .format(&format) - .map_err(|_| IntegrationError::SetupError)?; - - let mut test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - test_dir.push("../../integration_logs"); - test_dir.push(format!("{date_format}_{test_name}")); - - std::fs::create_dir_all(&test_dir).map_err(|_| IntegrationError::SetupError)?; - - Ok(Self { - test_dir, - services: Vec::new(), - }) - } - - pub async fn start( - &mut self, - name: &str, - config: &T, - ) -> Result<&mut ServiceInstance, IntegrationError> { - let service = self.create_service(name)?; - service.start_with_config(config).await?; - Ok(service) - } - - pub fn create_service(&mut self, name: &str) -> Result<&mut ServiceInstance, IntegrationError> { - let service = ServiceInstance::new(name.to_string(), self.test_dir.clone()); - self.services.push(service); - Ok(self.services.last_mut().unwrap()) - } -} - -fn open_log_file(path: &PathBuf) -> Result { - let prefix = path.parent().unwrap(); - std::fs::create_dir_all(prefix).map_err(|_| IntegrationError::LogError)?; - - OpenOptions::new() - .append(true) - .create(true) - .open(path) - .map_err(|_| IntegrationError::LogError) -} - -impl Drop for IntegrationFramework { - fn drop(&mut self) { - for service in &mut self.services { - let _ = service.stop(); - } - } -} - -const BUILDER_PRIVATE_KEY: &str = - "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; - -pub struct TestHarnessBuilder { - name: String, - use_revert_protection: bool, -} - -impl TestHarnessBuilder { - pub fn new(name: &str) -> Self { - Self { - name: name.to_string(), - use_revert_protection: false, - } - } - - pub fn with_revert_protection(mut self) -> Self { - self.use_revert_protection = true; - self - } - - pub async fn build(self) -> eyre::Result { - let mut framework = IntegrationFramework::new(&self.name).unwrap(); - - // we are going to use the fixture genesis and copy it to each test folder - let genesis = include_str!("../tester/fixtures/genesis.json.tmpl"); - - let mut genesis_path = framework.test_dir.clone(); - genesis_path.push("genesis.json"); - std::fs::write(&genesis_path, genesis)?; - - // create the builder - let builder_data_dir = 
std::env::temp_dir().join(Uuid::new_v4().to_string()); - let builder_auth_rpc_port = get_available_port(); - let builder_http_port = get_available_port(); - let op_rbuilder_config = OpRbuilderConfig::new() - .chain_config_path(genesis_path.clone()) - .data_dir(builder_data_dir) - .auth_rpc_port(builder_auth_rpc_port) - .network_port(get_available_port()) - .http_port(builder_http_port) - .with_builder_private_key(BUILDER_PRIVATE_KEY) - .with_revert_protection(self.use_revert_protection); - - // create the validation reth node - - let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); - let validator_auth_rpc_port = get_available_port(); - let reth = OpRethConfig::new() - .chain_config_path(genesis_path) - .data_dir(reth_data_dir) - .auth_rpc_port(validator_auth_rpc_port) - .network_port(get_available_port()); - - framework.start("op-reth", &reth).await.unwrap(); - - let builder = framework - .start("op-rbuilder", &op_rbuilder_config) - .await - .unwrap(); - - let builder_log_path = builder.log_path.clone(); - - Ok(TestHarness { - _framework: framework, - builder_auth_rpc_port, - builder_http_port, - validator_auth_rpc_port, - builder_log_path, - }) - } -} - -pub struct TestHarness { - _framework: IntegrationFramework, - builder_auth_rpc_port: u16, - builder_http_port: u16, - validator_auth_rpc_port: u16, - #[allow(dead_code)] // I think this is due to some feature flag conflicts - builder_log_path: PathBuf, -} - -impl TestHarness { - pub async fn send_valid_transaction( - &self, - ) -> eyre::Result> { - // Get builder's address - let known_wallet = Signer::try_from_secret(BUILDER_PRIVATE_KEY.parse()?)?; - let builder_address = known_wallet.address; - - let url = format!("http://localhost:{}", self.builder_http_port); - let provider = - ProviderBuilder::::default().on_http(url.parse()?); - - // Get current nonce includeing the ones from the txpool - let nonce = provider - .get_transaction_count(builder_address) - .pending() - .await?; - - let latest_block = provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? - .unwrap(); - - let base_fee = max( - latest_block.header.base_fee_per_gas.unwrap(), - MIN_PROTOCOL_BASE_FEE, - ); - - // Transaction from builder should succeed - let tx_request = OpTypedTransaction::Eip1559(TxEip1559 { - chain_id: 901, - nonce, - gas_limit: 210000, - max_fee_per_gas: base_fee.into(), - ..Default::default() - }); - let signed_tx = known_wallet.sign_tx(tx_request)?; - let pending_tx = provider - .send_raw_transaction(signed_tx.encoded_2718().as_slice()) - .await?; - - Ok(pending_tx) - } - - pub async fn send_revert_transaction( - &self, - ) -> eyre::Result> { - // TODO: Merge this with send_valid_transaction - // Get builder's address - let known_wallet = Signer::try_from_secret(BUILDER_PRIVATE_KEY.parse()?)?; - let builder_address = known_wallet.address; - - let url = format!("http://localhost:{}", self.builder_http_port); - let provider = - ProviderBuilder::::default().on_http(url.parse()?); - - // Get current nonce includeing the ones from the txpool - let nonce = provider - .get_transaction_count(builder_address) - .pending() - .await?; - - let latest_block = provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? 
- .unwrap(); - - let base_fee = max( - latest_block.header.base_fee_per_gas.unwrap(), - MIN_PROTOCOL_BASE_FEE, - ); - - // Transaction from builder should succeed - let tx_request = OpTypedTransaction::Eip1559(TxEip1559 { - chain_id: 901, - nonce, - gas_limit: 210000, - max_fee_per_gas: base_fee.into(), - input: hex!("60006000fd").into(), // PUSH1 0x00 PUSH1 0x00 REVERT - ..Default::default() - }); - let signed_tx = known_wallet.sign_tx(tx_request)?; - let pending_tx = provider - .send_raw_transaction(signed_tx.encoded_2718().as_slice()) - .await?; - - Ok(pending_tx) - } - - pub fn provider(&self) -> eyre::Result> { - let url = format!("http://localhost:{}", self.builder_http_port); - let provider = - ProviderBuilder::::default().on_http(url.parse()?); - - Ok(provider) - } - - pub async fn block_generator(&self) -> eyre::Result { - let engine_api = EngineApi::new_with_port(self.builder_auth_rpc_port).unwrap(); - let validation_api = Some(EngineApi::new_with_port(self.validator_auth_rpc_port).unwrap()); - - let mut generator = BlockGenerator::new(engine_api, validation_api, false, 1, None); - generator.init().await?; - - Ok(generator) - } -} - -pub fn get_available_port() -> u16 { - static CLAIMED_PORTS: LazyLock>> = - LazyLock::new(|| Mutex::new(HashSet::new())); - loop { - let port: u16 = rand::random_range(1000..20000); - if TcpListener::bind(("127.0.0.1", port)).is_ok() && CLAIMED_PORTS.lock().insert(port) { - return port; - } - } -} diff --git a/crates/op-rbuilder/src/integration/op_reth.rs b/crates/op-rbuilder/src/integration/op_reth.rs deleted file mode 100644 index 154ba1190..000000000 --- a/crates/op-rbuilder/src/integration/op_reth.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::integration::{poll_logs, IntegrationError, Service, DEFAULT_JWT_TOKEN}; -use futures_util::Future; -use std::{ - path::{Path, PathBuf}, - process::Command, - time::Duration, -}; - -fn get_or_create_jwt_path(jwt_path: Option<&PathBuf>) -> PathBuf { - jwt_path.cloned().unwrap_or_else(|| { - let tmp_dir = std::env::temp_dir(); - let jwt_path = tmp_dir.join("jwt.hex"); - std::fs::write(&jwt_path, DEFAULT_JWT_TOKEN).expect("Failed to write JWT secret file"); - jwt_path - }) -} - -#[derive(Default)] -pub struct OpRethConfig { - auth_rpc_port: Option, - jwt_secret_path: Option, - chain_config_path: Option, - data_dir: Option, - http_port: Option, - network_port: Option, -} - -impl OpRethConfig { - pub fn new() -> Self { - Self::default() - } - - pub fn auth_rpc_port(mut self, port: u16) -> Self { - self.auth_rpc_port = Some(port); - self - } - - pub fn chain_config_path>(mut self, path: P) -> Self { - self.chain_config_path = Some(path.into()); - self - } - - pub fn data_dir>(mut self, path: P) -> Self { - self.data_dir = Some(path.into()); - self - } - - pub fn network_port(mut self, port: u16) -> Self { - self.network_port = Some(port); - self - } -} - -impl Service for OpRethConfig { - fn command(&self) -> Command { - let bin_path = PathBuf::from("op-reth"); - - let mut cmd = Command::new(bin_path); - let jwt_path = get_or_create_jwt_path(self.jwt_secret_path.as_ref()); - - cmd.arg("node") - .arg("--authrpc.port") - .arg( - self.auth_rpc_port - .expect("auth_rpc_port not set") - .to_string(), - ) - .arg("--authrpc.jwtsecret") - .arg( - jwt_path - .to_str() - .expect("Failed to convert jwt_path to string"), - ) - .arg("--chain") - .arg( - self.chain_config_path - .as_ref() - .expect("chain_config_path not set"), - ) - .arg("--datadir") - .arg(self.data_dir.as_ref().expect("data_dir not set")) - 
.arg("--disable-discovery") - .arg("--color") - .arg("never") - .arg("--port") - .arg(self.network_port.expect("network_port not set").to_string()) - .arg("--ipcdisable"); - - if let Some(http_port) = self.http_port { - cmd.arg("--http") - .arg("--http.port") - .arg(http_port.to_string()); - } - - cmd - } - - #[allow(clippy::manual_async_fn)] - fn ready(&self, log_path: &Path) -> impl Future> + Send { - async move { - poll_logs( - log_path, - "Starting consensus engine", - Duration::from_millis(100), - Duration::from_secs(60), - ) - .await - } - } -} diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs index 211b1ddbb..f377643f9 100644 --- a/crates/op-rbuilder/src/lib.rs +++ b/crates/op-rbuilder/src/lib.rs @@ -1,4 +1,5 @@ -pub mod integration; pub mod primitives; -pub mod tester; pub mod tx_signer; + +#[cfg(any(test, feature = "testing"))] +pub mod tests; diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index c7db90a67..0da7a7075 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -2,28 +2,28 @@ use args::CliExt; use clap::Parser; use reth_optimism_cli::{chainspec::OpChainSpecParser, Cli}; use reth_optimism_node::{node::OpAddOnsBuilder, OpNode}; - -#[cfg(feature = "flashblocks")] -use payload_builder::CustomOpPayloadBuilder; -#[cfg(not(feature = "flashblocks"))] -use payload_builder_vanilla::CustomOpPayloadBuilder; use reth_transaction_pool::TransactionPool; /// CLI argument parsing. pub mod args; pub mod generator; -#[cfg(test)] -mod integration; mod metrics; mod monitor_tx_pool; +mod primitives; +mod tx_signer; + #[cfg(feature = "flashblocks")] pub mod payload_builder; + #[cfg(not(feature = "flashblocks"))] mod payload_builder_vanilla; -mod primitives; -#[cfg(test)] -mod tester; -mod tx_signer; + +#[cfg(not(feature = "flashblocks"))] +use payload_builder_vanilla::CustomOpPayloadBuilder; + +#[cfg(feature = "flashblocks")] +use payload_builder::CustomOpPayloadBuilder; + use metrics::{ VersionInfo, BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA, diff --git a/crates/op-rbuilder/src/tests/flashblocks/mod.rs b/crates/op-rbuilder/src/tests/flashblocks/mod.rs new file mode 100644 index 000000000..6ca676337 --- /dev/null +++ b/crates/op-rbuilder/src/tests/flashblocks/mod.rs @@ -0,0 +1,3 @@ +#![cfg(test)] + +mod smoke; diff --git a/crates/op-rbuilder/src/tests/flashblocks/smoke.rs b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs new file mode 100644 index 000000000..63e71cb3b --- /dev/null +++ b/crates/op-rbuilder/src/tests/flashblocks/smoke.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; + +use futures::StreamExt; +use parking_lot::Mutex; +use tokio::task::JoinHandle; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_util::sync::CancellationToken; + +use crate::tests::TestHarnessBuilder; + +#[tokio::test] +#[ignore = "Flashblocks tests need more work"] +async fn chain_produces_blocks() -> eyre::Result<()> { + let harness = TestHarnessBuilder::new("flashbots_chain_produces_blocks") + .with_flashblocks_ws_url("ws://localhost:1239") + .with_chain_block_time(2000) + .with_flashbots_block_time(200) + .build() + .await?; + + // Create a struct to hold received messages + let received_messages = Arc::new(Mutex::new(Vec::new())); + let messages_clone = received_messages.clone(); + let cancellation_token = CancellationToken::new(); + + // Spawn WebSocket listener task + let cancellation_token_clone = 
cancellation_token.clone(); + let ws_handle: JoinHandle> = tokio::spawn(async move { + let (ws_stream, _) = connect_async("ws://localhost:1239").await?; + let (_, mut read) = ws_stream.split(); + + loop { + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + break Ok(()); + } + Some(Ok(Message::Text(text))) = read.next() => { + messages_clone.lock().push(text); + } + } + } + }); + + let mut generator = harness.block_generator().await?; + + for _ in 0..10 { + for _ in 0..5 { + // send a valid transaction + let _ = harness.send_valid_transaction().await?; + } + + generator.generate_block().await?; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + + cancellation_token.cancel(); + assert!(ws_handle.await.is_ok(), "WebSocket listener task failed"); + assert!( + !received_messages + .lock() + .iter() + .any(|msg| msg.contains("Building flashblock")), + "No messages received from WebSocket" + ); + + Ok(()) +} diff --git a/crates/op-rbuilder/src/tests/framework/apis.rs b/crates/op-rbuilder/src/tests/framework/apis.rs new file mode 100644 index 000000000..37ac84dc1 --- /dev/null +++ b/crates/op-rbuilder/src/tests/framework/apis.rs @@ -0,0 +1,180 @@ +use super::DEFAULT_JWT_TOKEN; +use alloy_eips::BlockNumberOrTag; +use alloy_primitives::B256; +use alloy_rpc_types_engine::{ExecutionPayloadV3, ForkchoiceUpdated, PayloadStatus}; +use jsonrpsee::{ + core::RpcResult, + http_client::{transport::HttpBackend, HttpClient}, + proc_macros::rpc, +}; +use reth::rpc::{api::EngineApiClient, types::engine::ForkchoiceState}; +use reth_node_api::{EngineTypes, PayloadTypes}; +use reth_optimism_node::OpEngineTypes; +use reth_payload_builder::PayloadId; +use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; +use serde_json::Value; +use std::str::FromStr; + +/// Helper for engine api operations +pub struct EngineApi { + pub engine_api_client: HttpClient>, +} + +/// Builder for EngineApi configuration +pub struct EngineApiBuilder { + url: String, + jwt_secret: String, +} + +impl Default for EngineApiBuilder { + fn default() -> Self { + Self::new() + } +} + +impl EngineApiBuilder { + pub fn new() -> Self { + Self { + url: String::from("http://localhost:8551"), + jwt_secret: String::from(DEFAULT_JWT_TOKEN), + } + } + + pub fn with_url(mut self, url: &str) -> Self { + self.url = url.to_string(); + self + } + + pub fn build(self) -> Result> { + let secret_layer = AuthClientLayer::new(JwtSecret::from_str(&self.jwt_secret)?); + let middleware = tower::ServiceBuilder::default().layer(secret_layer); + let client = jsonrpsee::http_client::HttpClientBuilder::default() + .set_http_middleware(middleware) + .build(&self.url) + .expect("Failed to create http client"); + + Ok(EngineApi { + engine_api_client: client, + }) + } +} + +impl EngineApi { + pub fn builder() -> EngineApiBuilder { + EngineApiBuilder::new() + } + + pub fn new(url: &str) -> Result> { + Self::builder().with_url(url).build() + } + + pub fn new_with_port(port: u16) -> Result> { + Self::builder() + .with_url(&format!("http://localhost:{port}")) + .build() + } + + pub async fn get_payload_v3( + &self, + payload_id: PayloadId, + ) -> eyre::Result<::ExecutionPayloadEnvelopeV3> { + println!( + "Fetching payload with id: {} at {}", + payload_id, + chrono::Utc::now() + ); + + Ok( + EngineApiClient::::get_payload_v3(&self.engine_api_client, payload_id) + .await?, + ) + } + + pub async fn new_payload( + &self, + payload: ExecutionPayloadV3, + versioned_hashes: Vec, + parent_beacon_block_root: B256, + ) -> eyre::Result { + 
println!("Submitting new payload at {}...", chrono::Utc::now()); + + Ok(EngineApiClient::::new_payload_v3( + &self.engine_api_client, + payload, + versioned_hashes, + parent_beacon_block_root, + ) + .await?) + } + + pub async fn update_forkchoice( + &self, + current_head: B256, + new_head: B256, + payload_attributes: Option<::PayloadAttributes>, + ) -> eyre::Result { + println!("Updating forkchoice at {}...", chrono::Utc::now()); + + Ok(EngineApiClient::::fork_choice_updated_v3( + &self.engine_api_client, + ForkchoiceState { + head_block_hash: new_head, + safe_block_hash: current_head, + finalized_block_hash: current_head, + }, + payload_attributes, + ) + .await?) + } + + pub async fn latest(&self) -> eyre::Result> { + self.get_block_by_number(BlockNumberOrTag::Latest, false) + .await + } + + pub async fn get_block_by_number( + &self, + number: BlockNumberOrTag, + include_txs: bool, + ) -> eyre::Result> { + Ok( + BlockApiClient::get_block_by_number(&self.engine_api_client, number, include_txs) + .await?, + ) + } +} + +#[rpc(server, client, namespace = "eth")] +pub trait BlockApi { + #[method(name = "getBlockByNumber")] + async fn get_block_by_number( + &self, + block_number: BlockNumberOrTag, + include_txs: bool, + ) -> RpcResult>; +} + +pub async fn generate_genesis(output: Option) -> eyre::Result<()> { + // Read the template file + let template = include_str!("artifacts/genesis.json.tmpl"); + + // Parse the JSON + let mut genesis: Value = serde_json::from_str(template)?; + + // Update the timestamp field - example using current timestamp + let timestamp = chrono::Utc::now().timestamp(); + if let Some(config) = genesis.as_object_mut() { + // Assuming timestamp is at the root level - adjust path as needed + config["timestamp"] = Value::String(format!("0x{timestamp:x}")); + } + + // Write the result to the output file + if let Some(output) = output { + std::fs::write(&output, serde_json::to_string_pretty(&genesis)?)?; + println!("Generated genesis file at: {output}"); + } else { + println!("{}", serde_json::to_string_pretty(&genesis)?); + } + + Ok(()) +} diff --git a/crates/op-rbuilder/src/tester/fixtures/genesis.json.tmpl b/crates/op-rbuilder/src/tests/framework/artifacts/genesis.json.tmpl similarity index 100% rename from crates/op-rbuilder/src/tester/fixtures/genesis.json.tmpl rename to crates/op-rbuilder/src/tests/framework/artifacts/genesis.json.tmpl diff --git a/crates/op-rbuilder/src/tester/fixtures/test-jwt-secret.txt b/crates/op-rbuilder/src/tests/framework/artifacts/test-jwt-secret.txt similarity index 100% rename from crates/op-rbuilder/src/tester/fixtures/test-jwt-secret.txt rename to crates/op-rbuilder/src/tests/framework/artifacts/test-jwt-secret.txt diff --git a/crates/op-rbuilder/src/tester/mod.rs b/crates/op-rbuilder/src/tests/framework/blocks.rs similarity index 66% rename from crates/op-rbuilder/src/tester/mod.rs rename to crates/op-rbuilder/src/tests/framework/blocks.rs index 08bbb7929..847025d74 100644 --- a/crates/op-rbuilder/src/tester/mod.rs +++ b/crates/op-rbuilder/src/tests/framework/blocks.rs @@ -2,193 +2,15 @@ use crate::tx_signer::Signer; use alloy_eips::{eip2718::Encodable2718, BlockNumberOrTag}; use alloy_primitives::{address, hex, Address, Bytes, TxKind, B256, U256}; use alloy_rpc_types_engine::{ - ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, ForkchoiceUpdated, - PayloadAttributes, PayloadStatus, PayloadStatusEnum, + ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, PayloadAttributes, + PayloadStatusEnum, }; use 
alloy_rpc_types_eth::Block; -use jsonrpsee::{ - core::RpcResult, - http_client::{transport::HttpBackend, HttpClient}, - proc_macros::rpc, -}; use op_alloy_consensus::{OpTypedTransaction, TxDeposit}; use op_alloy_rpc_types_engine::OpPayloadAttributes; -use reth::rpc::{api::EngineApiClient, types::engine::ForkchoiceState}; -use reth_node_api::{EngineTypes, PayloadTypes}; -use reth_optimism_node::OpEngineTypes; -use reth_payload_builder::PayloadId; -use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; use rollup_boost::{Flashblocks, FlashblocksService}; -use serde_json::Value; -use std::str::FromStr; - -/// Helper for engine api operations -pub struct EngineApi { - pub engine_api_client: HttpClient>, -} - -/// Builder for EngineApi configuration -pub struct EngineApiBuilder { - url: String, - jwt_secret: String, -} - -impl Default for EngineApiBuilder { - fn default() -> Self { - Self::new() - } -} - -impl EngineApiBuilder { - pub fn new() -> Self { - Self { - url: String::from("http://localhost:8551"), // default value - jwt_secret: String::from( - "688f5d737bad920bdfb2fc2f488d6b6209eebda1dae949a8de91398d932c517a", - ), // default value - } - } - - pub fn with_url(mut self, url: &str) -> Self { - self.url = url.to_string(); - self - } - pub fn build(self) -> Result> { - let secret_layer = AuthClientLayer::new(JwtSecret::from_str(&self.jwt_secret)?); - let middleware = tower::ServiceBuilder::default().layer(secret_layer); - let client = jsonrpsee::http_client::HttpClientBuilder::default() - .set_http_middleware(middleware) - .build(&self.url) - .expect("Failed to create http client"); - - Ok(EngineApi { - engine_api_client: client, - }) - } -} - -impl EngineApi { - pub fn builder() -> EngineApiBuilder { - EngineApiBuilder::new() - } - - pub fn new(url: &str) -> Result> { - Self::builder().with_url(url).build() - } - - pub fn new_with_port(port: u16) -> Result> { - Self::builder() - .with_url(&format!("http://localhost:{port}")) - .build() - } - - pub async fn get_payload_v3( - &self, - payload_id: PayloadId, - ) -> eyre::Result<::ExecutionPayloadEnvelopeV3> { - println!( - "Fetching payload with id: {} at {}", - payload_id, - chrono::Utc::now() - ); - - Ok( - EngineApiClient::::get_payload_v3(&self.engine_api_client, payload_id) - .await?, - ) - } - - pub async fn new_payload( - &self, - payload: ExecutionPayloadV3, - versioned_hashes: Vec, - parent_beacon_block_root: B256, - ) -> eyre::Result { - println!("Submitting new payload at {}...", chrono::Utc::now()); - - Ok(EngineApiClient::::new_payload_v3( - &self.engine_api_client, - payload, - versioned_hashes, - parent_beacon_block_root, - ) - .await?) - } - - pub async fn update_forkchoice( - &self, - current_head: B256, - new_head: B256, - payload_attributes: Option<::PayloadAttributes>, - ) -> eyre::Result { - println!("Updating forkchoice at {}...", chrono::Utc::now()); - - Ok(EngineApiClient::::fork_choice_updated_v3( - &self.engine_api_client, - ForkchoiceState { - head_block_hash: new_head, - safe_block_hash: current_head, - finalized_block_hash: current_head, - }, - payload_attributes, - ) - .await?) 
- } - - pub async fn latest(&self) -> eyre::Result> { - self.get_block_by_number(BlockNumberOrTag::Latest, false) - .await - } - - pub async fn get_block_by_number( - &self, - number: BlockNumberOrTag, - include_txs: bool, - ) -> eyre::Result> { - Ok( - BlockApiClient::get_block_by_number(&self.engine_api_client, number, include_txs) - .await?, - ) - } -} - -#[rpc(server, client, namespace = "eth")] -pub trait BlockApi { - #[method(name = "getBlockByNumber")] - async fn get_block_by_number( - &self, - block_number: BlockNumberOrTag, - include_txs: bool, - ) -> RpcResult>; -} - -// TODO: This is not being recognized as used code by the main function -#[allow(dead_code)] -pub async fn generate_genesis(output: Option) -> eyre::Result<()> { - // Read the template file - let template = include_str!("fixtures/genesis.json.tmpl"); - - // Parse the JSON - let mut genesis: Value = serde_json::from_str(template)?; - - // Update the timestamp field - example using current timestamp - let timestamp = chrono::Utc::now().timestamp(); - if let Some(config) = genesis.as_object_mut() { - // Assuming timestamp is at the root level - adjust path as needed - config["timestamp"] = Value::String(format!("0x{timestamp:x}")); - } - - // Write the result to the output file - if let Some(output) = output { - std::fs::write(&output, serde_json::to_string_pretty(&genesis)?)?; - println!("Generated genesis file at: {output}"); - } else { - println!("{}", serde_json::to_string_pretty(&genesis)?); - } - - Ok(()) -} +use super::apis::EngineApi; // L1 block info for OP mainnet block 124665056 (stored in input of tx at index 0) // @@ -475,7 +297,6 @@ impl BlockGenerator { } /// Submit a deposit transaction to seed an account with ETH - #[allow(dead_code)] pub async fn deposit(&mut self, address: Address, value: u128) -> eyre::Result { // Create deposit transaction let deposit_tx = TxDeposit { @@ -497,40 +318,25 @@ impl BlockGenerator { self.submit_payload(Some(vec![signed_tx_rlp.into()]), 0, false) .await } -} -// TODO: This is not being recognized as used code by the main function -#[allow(dead_code)] -pub async fn run_system( - validation: bool, - no_tx_pool: bool, - block_time_secs: u64, - flashblocks_endpoint: Option, - no_sleep: bool, -) -> eyre::Result<()> { - println!("Validation: {validation}"); - - let engine_api = EngineApi::new("http://localhost:4444").unwrap(); - let validation_api = if validation { - Some(EngineApi::new("http://localhost:5555").unwrap()) - } else { - None - }; - - let mut generator = BlockGenerator::new( - engine_api, - validation_api, - no_tx_pool, - block_time_secs, - flashblocks_endpoint, - ); - - generator.init().await?; - - // Infinite loop generating blocks - loop { - println!("Generating new block..."); - let block_hash = generator.submit_payload(None, 0, no_sleep).await?; - println!("Generated block: {block_hash}"); + pub async fn create_funded_accounts( + &mut self, + count: usize, + amount: u128, + ) -> eyre::Result> { + let mut signers = Vec::with_capacity(count); + + for _ in 0..count { + // Create a new signer + let signer = Signer::random(); + let address = signer.address; + + // Deposit funds to the new account + self.deposit(address, amount).await?; + + signers.push(signer); + } + + Ok(signers) } } diff --git a/crates/op-rbuilder/src/tests/framework/harness.rs b/crates/op-rbuilder/src/tests/framework/harness.rs new file mode 100644 index 000000000..c9f355c5e --- /dev/null +++ b/crates/op-rbuilder/src/tests/framework/harness.rs @@ -0,0 +1,260 @@ +use super::{ + apis::EngineApi, 
+ blocks::BlockGenerator, + op::{OpRbuilderConfig, OpRethConfig}, + service::{self, Service, ServiceInstance}, + TransactionBuilder, BUILDER_PRIVATE_KEY, +}; +use alloy_eips::BlockNumberOrTag; +use alloy_network::Network; +use alloy_primitives::hex; +use alloy_provider::{ + Identity, PendingTransactionBuilder, Provider, ProviderBuilder, RootProvider, +}; +use op_alloy_network::Optimism; +use parking_lot::Mutex; +use std::{ + collections::HashSet, net::TcpListener, path::PathBuf, sync::LazyLock, time::SystemTime, +}; +use time::{format_description, OffsetDateTime}; +use uuid::Uuid; + +pub struct TestHarnessBuilder { + name: String, + use_revert_protection: bool, + flashblocks_ws_url: Option, + chain_block_time: Option, + flashbots_block_time: Option, +} + +impl TestHarnessBuilder { + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + use_revert_protection: false, + flashblocks_ws_url: None, + chain_block_time: None, + flashbots_block_time: None, + } + } + + pub fn with_revert_protection(mut self) -> Self { + self.use_revert_protection = true; + self + } + + pub fn with_flashblocks_ws_url(mut self, url: &str) -> Self { + self.flashblocks_ws_url = Some(url.to_string()); + self + } + + pub fn with_chain_block_time(mut self, block_time: u64) -> Self { + self.chain_block_time = Some(block_time); + self + } + + pub fn with_flashbots_block_time(mut self, block_time: u64) -> Self { + self.flashbots_block_time = Some(block_time); + self + } + + pub async fn build(self) -> eyre::Result { + let mut framework = IntegrationFramework::new(&self.name).unwrap(); + + // we are going to use the fixture genesis and copy it to each test folder + let genesis = include_str!("artifacts/genesis.json.tmpl"); + + let mut genesis_path = framework.test_dir.clone(); + genesis_path.push("genesis.json"); + std::fs::write(&genesis_path, genesis)?; + + // create the builder + let builder_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let builder_auth_rpc_port = get_available_port(); + let builder_http_port = get_available_port(); + let mut op_rbuilder_config = OpRbuilderConfig::new() + .chain_config_path(genesis_path.clone()) + .data_dir(builder_data_dir) + .auth_rpc_port(builder_auth_rpc_port) + .network_port(get_available_port()) + .http_port(builder_http_port) + .with_builder_private_key(BUILDER_PRIVATE_KEY) + .with_revert_protection(self.use_revert_protection); + + if let Some(flashblocks_ws_url) = self.flashblocks_ws_url { + op_rbuilder_config = op_rbuilder_config.with_flashblocks_ws_url(&flashblocks_ws_url); + } + + if let Some(chain_block_time) = self.chain_block_time { + op_rbuilder_config = op_rbuilder_config.with_chain_block_time(chain_block_time); + } + + if let Some(flashbots_block_time) = self.flashbots_block_time { + op_rbuilder_config = op_rbuilder_config.with_flashbots_block_time(flashbots_block_time); + } + + // create the validation reth node + + let reth_data_dir = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let validator_auth_rpc_port = get_available_port(); + let reth = OpRethConfig::new() + .chain_config_path(genesis_path) + .data_dir(reth_data_dir) + .auth_rpc_port(validator_auth_rpc_port) + .network_port(get_available_port()); + + framework.start("op-reth", &reth).await.unwrap(); + + let builder = framework + .start("op-rbuilder", &op_rbuilder_config) + .await + .unwrap(); + + let builder_log_path = builder.log_path.clone(); + + Ok(TestHarness { + _framework: framework, + builder_auth_rpc_port, + builder_http_port, + validator_auth_rpc_port, + 
builder_log_path, + }) + } +} + +pub struct TestHarness { + _framework: IntegrationFramework, + builder_auth_rpc_port: u16, + builder_http_port: u16, + validator_auth_rpc_port: u16, + builder_log_path: PathBuf, +} + +impl TestHarness { + pub async fn send_valid_transaction( + &self, + ) -> eyre::Result> { + self.create_transaction().send().await + } + + pub async fn send_revert_transaction( + &self, + ) -> eyre::Result> { + self.create_transaction() + .with_input(hex!("60006000fd").into()) // PUSH1 0x00 PUSH1 0x00 REVERT + .send() + .await + } + + pub fn provider(&self) -> eyre::Result> { + let url = format!("http://localhost:{}", self.builder_http_port); + let provider = + ProviderBuilder::::default().on_http(url.parse()?); + + Ok(provider) + } + + pub async fn block_generator(&self) -> eyre::Result { + let engine_api = EngineApi::new_with_port(self.builder_auth_rpc_port).unwrap(); + let validation_api = Some(EngineApi::new_with_port(self.validator_auth_rpc_port).unwrap()); + + let mut generator = BlockGenerator::new(engine_api, validation_api, false, 1, None); + generator.init().await?; + + Ok(generator) + } + + pub fn create_transaction(&self) -> TransactionBuilder { + TransactionBuilder::new(self.provider().expect("provider not available")) + } + + pub async fn latest_block(&self) -> ::BlockResponse { + self.provider() + .expect("provider not available") + .get_block_by_number(BlockNumberOrTag::Latest) + .full() + .await + .expect("failed to get latest block by hash") + .expect("latest block should exist") + } + + pub async fn latest_base_fee(&self) -> u128 { + self.latest_block() + .await + .header + .base_fee_per_gas + .expect("Base fee per gas not found in the latest block header") as u128 + } + + pub const fn builder_private_key() -> &'static str { + BUILDER_PRIVATE_KEY + } + + pub const fn builder_log_path(&self) -> &PathBuf { + &self.builder_log_path + } +} + +pub fn get_available_port() -> u16 { + static CLAIMED_PORTS: LazyLock>> = + LazyLock::new(|| Mutex::new(HashSet::new())); + loop { + let port: u16 = rand::random_range(1000..20000); + if TcpListener::bind(("127.0.0.1", port)).is_ok() && CLAIMED_PORTS.lock().insert(port) { + return port; + } + } +} + +#[derive(Debug)] +pub enum IntegrationError { + SpawnError, + BinaryNotFound, + SetupError, + LogError, + ServiceAlreadyRunning, +} + +struct IntegrationFramework { + test_dir: PathBuf, + services: Vec, +} + +impl IntegrationFramework { + pub fn new(test_name: &str) -> Result { + let dt: OffsetDateTime = SystemTime::now().into(); + let format = format_description::parse("[year]_[month]_[day]_[hour]_[minute]_[second]") + .map_err(|_| IntegrationError::SetupError)?; + + let date_format = dt + .format(&format) + .map_err(|_| IntegrationError::SetupError)?; + + let mut test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + test_dir.push("../../integration_logs"); + test_dir.push(format!("{date_format}_{test_name}")); + + std::fs::create_dir_all(&test_dir).map_err(|_| IntegrationError::SetupError)?; + + Ok(Self { + test_dir, + services: Vec::new(), + }) + } + + pub async fn start( + &mut self, + name: &str, + config: &T, + ) -> Result<&mut ServiceInstance, service::Error> { + let service = self.create_service(name)?; + service.start_with_config(config).await?; + Ok(service) + } + + pub fn create_service(&mut self, name: &str) -> Result<&mut ServiceInstance, service::Error> { + let service = ServiceInstance::new(name.to_string(), self.test_dir.clone()); + self.services.push(service); + Ok(self.services.last_mut().unwrap()) + } +} 
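
For orientation, here is a minimal usage sketch of the harness defined above (hypothetical test name, not part of the patch). It only calls methods introduced in this file and used by the tests later in the diff: `TestHarnessBuilder::new`, `build`, `block_generator`, `send_valid_transaction`, and `latest_block`.

// Hypothetical usage sketch of the new test harness; mirrors the tests added below.
#[tokio::test]
async fn harness_usage_sketch() -> eyre::Result<()> {
    // Spins up op-reth plus the builder; logs land under integration_logs/<date>_harness_usage_sketch.
    let harness = TestHarnessBuilder::new("harness_usage_sketch")
        .build()
        .await?;

    // BlockGenerator drives the engine API (forkchoiceUpdated / getPayload / newPayload).
    let mut generator = harness.block_generator().await?;

    // Submit a transaction through the builder's HTTP RPC, then seal a block containing it.
    let tx = harness.send_valid_transaction().await?;
    generator.generate_block().await?;

    let block = harness.latest_block().await;
    assert!(block.transactions.hashes().any(|h| h == *tx.tx_hash()));
    Ok(())
}
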
diff --git a/crates/op-rbuilder/src/tests/framework/mod.rs b/crates/op-rbuilder/src/tests/framework/mod.rs new file mode 100644 index 000000000..7ebab77fd --- /dev/null +++ b/crates/op-rbuilder/src/tests/framework/mod.rs @@ -0,0 +1,21 @@ +mod apis; +mod blocks; +mod harness; +mod op; +mod service; +mod txs; + +pub use apis::*; +pub use blocks::*; +pub use harness::*; +pub use op::*; +pub use service::*; +pub use txs::*; + +const BUILDER_PRIVATE_KEY: &str = + "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; + +pub const DEFAULT_JWT_TOKEN: &str = + "688f5d737bad920bdfb2fc2f488d6b6209eebda1dae949a8de91398d932c517a"; + +pub const ONE_ETH: u128 = 1_000_000_000_000_000_000; diff --git a/crates/op-rbuilder/src/integration/op_rbuilder.rs b/crates/op-rbuilder/src/tests/framework/op.rs similarity index 58% rename from crates/op-rbuilder/src/integration/op_rbuilder.rs rename to crates/op-rbuilder/src/tests/framework/op.rs index 505fe520d..00a0f7f72 100644 --- a/crates/op-rbuilder/src/integration/op_rbuilder.rs +++ b/crates/op-rbuilder/src/tests/framework/op.rs @@ -1,21 +1,20 @@ -use crate::integration::{poll_logs, IntegrationError, Service, DEFAULT_JWT_TOKEN}; -use futures_util::Future; use std::{ + fs::File, + future::Future, + io::{ErrorKind, Read}, path::{Path, PathBuf}, process::Command, - time::Duration, }; -fn get_or_create_jwt_path(jwt_path: Option<&PathBuf>) -> PathBuf { - jwt_path.cloned().unwrap_or_else(|| { - let tmp_dir = std::env::temp_dir(); - let jwt_path = tmp_dir.join("jwt.hex"); - std::fs::write(&jwt_path, DEFAULT_JWT_TOKEN).expect("Failed to write JWT secret file"); - jwt_path - }) -} +use std::time::Duration; +use tokio::time::sleep; -#[derive(Default)] +use super::{ + service::{self, Service}, + DEFAULT_JWT_TOKEN, +}; + +#[derive(Default, Debug)] pub struct OpRbuilderConfig { auth_rpc_port: Option, jwt_secret_path: Option, @@ -160,7 +159,7 @@ impl Service for OpRbuilderConfig { } #[allow(clippy::manual_async_fn)] - fn ready(&self, log_path: &Path) -> impl Future> + Send { + fn ready(&self, log_path: &Path) -> impl Future> + Send { async move { poll_logs( log_path, @@ -172,3 +171,133 @@ impl Service for OpRbuilderConfig { } } } + +#[derive(Default, Debug)] +pub struct OpRethConfig { + auth_rpc_port: Option, + jwt_secret_path: Option, + chain_config_path: Option, + data_dir: Option, + http_port: Option, + network_port: Option, +} + +impl OpRethConfig { + pub fn new() -> Self { + Self::default() + } + + pub fn auth_rpc_port(mut self, port: u16) -> Self { + self.auth_rpc_port = Some(port); + self + } + + pub fn chain_config_path>(mut self, path: P) -> Self { + self.chain_config_path = Some(path.into()); + self + } + + pub fn data_dir>(mut self, path: P) -> Self { + self.data_dir = Some(path.into()); + self + } + + pub fn network_port(mut self, port: u16) -> Self { + self.network_port = Some(port); + self + } +} + +impl Service for OpRethConfig { + fn command(&self) -> Command { + let bin_path = PathBuf::from("op-reth"); + + let mut cmd = Command::new(bin_path); + let jwt_path = get_or_create_jwt_path(self.jwt_secret_path.as_ref()); + + cmd.arg("node") + .arg("--authrpc.port") + .arg( + self.auth_rpc_port + .expect("auth_rpc_port not set") + .to_string(), + ) + .arg("--authrpc.jwtsecret") + .arg( + jwt_path + .to_str() + .expect("Failed to convert jwt_path to string"), + ) + .arg("--chain") + .arg( + self.chain_config_path + .as_ref() + .expect("chain_config_path not set"), + ) + .arg("--datadir") + .arg(self.data_dir.as_ref().expect("data_dir not set")) + 
.arg("--disable-discovery") + .arg("--color") + .arg("never") + .arg("--port") + .arg(self.network_port.expect("network_port not set").to_string()) + .arg("--ipcdisable"); + + if let Some(http_port) = self.http_port { + cmd.arg("--http") + .arg("--http.port") + .arg(http_port.to_string()); + } + + cmd + } + + #[allow(clippy::manual_async_fn)] + fn ready(&self, log_path: &Path) -> impl Future> + Send { + async move { + poll_logs( + log_path, + "Starting consensus engine", + Duration::from_millis(100), + Duration::from_secs(60), + ) + .await + } + } +} + +fn get_or_create_jwt_path(jwt_path: Option<&PathBuf>) -> PathBuf { + jwt_path.cloned().unwrap_or_else(|| { + let tmp_dir = std::env::temp_dir(); + let jwt_path = tmp_dir.join("jwt.hex"); + std::fs::write(&jwt_path, DEFAULT_JWT_TOKEN).expect("Failed to write JWT secret file"); + jwt_path + }) +} + +/// Helper function to poll logs periodically +pub async fn poll_logs( + log_path: &Path, + pattern: &str, + interval: Duration, + timeout: Duration, +) -> Result<(), service::Error> { + let start = std::time::Instant::now(); + + loop { + if start.elapsed() > timeout { + return Err(service::Error::Spawn(ErrorKind::TimedOut)); + } + + let mut file = File::open(log_path).map_err(|_| service::Error::Logs)?; + let mut contents = String::new(); + file.read_to_string(&mut contents) + .map_err(|_| service::Error::Logs)?; + + if contents.contains(pattern) { + return Ok(()); + } + + sleep(interval).await; + } +} diff --git a/crates/op-rbuilder/src/tests/framework/service.rs b/crates/op-rbuilder/src/tests/framework/service.rs new file mode 100644 index 000000000..6bc587c4c --- /dev/null +++ b/crates/op-rbuilder/src/tests/framework/service.rs @@ -0,0 +1,111 @@ +use std::{ + fs::{File, OpenOptions}, + future::Future, + io::{ErrorKind, Read}, + path::{Path, PathBuf}, + process::{Child, Command}, +}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Binary not found")] + BinaryNotFound, + + #[error("Failed to spawn process")] + Spawn(ErrorKind), + + #[error("Failed initialize log streams")] + Logs, + + #[error("Service is already running")] + ServiceAlreadyRunning, +} + +pub struct ServiceInstance { + process: Option, + pub log_path: PathBuf, +} + +impl ServiceInstance { + pub fn new(name: String, test_dir: PathBuf) -> Self { + let log_path = test_dir.join(format!("{name}.log")); + Self { + process: None, + log_path, + } + } + + pub fn start(&mut self, command: Command) -> Result<(), Error> { + if self.process.is_some() { + return Err(Error::ServiceAlreadyRunning); + } + + let log = open_log_file(&self.log_path)?; + let stdout = log.try_clone().map_err(|_| Error::Logs)?; + let stderr = log.try_clone().map_err(|_| Error::Logs)?; + + let mut cmd = command; + cmd.stdout(stdout).stderr(stderr); + + let child = match cmd.spawn() { + Ok(child) => Ok(child), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err(Error::BinaryNotFound), + e => Err(Error::Spawn(e)), + }, + }?; + + self.process = Some(child); + Ok(()) + } + + pub fn stop(&mut self) -> Result<(), Error> { + if let Some(mut process) = self.process.take() { + return process.kill().map_err(|e| Error::Spawn(e.kind())); + } + Ok(()) + } + + /// Start a service using its configuration and wait for it to be ready + pub async fn start_with_config(&mut self, config: &T) -> Result<(), Error> { + self.start(config.command())?; + config.ready(&self.log_path).await?; + Ok(()) + } + + pub async fn find_log_line(&self, pattern: &str) -> eyre::Result<()> { + let mut file = + 
File::open(&self.log_path).map_err(|_| eyre::eyre!("Failed to open log file"))?; + let mut contents = String::new(); + file.read_to_string(&mut contents) + .map_err(|_| eyre::eyre!("Failed to read log file"))?; + + if contents.contains(pattern) { + Ok(()) + } else { + Err(eyre::eyre!("Pattern not found in log file: {}", pattern)) + } + } +} + +pub struct IntegrationFramework; + +pub trait Service { + /// Configure and return the command to run the service + fn command(&self) -> Command; + + /// Return a future that resolves when the service is ready + fn ready(&self, log_path: &Path) -> impl Future> + Send; +} + +fn open_log_file(path: &PathBuf) -> Result { + let prefix = path.parent().unwrap(); + std::fs::create_dir_all(prefix).map_err(|_| Error::Logs)?; + + OpenOptions::new() + .append(true) + .create(true) + .open(path) + .map_err(|_| Error::Logs) +} diff --git a/crates/op-rbuilder/src/tests/framework/txs.rs b/crates/op-rbuilder/src/tests/framework/txs.rs new file mode 100644 index 000000000..5c55e060f --- /dev/null +++ b/crates/op-rbuilder/src/tests/framework/txs.rs @@ -0,0 +1,127 @@ +use crate::tx_signer::Signer; +use alloy_consensus::TxEip1559; +use alloy_eips::{eip2718::Encodable2718, BlockNumberOrTag}; +use alloy_primitives::Bytes; +use alloy_provider::{PendingTransactionBuilder, Provider, RootProvider}; +use core::cmp::max; +use op_alloy_consensus::{OpTxEnvelope, OpTypedTransaction}; +use op_alloy_network::Optimism; +use reth_primitives::Recovered; + +use alloy_eips::eip1559::MIN_PROTOCOL_BASE_FEE; + +use super::BUILDER_PRIVATE_KEY; + +#[derive(Clone)] +pub struct TransactionBuilder { + provider: RootProvider, + signer: Option, + nonce: Option, + base_fee: Option, + tx: TxEip1559, +} + +impl TransactionBuilder { + pub fn new(provider: RootProvider) -> Self { + Self { + provider, + signer: None, + nonce: None, + base_fee: None, + tx: TxEip1559 { + chain_id: 901, + gas_limit: 210000, + ..Default::default() + }, + } + } + + pub fn with_signer(mut self, signer: Signer) -> Self { + self.signer = Some(signer); + self + } + + pub fn with_chain_id(mut self, chain_id: u64) -> Self { + self.tx.chain_id = chain_id; + self + } + + pub fn with_nonce(mut self, nonce: u64) -> Self { + self.tx.nonce = nonce; + self + } + + pub fn with_gas_limit(mut self, gas_limit: u64) -> Self { + self.tx.gas_limit = gas_limit; + self + } + + pub fn with_max_fee_per_gas(mut self, max_fee_per_gas: u128) -> Self { + self.tx.max_fee_per_gas = max_fee_per_gas; + self + } + + pub fn with_max_priority_fee_per_gas(mut self, max_priority_fee_per_gas: u128) -> Self { + self.tx.max_priority_fee_per_gas = max_priority_fee_per_gas; + self + } + + pub fn with_input(mut self, input: Bytes) -> Self { + self.tx.input = input; + self + } + + pub async fn build(mut self) -> Recovered { + let signer = match self.signer { + Some(signer) => signer, + None => Signer::try_from_secret( + BUILDER_PRIVATE_KEY + .parse() + .expect("invalid hardcoded builder private key"), + ) + .expect("Failed to create signer from hardcoded private key"), + }; + + let nonce = match self.nonce { + Some(nonce) => nonce, + None => self + .provider + .get_transaction_count(signer.address) + .pending() + .await + .expect("Failed to get transaction count"), + }; + + let base_fee = match self.base_fee { + Some(base_fee) => base_fee, + None => { + let previous_base_fee = self + .provider + .get_block_by_number(BlockNumberOrTag::Latest) + .await + .expect("failed to get latest block") + .expect("latest block should exist") + .header + .base_fee_per_gas + 
.expect("base fee should be present in latest block"); + + max(previous_base_fee as u128, MIN_PROTOCOL_BASE_FEE as u128) + } + }; + + self.tx.nonce = nonce; + self.tx.max_fee_per_gas = base_fee + self.tx.max_priority_fee_per_gas; + + signer + .sign_tx(OpTypedTransaction::Eip1559(self.tx)) + .expect("Failed to sign transaction") + } + + pub async fn send(self) -> eyre::Result> { + let provider = self.provider.clone(); + let transaction = self.build().await; + Ok(provider + .send_raw_transaction(transaction.encoded_2718().as_slice()) + .await?) + } +} diff --git a/crates/op-rbuilder/src/tests/mod.rs b/crates/op-rbuilder/src/tests/mod.rs new file mode 100644 index 000000000..05ed01b61 --- /dev/null +++ b/crates/op-rbuilder/src/tests/mod.rs @@ -0,0 +1,9 @@ +// base +mod framework; +pub use framework::*; + +#[cfg(not(feature = "flashblocks"))] +mod vanilla; + +#[cfg(feature = "flashblocks")] +mod flashblocks; diff --git a/crates/op-rbuilder/src/tests/vanilla/mod.rs b/crates/op-rbuilder/src/tests/vanilla/mod.rs new file mode 100644 index 000000000..f45fa23fd --- /dev/null +++ b/crates/op-rbuilder/src/tests/vanilla/mod.rs @@ -0,0 +1,7 @@ +#![cfg(test)] + +mod ordering; +mod revert; +mod smoke; + +use super::*; diff --git a/crates/op-rbuilder/src/tests/vanilla/ordering.rs b/crates/op-rbuilder/src/tests/vanilla/ordering.rs new file mode 100644 index 000000000..f524e57d4 --- /dev/null +++ b/crates/op-rbuilder/src/tests/vanilla/ordering.rs @@ -0,0 +1,66 @@ +use crate::tests::framework::{TestHarnessBuilder, ONE_ETH}; +use alloy_consensus::Transaction; +use futures::{future::join_all, stream, StreamExt}; + +/// This test ensures that the transactions are ordered by fee priority in the block. +#[tokio::test] +async fn fee_priority_ordering() -> eyre::Result<()> { + let harness = TestHarnessBuilder::new("integration_test_fee_priority_ordering") + .build() + .await?; + + let mut generator = harness.block_generator().await?; + let accounts = generator.create_funded_accounts(10, ONE_ETH).await?; + let base_fee = harness.latest_base_fee().await; + + // generate transactions with randomized tips + let txs = join_all(accounts.iter().map(|signer| { + harness + .create_transaction() + .with_signer(*signer) + .with_max_priority_fee_per_gas(rand::random_range(1..50)) + .send() + })) + .await + .into_iter() + .collect::>>()? 
+        .into_iter()
+        .map(|tx| *tx.tx_hash())
+        .collect::<Vec<_>>();
+
+    generator.generate_block().await?;
+
+    // verify all transactions are included in the block
+    assert!(
+        stream::iter(txs.iter())
+            .all(|tx_hash| async {
+                harness
+                    .latest_block()
+                    .await
+                    .transactions
+                    .hashes()
+                    .any(|hash| hash == *tx_hash)
+            })
+            .await,
+        "not all transactions included in the block"
+    );
+
+    // verify all transactions are ordered by fee priority
+    let txs_tips = harness
+        .latest_block()
+        .await
+        .into_transactions_vec()
+        .into_iter()
+        .skip(1) // skip the deposit transaction
+        .take(txs.len()) // exclude the trailing builder transaction
+        .map(|tx| tx.effective_tip_per_gas(base_fee as u64))
+        .rev() // we want to check descending order
+        .collect::<Vec<_>>();
+
+    assert!(
+        txs_tips.is_sorted(),
+        "Transactions not ordered by fee priority"
+    );
+
+    Ok(())
+}
diff --git a/crates/op-rbuilder/src/tests/vanilla/revert.rs b/crates/op-rbuilder/src/tests/vanilla/revert.rs
new file mode 100644
index 000000000..6976ce576
--- /dev/null
+++ b/crates/op-rbuilder/src/tests/vanilla/revert.rs
@@ -0,0 +1,127 @@
+use crate::tests::TestHarnessBuilder;
+use alloy_provider::Provider;
+
+/// This test ensures that transactions that revert are not included in the block and
+/// that the builder emits a log for each dropped transaction.
+#[tokio::test]
+async fn monitor_transaction_drops() -> eyre::Result<()> {
+    let harness = TestHarnessBuilder::new("monitor_transaction_drops")
+        .with_revert_protection()
+        .build()
+        .await?;
+
+    let mut generator = harness.block_generator().await?;
+
+    // send 10 reverting transactions
+    let mut pending_txn = Vec::new();
+    for _ in 0..10 {
+        let txn = harness.send_revert_transaction().await?;
+        pending_txn.push(txn);
+    }
+
+    // generate 10 blocks
+    for _ in 0..10 {
+        generator.generate_block().await?;
+        let latest_block = harness.latest_block().await;
+
+        // blocks should only include two transactions (deposit + builder)
+        assert_eq!(latest_block.transactions.len(), 2);
+    }
+
+    // check that the builder emitted logs for the reverted transactions
+    // with the monitoring logic
+    // TODO: this is not ideal, let's find a different way to detect this
+    // Each time a transaction is dropped, it emits a log like this:
+    // 'Transaction event received target="monitoring" tx_hash="" kind="discarded"'
+    let builder_logs = std::fs::read_to_string(harness.builder_log_path())?;
+
+    for txn in pending_txn {
+        let txn_log = format!(
+            "Transaction event received target=\"monitoring\" tx_hash=\"{}\" kind=\"discarded\"",
+            txn.tx_hash()
+        );
+
+        assert!(builder_logs.contains(txn_log.as_str()));
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn revert_protection_disabled() -> eyre::Result<()> {
+    let harness = TestHarnessBuilder::new("revert_protection_disabled")
+        .build()
+        .await?;
+
+    let mut generator = harness.block_generator().await?;
+
+    for _ in 0..10 {
+        let valid_tx = harness.send_valid_transaction().await?;
+        let reverting_tx = harness.send_revert_transaction().await?;
+        let block_hash = generator.generate_block().await?;
+
+        let block = harness
+            .provider()?
+            .get_block_by_hash(block_hash)
+            .await?
+ .expect("block"); + + assert!( + block + .transactions + .hashes() + .any(|hash| hash == *valid_tx.tx_hash()), + "successful transaction missing from block" + ); + + assert!( + block + .transactions + .hashes() + .any(|hash| hash == *reverting_tx.tx_hash()), + "reverted transaction missing from block" + ); + } + + Ok(()) +} + +#[tokio::test] +async fn revert_protection() -> eyre::Result<()> { + let harness = TestHarnessBuilder::new("revert_protection") + .with_revert_protection() + .build() + .await?; + + let mut generator = harness.block_generator().await?; + + for _ in 0..10 { + let valid_tx = harness.send_valid_transaction().await?; + let reverting_tx = harness.send_revert_transaction().await?; + let block_hash = generator.generate_block().await?; + + let block = harness + .provider()? + .get_block_by_hash(block_hash) + .await? + .expect("block"); + + assert!( + block + .transactions + .hashes() + .any(|hash| hash == *valid_tx.tx_hash()), + "successful transaction missing from block" + ); + + assert!( + !block + .transactions + .hashes() + .any(|hash| hash == *reverting_tx.tx_hash()), + "reverted transaction unexpectedly included in block" + ); + } + + Ok(()) +} diff --git a/crates/op-rbuilder/src/tests/vanilla/smoke.rs b/crates/op-rbuilder/src/tests/vanilla/smoke.rs new file mode 100644 index 000000000..33109c4a3 --- /dev/null +++ b/crates/op-rbuilder/src/tests/vanilla/smoke.rs @@ -0,0 +1,130 @@ +use super::framework::TestHarnessBuilder; +use alloy_provider::Provider; +use std::collections::HashSet; + +/// This is a smoke test that ensures that transactions are included in blocks +/// and that the block generator is functioning correctly. +#[tokio::test] +async fn chain_produces_blocks() -> eyre::Result<()> { + let harness = TestHarnessBuilder::new("chain_produces_blocks") + .build() + .await?; + + let mut generator = harness.block_generator().await?; + + const SAMPLE_SIZE: usize = 10; + + // ensure that each block has at least two transactions when + // no user transactions are sent. + // the deposit transaction and the block generator's transaction + for _ in 0..SAMPLE_SIZE { + generator + .generate_block() + .await + .expect("Failed to generate block"); + let transactions = harness.latest_block().await.transactions; + assert!( + transactions.len() == 2, + "Block should have exactly two transactions" + ); + } + + // ensure that transactions are included in blocks and each block has all the transactions + // sent to it during its block time + the two mandatory transactions + for _ in 0..SAMPLE_SIZE { + let count = rand::random_range(1..8); + let mut tx_hashes = HashSet::new(); + for _ in 0..count { + let tx = harness + .send_valid_transaction() + .await + .expect("Failed to send transaction"); + let tx_hash = *tx.tx_hash(); + tx_hashes.insert(tx_hash); + } + generator + .generate_block() + .await + .expect("Failed to generate block"); + let transactions = harness.latest_block().await.transactions; + + assert!( + transactions.len() == 2 + count, + "Block should have {} transactions", + 2 + count + ); + + for tx_hash in tx_hashes { + assert!( + transactions.hashes().any(|hash| hash == *tx_hash), + "Transaction {} should be included in the block", + tx_hash + ); + } + } + + Ok(()) +} + +/// Ensures that payloads are generated correctly even when the builder is busy +/// with other requests, such as fcu or getPayload. 
+#[tokio::test]
+async fn get_payload_close_to_fcu() -> eyre::Result<()> {
+    let test_harness = TestHarnessBuilder::new("get_payload_close_to_fcu")
+        .build()
+        .await?;
+    let mut block_generator = test_harness.block_generator().await?;
+
+    // add some transactions to the pool so that the builder
+    // is busy when we send the fcu/getPayload requests
+    for _ in 0..10 {
+        // Note: for this test it is okay if they are not valid
+        let _ = test_harness.send_valid_transaction().await?;
+    }
+
+    let result = tokio::time::timeout(
+        std::time::Duration::from_secs(1),
+        block_generator.submit_payload(None, 0, true),
+    )
+    .await;
+
+    // ensure we didn't time out
+    let result = result.expect("Submit payload timed out");
+
+    // ensure we got a payload
+    assert!(result.is_ok(), "Failed to get payload: {:?}", result);
+
+    Ok(())
+}
+
+/// This test validates that if we flood the builder with many transactions
+/// and request short block times, the builder still eventually includes all of them.
+#[tokio::test]
+async fn transaction_flood_no_sleep() -> eyre::Result<()> {
+    let test_harness = TestHarnessBuilder::new("transaction_flood_no_sleep")
+        .build()
+        .await?;
+
+    let mut block_generator = test_harness.block_generator().await?;
+    let provider = test_harness.provider()?;
+
+    // Send 200 valid transactions to the builder.
+    // Beyond this count the RPC endpoint struggles to handle the load.
+    let mut transactions = vec![];
+    for _ in 0..200 {
+        let tx = test_harness.send_valid_transaction().await?;
+        let tx_hash = *tx.tx_hash();
+        transactions.push(tx_hash);
+    }
+
+    // After 10 blocks all the transactions should have been included in a block
+    for _ in 0..10 {
+        block_generator.submit_payload(None, 0, true).await.unwrap();
+    }
+
+    for tx in transactions {
+        assert!(provider.get_transaction_receipt(tx).await?.is_some());
+    }
+
+    Ok(())
+}
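
A closing note on the ordering assertion in `fee_priority_ordering` above: the effective tip of an EIP-1559 transaction is `min(max_priority_fee_per_gas, max_fee_per_gas - base_fee)`, the builder is expected to place higher tips earlier in the block, and reversing the collected tips turns that descending order into an ascending one that `is_sorted()` can verify. A self-contained sketch with made-up integer values (plain `u128`, no alloy types involved):

// Standalone illustration of the fee-priority ordering check; values are illustrative only.
fn effective_tip(max_fee: u128, max_priority_fee: u128, base_fee: u128) -> u128 {
    // EIP-1559 effective tip: the priority fee, capped by what remains above the base fee.
    max_priority_fee.min(max_fee.saturating_sub(base_fee))
}

fn main() {
    let base_fee = 100u128;
    // (max_fee_per_gas, max_priority_fee_per_gas) in the order the builder placed them.
    let block_order = [(200u128, 50u128), (160, 30), (120, 10)];

    let tips: Vec<u128> = block_order
        .iter()
        .map(|&(max_fee, prio)| effective_tip(max_fee, prio, base_fee))
        .rev() // block order is descending; reverse so is_sorted (ascending) can verify it
        .collect();

    assert!(tips.is_sorted(), "transactions not ordered by fee priority");
}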