diff --git a/Cargo.lock b/Cargo.lock index 197dee80062a6..17e69e1215099 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,6 +251,7 @@ dependencies = [ "aptos-cli-common", "aptos-config", "aptos-crypto", + "aptos-experimental-bulk-txn-submit", "aptos-faucet-core", "aptos-framework", "aptos-gas-profiling", @@ -1488,6 +1489,30 @@ dependencies = [ "thiserror", ] +[[package]] +name = "aptos-experimental-bulk-txn-submit" +version = "0.0.0" +dependencies = [ + "anyhow", + "aptos-config", + "aptos-crypto", + "aptos-global-constants", + "aptos-logger", + "aptos-sdk", + "aptos-transaction-emitter-lib", + "aptos-transaction-generator-lib", + "bcs 0.1.4", + "chrono", + "clap 4.4.14", + "futures", + "itertools 0.12.1", + "rand 0.7.3", + "reqwest", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "aptos-experimental-layered-map" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 6babe1e5819f5..ec721c6aa22d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,6 +131,7 @@ members = [ "execution/executor-service", "execution/executor-test-helpers", "execution/executor-types", + "experimental/bulk-txn-submit", "experimental/execution/ptx-executor", "experimental/runtimes", "experimental/storage/layered-map", @@ -324,6 +325,7 @@ aptos-enum-conversion-derive = { path = "crates/aptos-enum-conversion-derive" } aptos-executor-service = { path = "execution/executor-service" } aptos-executor-test-helpers = { path = "execution/executor-test-helpers" } aptos-executor-types = { path = "execution/executor-types" } +aptos-experimental-bulk-txn-submit = { path = "experimental/bulk-txn-submit" } aptos-experimental-layered-map = { path = "experimental/storage/layered-map" } aptos-experimental-ptx-executor = { path = "experimental/execution/ptx-executor" } aptos-experimental-runtimes = { path = "experimental/runtimes" } diff --git a/crates/aptos/Cargo.toml b/crates/aptos/Cargo.toml index 69d8112f01d8f..68b390ae536f9 100644 --- a/crates/aptos/Cargo.toml +++ 
b/crates/aptos/Cargo.toml @@ -22,6 +22,7 @@ aptos-cached-packages = { workspace = true } aptos-cli-common = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } +aptos-experimental-bulk-txn-submit = { workspace = true } aptos-faucet-core = { workspace = true } aptos-framework = { workspace = true } aptos-gas-profiling = { workspace = true } diff --git a/crates/aptos/src/common/types.rs b/crates/aptos/src/common/types.rs index 354a49ac0023a..189132e454018 100644 --- a/crates/aptos/src/common/types.rs +++ b/crates/aptos/src/common/types.rs @@ -1526,11 +1526,11 @@ pub struct TransactionOptions { impl TransactionOptions { /// Builds a rest client - fn rest_client(&self) -> CliTypedResult { + pub(crate) fn rest_client(&self) -> CliTypedResult { self.rest_options.client(&self.profile_options) } - pub fn get_transaction_account_type(&self) -> CliTypedResult { + pub(crate) fn get_transaction_account_type(&self) -> CliTypedResult { if self.private_key_options.private_key.is_some() || self.private_key_options.private_key_file.is_some() { @@ -1602,14 +1602,35 @@ impl TransactionOptions { .into_inner()) } - /// Submit a transaction - pub async fn submit_transaction( + pub(crate) fn get_now_timestamp_checked( &self, - payload: TransactionPayload, - ) -> CliTypedResult { - let client = self.rest_client()?; - let (sender_public_key, sender_address) = self.get_public_key_and_address()?; + onchain_timestamp_usecs: u64, + ) -> CliTypedResult { + // Retrieve local time, and ensure it's within an expected skew of the blockchain + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|err| CliError::UnexpectedError(err.to_string()))? + .as_secs(); + let now_usecs = now * US_IN_SECS; + // Warn local user that clock is skewed behind the blockchain. 
+ // There will always be a little lag from real time to blockchain time + if now_usecs < onchain_timestamp_usecs - ACCEPTED_CLOCK_SKEW_US { + eprintln!("Local clock is skewed from blockchain clock. Clock is more than {} seconds behind the blockchain {}", ACCEPTED_CLOCK_SKEW_US, onchain_timestamp_usecs / US_IN_SECS ); + } + Ok(now) + } + + pub(crate) async fn compute_gas_price_and_max_gas( + &self, + payload: &TransactionPayload, + client: &Client, + sender_address: &AccountAddress, + sender_public_key: &Ed25519PublicKey, + sequence_number: u64, + chain_id: ChainId, + expiration_time_secs: u64, + ) -> CliTypedResult<(u64, u64)> { // Ask to confirm price if the gas unit price is estimated above the lowest value when // it is automatically estimated let ask_to_confirm_price; @@ -1623,27 +1644,6 @@ impl TransactionOptions { gas_unit_price }; - // Get sequence number for account - let (account, state) = get_account_with_state(&client, sender_address).await?; - let sequence_number = account.sequence_number; - - // Retrieve local time, and ensure it's within an expected skew of the blockchain - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|err| CliError::UnexpectedError(err.to_string()))? - .as_secs(); - let now_usecs = now * US_IN_SECS; - - // Warn local user that clock is skewed behind the blockchain. - // There will always be a little lag from real time to blockchain time - if now_usecs < state.timestamp_usecs - ACCEPTED_CLOCK_SKEW_US { - eprintln!("Local clock is is skewed from blockchain clock. 
Clock is more than {} seconds behind the blockchain {}", ACCEPTED_CLOCK_SKEW_US, state.timestamp_usecs / US_IN_SECS ); - } - let expiration_time_secs = now + self.gas_options.expiration_secs; - - let chain_id = ChainId::new(state.chain_id); - // TODO: Check auth key against current private key and provide a better message - let max_gas = if let Some(max_gas) = self.gas_options.max_gas { // If the gas unit price was estimated ask, but otherwise you've chosen hwo much you want to spend if ask_to_confirm_price { @@ -1657,7 +1657,7 @@ impl TransactionOptions { let unsigned_transaction = transaction_factory .payload(payload.clone()) - .sender(sender_address) + .sender(*sender_address) .sequence_number(sequence_number) .expiration_timestamp_secs(expiration_time_secs) .build(); @@ -1698,6 +1698,39 @@ impl TransactionOptions { adjusted_max_gas }; + Ok((gas_unit_price, max_gas)) + } + + /// Submit a transaction + pub async fn submit_transaction( + &self, + payload: TransactionPayload, + ) -> CliTypedResult { + let client = self.rest_client()?; + let (sender_public_key, sender_address) = self.get_public_key_and_address()?; + + // Get sequence number for account + let (account, state) = get_account_with_state(&client, sender_address).await?; + let sequence_number = account.sequence_number; + + let now = self.get_now_timestamp_checked(state.timestamp_usecs)?; + let expiration_time_secs = now + self.gas_options.expiration_secs; + + let chain_id = ChainId::new(state.chain_id); + // TODO: Check auth key against current private key and provide a better message + + let (gas_unit_price, max_gas) = self + .compute_gas_price_and_max_gas( + &payload, + &client, + &sender_address, + &sender_public_key, + sequence_number, + chain_id, + expiration_time_secs, + ) + .await?; + // Sign and submit transaction let transaction_factory = TransactionFactory::new(chain_id) .with_gas_unit_price(gas_unit_price) diff --git a/crates/aptos/src/move_tool/mod.rs b/crates/aptos/src/move_tool/mod.rs index 
5d31321d96c05..31b44b4114004 100644 --- a/crates/aptos/src/move_tool/mod.rs +++ b/crates/aptos/src/move_tool/mod.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use self::submit_repeatedly::submit_repeatedly; use crate::{ account::derive_resource_account::ResourceAccountSeed, common::{ @@ -72,6 +73,7 @@ mod manifest; pub mod package_hooks; mod show; pub mod stored_package; +mod submit_repeatedly; /// Tool for Move related operations /// @@ -99,6 +101,7 @@ pub enum MoveTool { Publish(PublishPackage), Run(RunFunction), RunScript(RunScript), + RunRepeatedly(RunFunctionRepeatedly), #[clap(subcommand, hide = true)] Show(show::ShowTool), Test(TestPackage), @@ -131,6 +134,7 @@ impl MoveTool { MoveTool::Prove(tool) => tool.execute_serialized().await, MoveTool::Publish(tool) => tool.execute_serialized().await, MoveTool::Run(tool) => tool.execute_serialized().await, + MoveTool::RunRepeatedly(tool) => tool.execute_serialized().await, MoveTool::RunScript(tool) => tool.execute_serialized().await, MoveTool::Show(tool) => tool.execute_serialized().await, MoveTool::Test(tool) => tool.execute_serialized().await, @@ -1446,6 +1450,43 @@ impl CliCommand for RunFunction { } } +/// Run a Move function +#[derive(Parser)] +pub struct RunFunctionRepeatedly { + #[clap(flatten)] + pub(crate) entry_function_args: EntryFunctionArguments, + #[clap(flatten)] + pub(crate) txn_options: TransactionOptions, + + #[clap(long)] + num_times: usize, + + #[clap(long, default_value = "10")] + single_request_api_batch_size: usize, + + #[clap(long, default_value = "10")] + parallel_requests_outstanding: usize, +} + +#[async_trait] +impl CliCommand for RunFunctionRepeatedly { + fn command_name(&self) -> &'static str { + "RunFunctionRepeatedly" + } + + async fn execute(self) -> CliTypedResult { + submit_repeatedly( + &self.txn_options, + TransactionPayload::EntryFunction(self.entry_function_args.try_into()?), + self.num_times, + self.single_request_api_batch_size, + 
self.parallel_requests_outstanding, + ) + .await + .map(|v| format!("Committed {} txns", v)) + } +} + /// Run a view function #[derive(Parser)] pub struct ViewFunction { diff --git a/crates/aptos/src/move_tool/submit_repeatedly.rs b/crates/aptos/src/move_tool/submit_repeatedly.rs new file mode 100644 index 0000000000000..e6fd254fd86b9 --- /dev/null +++ b/crates/aptos/src/move_tool/submit_repeatedly.rs @@ -0,0 +1,96 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::common::{ + types::{AccountType, CliError, CliTypedResult, TransactionOptions}, + utils::{get_account_with_state, prompt_yes_with_override}, +}; +use aptos_experimental_bulk_txn_submit::{ + coordinator::execute_txn_list, workloads::FixedPayloadSignedTransactionBuilder, +}; +use aptos_sdk::{transaction_builder::TransactionFactory, types::LocalAccount}; +use aptos_types::{chain_id::ChainId, transaction::TransactionPayload}; +use std::time::Duration; + +/// For transaction payload and options, either get gas profile or submit for execution. +pub async fn submit_repeatedly( + txn_options_ref: &TransactionOptions, + payload: TransactionPayload, + num_times: usize, + single_request_api_batch_size: usize, + parallel_requests_outstanding: usize, +) -> CliTypedResult { + if txn_options_ref.profile_gas || txn_options_ref.benchmark || txn_options_ref.local { + return Err(CliError::UnexpectedError( + "Cannot perform profiling, benchmarking or local execution for submit repeatedly." + .to_string(), + )); + } + + let client = txn_options_ref.rest_client()?; + let (sender_public_key, sender_address) = txn_options_ref.get_public_key_and_address()?; + + // Get sequence number for account + let (account, state) = get_account_with_state(&client, sender_address).await?; + let sequence_number = account.sequence_number; + + let sender_account = match txn_options_ref.get_transaction_account_type()? 
{ + AccountType::Local => { + let (private_key, _) = txn_options_ref.get_key_and_address()?; + LocalAccount::new(sender_address, private_key, sequence_number) + }, + AccountType::HardwareWallet => { + return Err(CliError::UnexpectedError( + "Cannot use hardware wallet to submit repeatedly.".to_string(), + )); + }, + }; + + let now = txn_options_ref.get_now_timestamp_checked(state.timestamp_usecs)?; + let expiration_time_secs = now + txn_options_ref.gas_options.expiration_secs; + + let chain_id = ChainId::new(state.chain_id); + // TODO: Check auth key against current private key and provide a better message + + let (gas_unit_price, max_gas) = txn_options_ref + .compute_gas_price_and_max_gas( + &payload, + &client, + &sender_address, + &sender_public_key, + sequence_number, + chain_id, + expiration_time_secs, + ) + .await?; + + // Sign and submit transaction + let transaction_factory = TransactionFactory::new(chain_id) + .with_gas_unit_price(gas_unit_price) + .with_max_gas_amount(max_gas) + .with_transaction_expiration_time(txn_options_ref.gas_options.expiration_secs); + + prompt_yes_with_override( + &format!( + "About to submit {} transactions and spend up to {} APT. 
Continue?", + num_times, + num_times as f32 * gas_unit_price as f32 * max_gas as f32 / 1e8 + ), + txn_options_ref.prompt_options, + )?; + + let results = execute_txn_list( + vec![sender_account], + vec![client], + (0..num_times).map(|_| ()).collect::>(), + single_request_api_batch_size, + parallel_requests_outstanding, + Duration::from_secs_f32(0.05), + transaction_factory, + FixedPayloadSignedTransactionBuilder::new(payload), + true, + ) + .await?; + + Ok(results.into_iter().filter(|v| *v == "success").count()) +} diff --git a/crates/transaction-emitter-lib/src/args.rs b/crates/transaction-emitter-lib/src/args.rs index 45fe819576cbb..99ff392327ac5 100644 --- a/crates/transaction-emitter-lib/src/args.rs +++ b/crates/transaction-emitter-lib/src/args.rs @@ -194,7 +194,7 @@ pub struct EmitArgs { #[clap(long, default_value = "false")] /// Skip minting account during initialization - pub skip_minting_accounts: bool, + pub skip_funding_accounts: bool, #[clap(long)] pub latency_polling_interval_s: Option, diff --git a/crates/transaction-emitter-lib/src/cluster.rs b/crates/transaction-emitter-lib/src/cluster.rs index b0cd4e73a4be5..a1584b3ef992f 100644 --- a/crates/transaction-emitter-lib/src/cluster.rs +++ b/crates/transaction-emitter-lib/src/cluster.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{emitter::query_sequence_number, instance::Instance, ClusterArgs}; +use crate::{emitter::load_specific_account, instance::Instance, ClusterArgs}; use anyhow::{anyhow, bail, format_err, Result}; use aptos_crypto::{ ed25519::{Ed25519PrivateKey, Ed25519PublicKey}, @@ -9,9 +9,7 @@ use aptos_crypto::{ }; use aptos_logger::{info, warn}; use aptos_rest_client::{Client as RestClient, State}; -use aptos_sdk::types::{ - account_config::aptos_test_root_address, chain_id::ChainId, AccountKey, LocalAccount, -}; +use aptos_sdk::types::{chain_id::ChainId, AccountKey, LocalAccount}; use futures::{stream::FuturesUnordered, StreamExt}; use 
rand::seq::SliceRandom; use std::{convert::TryFrom, time::Instant}; @@ -200,22 +198,7 @@ impl Cluster { } pub async fn load_coin_source_account(&self, client: &RestClient) -> Result { - let account_key = self.account_key(); - let address = if self.coin_source_is_root { - aptos_test_root_address() - } else { - account_key.authentication_key().account_address() - }; - - let sequence_number = query_sequence_number(client, address).await.map_err(|e| { - format_err!( - "query_sequence_number on {:?} for account {} failed: {:?}", - client, - address, - e - ) - })?; - Ok(LocalAccount::new(address, account_key, sequence_number)) + load_specific_account(self.account_key(), self.coin_source_is_root, client).await } pub fn random_instance(&self) -> Instance { diff --git a/crates/transaction-emitter-lib/src/emitter/account_minter.rs b/crates/transaction-emitter-lib/src/emitter/account_minter.rs index a96e91d11eb1d..875b3a5cfbf86 100644 --- a/crates/transaction-emitter-lib/src/emitter/account_minter.rs +++ b/crates/transaction-emitter-lib/src/emitter/account_minter.rs @@ -1,8 +1,13 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{emitter::local_account_generator::LocalAccountGenerator, EmitJobRequest}; +use super::{ + local_account_generator::LocalAccountGenerator, parse_seed, + transaction_executor::RestApiReliableTransactionSubmitter, +}; +use crate::EmitJobRequest; use anyhow::{anyhow, bail, format_err, Context, Result}; +use aptos_config::config::DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE; use aptos_crypto::{ ed25519::{Ed25519PrivateKey, Ed25519PublicKey}, encoding_type::EncodingType, @@ -16,13 +21,10 @@ use aptos_sdk::{ }, }; use aptos_transaction_generator_lib::{ - CounterState, ReliableTransactionSubmitter, RootAccountHandle, SEND_AMOUNT, -}; -use core::{ - cmp::min, - result::Result::{Err, Ok}, + CounterState, ReliableTransactionSubmitter, RootAccountHandle, }; -use futures::StreamExt; +use core::result::Result::{Err, Ok}; +use 
futures::{future::try_join_all, StreamExt}; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::{ path::Path, @@ -33,7 +35,8 @@ use std::{ pub struct SourceAccountManager<'t> { pub source_account: &'t LocalAccount, pub txn_executor: &'t dyn ReliableTransactionSubmitter, - pub req: &'t EmitJobRequest, + pub mint_to_root: bool, + pub prompt_before_spending: bool, pub txn_factory: TransactionFactory, } @@ -55,7 +58,7 @@ impl<'t> SourceAccountManager<'t> { .txn_executor .get_account_balance(self.source_account.address()) .await?; - Ok(if self.req.mint_to_root { + Ok(if self.mint_to_root { // We have a root account, so amount of funds minted is not a problem // We can have multiple txn emitter running simultaneously, each coming to this check at the same time. // So they might all pass the check, but not be able to consume funds they need. So we check more conservatively @@ -102,7 +105,7 @@ impl<'t> SourceAccountManager<'t> { )); } - if self.req.prompt_before_spending { + if self.prompt_before_spending { if !prompt_yes(&format!( "plan will consume in total {} balance for {}, are you sure you want to proceed", amount, @@ -151,7 +154,7 @@ impl<'t> SourceAccountManager<'t> { pub struct AccountMinter<'t> { txn_factory: TransactionFactory, - rng: StdRng, + account_rng: StdRng, source_account: &'t SourceAccountManager<'t>, } @@ -159,39 +162,12 @@ impl<'t> AccountMinter<'t> { pub fn new( source_account: &'t SourceAccountManager<'t>, txn_factory: TransactionFactory, - rng: StdRng, + account_rng: StdRng, ) -> Self { Self { source_account, txn_factory, - rng, - } - } - - pub fn get_needed_balance_per_account(&self, req: &EmitJobRequest, num_accounts: usize) -> u64 { - if let Some(val) = req.coins_per_account_override { - info!(" with {} balance each because of override", val); - val - } else { - // round up: - let txnx_per_account = - (req.expected_max_txns + num_accounts as u64 - 1) / num_accounts as u64; - let min_balance = req.max_gas_per_txn * req.gas_price; - let 
coins_per_account = txnx_per_account - .checked_mul(SEND_AMOUNT + req.get_expected_gas_per_txn() * req.gas_price) - .unwrap() - .checked_add(min_balance) - .unwrap(); // extra coins for secure to pay none zero gas price - - info!( - " with {} balance each because of expecting {} txns per account, with {} gas at {} gas price per txn, and min balance {}", - coins_per_account, - txnx_per_account, - req.get_expected_gas_per_txn(), - req.gas_price, - min_balance, - ); - coins_per_account + account_rng, } } @@ -249,21 +225,22 @@ impl<'t> AccountMinter<'t> { pub async fn create_and_fund_accounts( &mut self, txn_executor: &dyn ReliableTransactionSubmitter, - req: &EmitJobRequest, account_generator: Box, - max_submit_batch_size: usize, local_accounts: Vec>, + coins_per_account: u64, + max_submit_batch_size: usize, + mint_to_root: bool, + create_secondary_source_account: bool, ) -> Result<()> { let num_accounts = local_accounts.len(); info!( - "Account creation plan created for {} accounts and {} txns:", - num_accounts, req.expected_max_txns, + "Account creation plan created for {} accounts and {} coins per account", + num_accounts, coins_per_account, ); let expected_num_seed_accounts = (num_accounts / 50).clamp(1, (num_accounts as f32).sqrt() as usize + 1); - let coins_per_account = self.get_needed_balance_per_account(req, num_accounts); let expected_children_per_seed_account = (num_accounts + expected_num_seed_accounts - 1) / expected_num_seed_accounts; @@ -275,7 +252,7 @@ impl<'t> AccountMinter<'t> { self.txn_factory.get_gas_unit_price(), ); let coins_for_source = Self::funds_needed_for_multi_transfer( - if req.mint_to_root { "root" } else { "source" }, + if mint_to_root { "root" } else { "source" }, expected_num_seed_accounts as u64, coins_per_seed_account, self.txn_factory.get_max_gas_amount(), @@ -288,8 +265,8 @@ impl<'t> AccountMinter<'t> { .await? { // recheck value makes sense for auto-approval. 
- let max_allowed = (3 * req.expected_max_txns as u128) - .checked_mul((req.get_expected_gas_per_txn() * req.gas_price).into()) + let max_allowed = (3 * coins_per_account as u128) + .checked_mul(num_accounts as u128) .unwrap(); assert!(coins_for_source as u128 <= max_allowed, "Overhead too large to consume funds without approval - estimated total coins needed for load test ({}) are larger than expected_max_txns * expected_gas_per_txn, multiplied by 3 to account for rounding up and overheads ({})", @@ -298,7 +275,7 @@ impl<'t> AccountMinter<'t> { ); } - let new_source_account = if !req.coordination_delay_between_instances.is_zero() { + let new_source_account = if create_secondary_source_account { Some( self.create_new_source_account(txn_executor, coins_for_source) .await?, @@ -405,16 +382,14 @@ impl<'t> AccountMinter<'t> { "Creating and funding seeds accounts (txn {} gas price)", self.txn_factory.get_gas_unit_price() ); - let mut i = 0; - let mut seed_accounts = vec![]; - while i < seed_account_num { - let batch_size = min(max_submit_batch_size, seed_account_num - i); - let mut rng = StdRng::from_rng(self.rng()).unwrap(); - let mut batch = account_generator - .gen_local_accounts(txn_executor, batch_size, &mut rng) - .await?; + + let seed_accounts = account_generator + .gen_local_accounts(txn_executor, seed_account_num, self.account_rng()) + .await?; + + for chunk in seed_accounts.chunks(max_submit_batch_size) { let txn_factory = &self.txn_factory; - let create_requests: Vec<_> = batch + let create_requests: Vec<_> = chunk .iter() .map(|account| { create_and_fund_account_request( @@ -432,9 +407,6 @@ impl<'t> AccountMinter<'t> { txn_executor .execute_transactions_with_counter(&create_requests, counters) .await?; - - i += batch_size; - seed_accounts.append(&mut batch); } Ok(seed_accounts) @@ -477,7 +449,7 @@ impl<'t> AccountMinter<'t> { .await?, ); - let new_source_account = LocalAccount::generate(self.rng()); + let new_source_account = 
LocalAccount::generate(self.account_rng()); let txn = create_and_fund_account_request( self.source_account.get_root_account(), coins_for_source, @@ -510,8 +482,8 @@ impl<'t> AccountMinter<'t> { bail!("Couldn't create new source account"); } - pub fn rng(&mut self) -> &mut StdRng { - &mut self.rng + pub fn account_rng(&mut self) -> &mut StdRng { + &mut self.account_rng } } @@ -585,3 +557,151 @@ pub fn prompt_yes(prompt: &str) -> bool { } result.unwrap() } + +pub struct BulkAccountCreationConfig { + max_submit_batch_size: usize, + skip_funding_accounts: bool, + seed: Option<[u8; 32]>, + mint_to_root: bool, + prompt_before_spending: bool, + create_secondary_source_account: bool, + expected_gas_per_transfer: u64, + expected_gas_per_account_create: u64, +} + +impl BulkAccountCreationConfig { + pub fn new( + max_submit_batch_size: usize, + skip_funding_accounts: bool, + seed: Option<&str>, + mint_to_root: bool, + prompt_before_spending: bool, + create_secondary_source_account: bool, + expected_gas_per_transfer: u64, + expected_gas_per_account_create: u64, + ) -> Self { + Self { + max_submit_batch_size, + skip_funding_accounts, + seed: seed.map(parse_seed), + mint_to_root, + prompt_before_spending, + create_secondary_source_account, + expected_gas_per_transfer, + expected_gas_per_account_create, + } + } +} + +impl From<&EmitJobRequest> for BulkAccountCreationConfig { + fn from(req: &EmitJobRequest) -> Self { + Self { + max_submit_batch_size: DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE, + skip_funding_accounts: req.skip_funding_accounts, + seed: req.account_minter_seed, + mint_to_root: req.mint_to_root, + prompt_before_spending: req.prompt_before_spending, + create_secondary_source_account: !req.coordination_delay_between_instances.is_zero(), + expected_gas_per_transfer: req.get_expected_gas_per_transfer(), + expected_gas_per_account_create: req.get_expected_gas_per_account_create(), + } + } +} + +pub async fn bulk_create_accounts( + coin_source_account: &LocalAccount, + 
txn_executor: &RestApiReliableTransactionSubmitter, + txn_factory: &TransactionFactory, + account_generator: Box, + config: BulkAccountCreationConfig, + num_accounts: usize, + coins_per_account: u64, +) -> Result> { + let source_account_manager = SourceAccountManager { + source_account: coin_source_account, + txn_executor, + mint_to_root: config.mint_to_root, + prompt_before_spending: config.prompt_before_spending, + txn_factory: txn_factory.clone(), + }; + + let seed = config.seed.unwrap_or_else(|| { + let mut rng = StdRng::from_entropy(); + rng.gen() + }); + info!( + "AccountMinter Seed (reuse accounts by passing into --account-minter-seed): {:?}", + seed + ); + + let accounts = account_generator + .gen_local_accounts(txn_executor, num_accounts, &mut StdRng::from_seed(seed)) + .await?; + info!( + "Generated and fetched re-usable accounts for seed {:?}", + seed + ); + + let all_accounts_already_exist = accounts.iter().all(|account| account.sequence_number() > 0); + let send_money_gas = if all_accounts_already_exist { + config.expected_gas_per_transfer + } else { + config.expected_gas_per_account_create + }; + + let mut account_minter = AccountMinter::new( + &source_account_manager, + txn_factory.clone().with_max_gas_amount(send_money_gas), + // Wrap seed once, to not have conflicts between worker and seed accounts. + // We also don't want to continue from the same rng, as number of accounts will affect + // seed accounts. 
+ StdRng::from_seed(StdRng::from_seed(seed).gen()), + ); + + if !config.skip_funding_accounts { + let accounts: Vec<_> = accounts.into_iter().map(Arc::new).collect(); + account_minter + .create_and_fund_accounts( + txn_executor, + account_generator, + accounts.clone(), + coins_per_account, + config.max_submit_batch_size, + config.mint_to_root, + config.create_secondary_source_account, + ) + .await?; + let accounts: Vec<_> = accounts + .into_iter() + .map(|a| Arc::try_unwrap(a).unwrap()) + .collect(); + info!("Accounts created and funded"); + Ok(accounts) + } else { + info!( + "Account reuse plan created for {} accounts and min balance {}", + accounts.len(), + coins_per_account, + ); + + let balance_futures = accounts + .iter() + .map(|account| txn_executor.get_account_balance(account.address())); + let balances: Vec<_> = try_join_all(balance_futures).await?; + accounts + .iter() + .zip(balances) + .for_each(|(account, balance)| { + assert!( + balance >= coins_per_account, + "Account {} has balance {} < needed_min_balance {}", + account.address(), + balance, + coins_per_account + ); + }); + + info!("Skipping funding accounts"); + Ok(accounts) + } +} diff --git a/crates/transaction-emitter-lib/src/emitter/mod.rs b/crates/transaction-emitter-lib/src/emitter/mod.rs index 532b89ef77683..7de8c592a99c6 100644 --- a/crates/transaction-emitter-lib/src/emitter/mod.rs +++ b/crates/transaction-emitter-lib/src/emitter/mod.rs @@ -8,8 +8,8 @@ pub mod submission_worker; pub mod transaction_executor; use crate::emitter::{ - account_minter::{AccountMinter, SourceAccountManager}, - local_account_generator::{create_account_generator, LocalAccountGenerator}, + account_minter::{bulk_create_accounts, SourceAccountManager}, + local_account_generator::create_account_generator, stats::{DynamicStatsTracking, TxnStats}, submission_worker::SubmissionWorker, transaction_executor::RestApiReliableTransactionSubmitter, @@ -22,11 +22,12 @@ use aptos_rest_client::{aptos_api_types::AptosErrorCode, 
error::RestError, Clien use aptos_sdk::{ move_types::account_address::AccountAddress, transaction_builder::{aptos_stdlib, TransactionFactory}, - types::{transaction::SignedTransaction, LocalAccount}, + types::{transaction::SignedTransaction, AccountKey, LocalAccount}, }; use aptos_transaction_generator_lib::{ - create_txn_generator_creator, AccountType, ReliableTransactionSubmitter, TransactionType, + create_txn_generator_creator, AccountType, TransactionType, SEND_AMOUNT, }; +use aptos_types::account_config::aptos_test_root_address; use futures::future::{try_join_all, FutureExt}; use once_cell::sync::Lazy; use rand::{rngs::StdRng, seq::IteratorRandom, Rng}; @@ -46,6 +47,9 @@ use tokio::{runtime::Handle, task::JoinHandle, time}; // Max is 100k TPS for 3 hours const MAX_TXNS: u64 = 1_000_000_000; +pub const EXPECTED_GAS_PER_TRANSFER: u64 = 10; +pub const EXPECTED_GAS_PER_ACCOUNT_CREATE: u64 = 2000 + 10; + const MAX_RETRIES: usize = 12; // This retry policy is used for important client calls necessary for setting @@ -144,6 +148,7 @@ pub struct EmitJobRequest { transaction_mix_per_phase: Vec>, account_type: AccountType, + max_gas_per_txn: u64, init_max_gas_per_txn: Option, @@ -159,7 +164,7 @@ pub struct EmitJobRequest { init_gas_price_multiplier: u64, mint_to_root: bool, - skip_minting_accounts: bool, + skip_funding_accounts: bool, txn_expiration_time_secs: u64, init_expiration_multiplier: f64, @@ -189,15 +194,15 @@ impl Default for EmitJobRequest { init_max_gas_per_txn: None, init_gas_price_multiplier: 2, mint_to_root: false, - skip_minting_accounts: false, + skip_funding_accounts: false, txn_expiration_time_secs: 60, init_expiration_multiplier: 3.0, init_retry_interval: Duration::from_secs(10), num_accounts_mode: NumAccountsMode::TransactionsPerAccount(20), expected_max_txns: MAX_TXNS, expected_gas_per_txn: None, - expected_gas_per_transfer: 10, - expected_gas_per_account_create: 2000 + 5, + expected_gas_per_transfer: EXPECTED_GAS_PER_TRANSFER, + 
expected_gas_per_account_create: EXPECTED_GAS_PER_ACCOUNT_CREATE, prompt_before_spending: false, coordination_delay_between_instances: Duration::from_secs(0), latency_polling_interval: Duration::from_millis(300), @@ -332,8 +337,8 @@ impl EmitJobRequest { self } - pub fn skip_minting_accounts(mut self) -> Self { - self.skip_minting_accounts = true; + pub fn skip_funding_accounts(mut self) -> Self { + self.skip_funding_accounts = true; self } @@ -460,7 +465,7 @@ impl EmitJobRequest { }; info!( - " Transaction emitter targeting {} TPS, expecting {} TPS", + " Transaction emitter targeting {} TPS, expecting {} TPS", tps, num_accounts * transactions_per_account / wait_seconds as usize ); @@ -699,20 +704,21 @@ impl TxnEmitter { .with_transaction_expiration_time(init_expiration_time); let init_retries: usize = usize::try_from(init_expiration_time / req.init_retry_interval.as_secs()).unwrap(); - let seed = req.account_minter_seed.unwrap_or_else(|| self.rng.gen()); let account_generator = create_account_generator(req.account_type); - let mut all_accounts = create_accounts( + let mut all_accounts = bulk_create_accounts( root_account, + &RestApiReliableTransactionSubmitter::new( + req.rest_clients.clone(), + init_retries, + req.init_retry_interval, + ), &init_txn_factory, account_generator, - &req, - mode_params.max_submit_batch_size, - req.skip_minting_accounts, - seed, + (&req).into(), num_accounts, - init_retries, + get_needed_balance_per_account_from_req(&req, num_accounts), ) .await?; @@ -720,16 +726,17 @@ impl TxnEmitter { let stats = Arc::new(DynamicStatsTracking::new(stats_tracking_phases)); let tokio_handle = Handle::current(); - let txn_executor = RestApiReliableTransactionSubmitter { - rest_clients: req.rest_clients.clone(), - max_retries: init_retries, - retry_after: req.init_retry_interval, - }; + let txn_executor = RestApiReliableTransactionSubmitter::new( + req.rest_clients.clone(), + init_retries, + req.init_retry_interval, + ); let source_account_manager = 
SourceAccountManager { source_account: root_account, txn_executor: &txn_executor, - req: &req, txn_factory: init_txn_factory.clone(), + mint_to_root: req.mint_to_root, + prompt_before_spending: req.prompt_before_spending, }; let (txn_generator_creator, _, _) = create_txn_generator_creator( &req.transaction_mix_per_phase, @@ -1062,8 +1069,9 @@ pub async fn query_sequence_numbers<'a, I>( where I: Iterator, { - let futures = addresses - .map(|address| RETRY_POLICY.retry(move || get_account_if_exists(client, *address))); + let futures = addresses.map(|address| { + RETRY_POLICY.retry(move || get_account_address_and_seq_num(client, *address)) + }); let (seq_nums, timestamps): (Vec<_>, Vec<_>) = try_join_all(futures) .await @@ -1076,14 +1084,23 @@ where Ok((seq_nums, timestamps.into_iter().min().unwrap())) } -async fn get_account_if_exists( +async fn get_account_address_and_seq_num( client: &RestClient, address: AccountAddress, ) -> Result<((AccountAddress, u64), u64)> { + get_account_seq_num(client, address) + .await + .map(|(seq_num, ts)| ((address, seq_num), ts)) +} + +pub async fn get_account_seq_num( + client: &RestClient, + address: AccountAddress, +) -> Result<(u64, u64)> { let result = client.get_account_bcs(address).await; match &result { Ok(resp) => Ok(( - (address, resp.inner().sequence_number()), + resp.inner().sequence_number(), Duration::from_micros(resp.state().timestamp_usecs).as_secs(), )), Err(e) => { @@ -1091,7 +1108,7 @@ async fn get_account_if_exists( if let RestError::Api(api_error) = e { if let AptosErrorCode::AccountNotFound = api_error.error.error_code { return Ok(( - (address, 0), + 0, Duration::from_micros(api_error.state.as_ref().unwrap().timestamp_usecs) .as_secs(), )); @@ -1103,6 +1120,28 @@ async fn get_account_if_exists( } } +pub async fn load_specific_account( + account_key: AccountKey, + is_root: bool, + client: &RestClient, +) -> Result { + let address = if is_root { + aptos_test_root_address() + } else { + 
account_key.authentication_key().account_address() + }; + + let sequence_number = query_sequence_number(client, address).await.map_err(|e| { + format_err!( + "query_sequence_number on {:?} for account {} failed: {:?}", + client, + address, + e + ) + })?; + Ok(LocalAccount::new(address, account_key, sequence_number)) +} + pub fn gen_transfer_txn_request( sender: &mut LocalAccount, receiver: &AccountAddress, @@ -1128,103 +1167,50 @@ pub fn parse_seed(seed_string: &str) -> [u8; 32] { .expect("failed to convert to array") } -pub async fn create_accounts( - root_account: &LocalAccount, - txn_factory: &TransactionFactory, - account_generator: Box, - req: &EmitJobRequest, - max_submit_batch_size: usize, - skip_minting_accounts: bool, - seed: [u8; 32], +pub fn get_needed_balance_per_account( + num_workload_transactions: u64, + gas_per_workload_transaction: u64, + octas_per_workload_transaction: u64, num_accounts: usize, - retries: usize, -) -> Result> { - info!( - "Using reliable/retriable init transaction executor with {} retries, every {}s", - retries, - req.init_retry_interval.as_secs_f32() - ); + gas_price: u64, + max_gas_per_txn: u64, +) -> u64 { + // round up: + let txnx_per_account = + (num_workload_transactions + num_accounts as u64 - 1) / num_accounts as u64; + let coins_per_account = txnx_per_account + .checked_mul(octas_per_workload_transaction + gas_per_workload_transaction * gas_price) + .unwrap() + .checked_add(max_gas_per_txn * gas_price) + .unwrap(); info!( - "AccountMinter Seed (reuse accounts by passing into --account-minter-seed): {:?}", - seed - ); - let txn_executor = RestApiReliableTransactionSubmitter { - rest_clients: req.rest_clients.clone(), - max_retries: retries, - retry_after: req.init_retry_interval, - }; - let source_account_manager = SourceAccountManager { - source_account: root_account, - txn_executor: &txn_executor, - req, - txn_factory: txn_factory.clone(), - }; - - let mut rng = StdRng::from_seed(seed); - - let accounts = 
account_generator - .gen_local_accounts(&txn_executor, num_accounts, &mut rng) - .await?; - - info!("Generated re-usable accounts for seed {:?}", seed); - - let all_accounts_already_exist = accounts.iter().all(|account| account.sequence_number() > 0); - let send_money_gas = if all_accounts_already_exist { - req.get_expected_gas_per_transfer() - } else { - req.get_expected_gas_per_account_create() - }; - - let mut account_minter = AccountMinter::new( - &source_account_manager, - txn_factory.clone().with_max_gas_amount(send_money_gas), - StdRng::from_seed(seed), + "Needed {} balance for each account because of expecting {} txns per account with {} gas and {} octas, with leaving {} gas for max_txn_gas, all at {} gas price", + coins_per_account, + txnx_per_account, + gas_per_workload_transaction, + octas_per_workload_transaction, + max_gas_per_txn, + gas_price, ); + coins_per_account +} - if !skip_minting_accounts { - let accounts: Vec<_> = accounts.into_iter().map(Arc::new).collect(); - account_minter - .create_and_fund_accounts( - &txn_executor, - req, - account_generator, - max_submit_batch_size, - accounts.clone(), - ) - .await?; - let accounts: Vec<_> = accounts - .into_iter() - .map(|a| Arc::try_unwrap(a).unwrap()) - .collect(); - info!("Accounts created and funded"); - Ok(accounts) - } else { +pub fn get_needed_balance_per_account_from_req(req: &EmitJobRequest, num_accounts: usize) -> u64 { + if let Some(val) = req.coins_per_account_override { info!( - "Account reuse plan created for {} accounts and {} txns:", - accounts.len(), - req.expected_max_txns, + "Needed {} balance for each account because of override", + val ); - - let needed_min_balance = account_minter.get_needed_balance_per_account(req, accounts.len()); - let balance_futures = accounts - .iter() - .map(|account| txn_executor.get_account_balance(account.address())); - let balances: Vec<_> = try_join_all(balance_futures).await?; - accounts - .iter() - .zip(balances) - .for_each(|(account, balance)| { - 
assert!( - balance >= needed_min_balance, - "Account {} has balance {} < needed_min_balance {}", - account.address(), - balance, - needed_min_balance - ); - }); - - info!("Skipping minting accounts"); - Ok(accounts) + val + } else { + get_needed_balance_per_account( + req.expected_max_txns, + req.get_expected_gas_per_txn(), + SEND_AMOUNT, + num_accounts, + req.gas_price, + req.max_gas_per_txn, + ) } } diff --git a/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs b/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs index 9db067bbe6e0f..4a2cae20c5f13 100644 --- a/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs +++ b/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs @@ -3,7 +3,7 @@ use super::RETRY_POLICY; use anyhow::{Context, Result}; -use aptos_logger::{debug, sample, sample::SampleRate, warn}; +use aptos_logger::{debug, info, sample, sample::SampleRate, warn}; use aptos_rest_client::{aptos_api_types::AptosErrorCode, error::RestError, Client as RestClient}; use aptos_sdk::{ move_types::account_address::AccountAddress, types::transaction::SignedTransaction, @@ -19,12 +19,25 @@ use std::{ // Reliable/retrying transaction executor, used for initializing pub struct RestApiReliableTransactionSubmitter { - pub rest_clients: Vec, - pub max_retries: usize, - pub retry_after: Duration, + rest_clients: Vec, + max_retries: usize, + retry_after: Duration, } impl RestApiReliableTransactionSubmitter { + pub fn new(rest_clients: Vec, max_retries: usize, retry_after: Duration) -> Self { + info!( + "Using reliable/retriable init transaction executor with {} retries, every {}s", + max_retries, + retry_after.as_secs_f32() + ); + Self { + rest_clients, + max_retries, + retry_after, + } + } + fn random_rest_client(&self) -> &RestClient { let mut rng = thread_rng(); self.rest_clients.choose(&mut rng).unwrap() diff --git a/crates/transaction-emitter-lib/src/wrappers.rs b/crates/transaction-emitter-lib/src/wrappers.rs 
index 9f6659e5e9b98..c1447682c3e74 100644 --- a/crates/transaction-emitter-lib/src/wrappers.rs +++ b/crates/transaction-emitter-lib/src/wrappers.rs @@ -5,18 +5,19 @@ use crate::{ args::{ClusterArgs, EmitArgs}, cluster::Cluster, emitter::{ - create_accounts, local_account_generator::PrivateKeyAccountGenerator, parse_seed, - stats::TxnStats, EmitJobMode, EmitJobRequest, NumAccountsMode, TxnEmitter, + account_minter::bulk_create_accounts, get_needed_balance_per_account_from_req, + local_account_generator::PrivateKeyAccountGenerator, stats::TxnStats, + transaction_executor::RestApiReliableTransactionSubmitter, EmitJobMode, EmitJobRequest, + NumAccountsMode, TxnEmitter, }, instance::Instance, CreateAccountsArgs, }; use anyhow::{bail, Context, Result}; -use aptos_config::config::DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE; use aptos_logger::{error, info}; use aptos_sdk::transaction_builder::TransactionFactory; use aptos_transaction_generator_lib::{args::TransactionTypeArg, WorkflowProgress}; -use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand::{rngs::StdRng, SeedableRng}; use std::time::{Duration, Instant}; pub async fn emit_transactions( @@ -153,8 +154,8 @@ pub async fn emit_transactions_with_cluster( .latency_polling_interval(Duration::from_secs_f32(latency_polling_interval_s)); } - if args.skip_minting_accounts { - emit_job_request = emit_job_request.skip_minting_accounts(); + if args.skip_funding_accounts { + emit_job_request = emit_job_request.skip_funding_accounts(); } let stats = emitter @@ -180,30 +181,32 @@ pub async fn create_accounts_command( let txn_factory = TransactionFactory::new(cluster.chain_id) .with_transaction_expiration_time(60) .with_max_gas_amount(create_accounts_args.max_gas_per_txn); - let emit_job_request = - EmitJobRequest::new(cluster.all_instances().map(Instance::rest_client).collect()) - .init_gas_price_multiplier(1) - .expected_gas_per_txn(create_accounts_args.max_gas_per_txn) - .max_gas_per_txn(create_accounts_args.max_gas_per_txn) - 
.coins_per_account_override(0) - .expected_max_txns(0) - .prompt_before_spending(); - let seed = match &create_accounts_args.account_minter_seed { - Some(str) => parse_seed(str), - None => StdRng::from_entropy().gen(), - }; - - create_accounts( + let rest_clients = cluster + .all_instances() + .map(Instance::rest_client) + .collect::>(); + let mut emit_job_request = EmitJobRequest::new(rest_clients.clone()) + .init_gas_price_multiplier(1) + .expected_gas_per_txn(create_accounts_args.max_gas_per_txn) + .max_gas_per_txn(create_accounts_args.max_gas_per_txn) + .coins_per_account_override(0) + .expected_max_txns(0) + .prompt_before_spending(); + + if let Some(seed) = &create_accounts_args.account_minter_seed { + emit_job_request = emit_job_request.account_minter_seed(seed); + } + + bulk_create_accounts( &coin_source_account, + &RestApiReliableTransactionSubmitter::new(rest_clients, 6, Duration::from_secs(10)), &txn_factory, Box::new(PrivateKeyAccountGenerator), - &emit_job_request, - DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE, - false, - seed, + (&emit_job_request).into(), create_accounts_args.count, - 4, + get_needed_balance_per_account_from_req(&emit_job_request, create_accounts_args.count), ) .await?; + Ok(()) } diff --git a/experimental/bulk-txn-submit/Cargo.toml b/experimental/bulk-txn-submit/Cargo.toml new file mode 100644 index 0000000000000..703a0515ff359 --- /dev/null +++ b/experimental/bulk-txn-submit/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "aptos-experimental-bulk-txn-submit" +description = "" +version = "0.0.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-config = { workspace = true } +aptos-crypto = { workspace = true } +aptos-global-constants = { workspace = true } +aptos-logger = 
{ workspace = true } +aptos-sdk = { workspace = true } +aptos-transaction-emitter-lib = { workspace = true } +aptos-transaction-generator-lib = { workspace = true } +bcs = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +futures = { workspace = true } +itertools = { workspace = true } +rand = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } diff --git a/experimental/bulk-txn-submit/src/coordinator.rs b/experimental/bulk-txn-submit/src/coordinator.rs new file mode 100644 index 0000000000000..48aff3140b9b7 --- /dev/null +++ b/experimental/bulk-txn-submit/src/coordinator.rs @@ -0,0 +1,801 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + indexer::continuously_update_indexer_delay, + metrics::{spawn_async_tracking, Tracking}, + workloads::SignedTransactionBuilder, +}; +use anyhow::{bail, Result}; +use aptos_config::config::DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE; +use aptos_logger::{error, info, sample, sample::SampleRate, warn}; +use aptos_sdk::{ + move_types::account_address::AccountAddress, + rest_client::{ + aptos_api_types::{AptosError, AptosErrorCode, TransactionOnChainData}, + error::{AptosErrorResponse, RestError}, + Client, + }, + transaction_builder::{aptos_stdlib, TransactionFactory}, + types::LocalAccount, +}; +use aptos_transaction_emitter_lib::{ + emitter::{ + account_minter::{bulk_create_accounts, prompt_yes, BulkAccountCreationConfig}, + get_account_seq_num, get_needed_balance_per_account, + local_account_generator::{LocalAccountGenerator, PrivateKeyAccountGenerator}, + parse_seed, + transaction_executor::RestApiReliableTransactionSubmitter, + EXPECTED_GAS_PER_ACCOUNT_CREATE, EXPECTED_GAS_PER_TRANSFER, + }, + Cluster, ClusterArgs, +}; +use aptos_transaction_generator_lib::ReliableTransactionSubmitter; +use clap::Parser; +use futures::{future::join_all, StreamExt}; +use 
rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; +use std::{ + collections::BTreeMap, + sync::{ + atomic::{AtomicI64, AtomicUsize}, + Arc, + }, + time::{Duration, Instant}, +}; + +#[derive(Debug, Parser)] +pub struct TransactionFactoryArgs { + #[clap(long)] + pub octas_per_workload_transaction: u64, + + #[clap(long, default_value = "100")] + gas_price: u64, + + #[clap(long)] + init_gas_price: Option, + + #[clap(long, default_value = "10000")] + init_max_gas_per_txn: u64, + + #[clap(long, default_value = "60")] + expiration_time_s: u64, +} + +impl TransactionFactoryArgs { + pub fn get_workload_max_gas_amount(&self) -> u64 { + self.octas_per_workload_transaction / self.gas_price + 1 + } + + pub fn with_init_params(&self, factory: TransactionFactory) -> TransactionFactory { + factory + .with_gas_unit_price(self.init_gas_price.unwrap_or(self.gas_price)) + .with_transaction_expiration_time(self.expiration_time_s) + .with_max_gas_amount(self.init_max_gas_per_txn) + } + + pub fn with_params(&self, factory: TransactionFactory) -> TransactionFactory { + factory + .with_gas_unit_price(self.gas_price) + .with_transaction_expiration_time(self.expiration_time_s) + .with_max_gas_amount(self.get_workload_max_gas_amount()) + } +} + +#[derive(Parser, Debug)] +pub struct AccountsArgs { + /// Number of accounts to create + #[clap(long)] + pub num_worker_accounts: usize, + + /// Optional seed for accounts used. If no seed is provided, a random seed is used and printed. 
+ #[clap(long)] + pub accounts_seed: Option, +} + +#[derive(Parser, Debug)] +pub struct SubmitArgs { + #[clap(flatten)] + pub cluster_args: ClusterArgs, + + #[clap(flatten)] + pub transaction_factory_args: TransactionFactoryArgs, + + #[clap(flatten)] + pub accounts_args: AccountsArgs, + + #[clap(long)] + skip_funding_accounts: bool, + + #[clap(long, default_value = "10")] + batch_size: usize, + #[clap(long, default_value = "0.3")] + poll_interval_s: f32, + + #[clap(long)] + output_file: Option, +} + +#[derive(Parser, Debug)] +pub struct CreateSampleAddresses { + /// Number of accounts to create + #[clap(long)] + num_addresses: usize, + + #[clap(long)] + output_file: String, +} + +#[derive(Parser, Debug)] +pub struct CleanAddresses { + #[clap(long)] + pub destinations_file: String, + + #[clap(long)] + pub output_file: String, +} + +pub async fn execute_submit>( + work: Vec, + args: SubmitArgs, + builder: B, + cluster: Cluster, + coin_source_account: LocalAccount, + detailed_progress: bool, +) -> Result<()> { + let clients = cluster + .all_instances() + .map(|i| i.rest_client()) + .collect::>(); + let txn_factory = TransactionFactory::new(cluster.chain_id); + + let needed_balance_per_account = get_needed_balance_per_account( + work.len() as u64, + 0, + args.transaction_factory_args.octas_per_workload_transaction, + args.accounts_args.num_worker_accounts, + args.transaction_factory_args.gas_price, + args.transaction_factory_args.get_workload_max_gas_amount(), + ); + + let worker_accounts = create_worker_accounts( + clients.clone(), + coin_source_account, + args.transaction_factory_args + .with_init_params(txn_factory.clone()), + args.accounts_args.num_worker_accounts, + args.accounts_args.accounts_seed.as_deref(), + args.skip_funding_accounts, + cluster.coin_source_is_root, + needed_balance_per_account, + ) + .await?; + + if !prompt_yes("About to submit transactions. 
Continue?") { + bail!("User aborted") + } + + let output = execute_txn_list( + worker_accounts, + clients, + work, + 10, + args.batch_size, + Duration::from_secs_f32(args.poll_interval_s), + args.transaction_factory_args.with_params(txn_factory), + builder, + detailed_progress, + ) + .await?; + + if let Some(output_file) = args.output_file { + std::fs::write(output_file, output.join("\n"))?; + } else { + for txn in output { + println!("{}", txn); + } + } + + Ok(()) +} + +pub async fn execute_return_worker_funds( + transaction_factory_args: TransactionFactoryArgs, + accounts_args: AccountsArgs, + cluster: Cluster, + coin_source_account: &LocalAccount, +) -> Result<()> { + let return_funds_retries = 5; + let return_funds_retry_interval = Duration::from_secs(3); + + let clients = cluster + .all_instances() + .map(|i| i.rest_client()) + .collect::>(); + + let txn_factory = + transaction_factory_args.with_params(TransactionFactory::new(cluster.chain_id)); + + let txn_executor = RestApiReliableTransactionSubmitter::new( + clients, + return_funds_retries, + return_funds_retry_interval, + ); + + let accounts = PrivateKeyAccountGenerator + .gen_local_accounts( + &txn_executor, + accounts_args.num_worker_accounts, + &mut StdRng::from_seed(parse_seed(&accounts_args.accounts_seed.unwrap())), + ) + .await?; + + let txn_executor_ref = &txn_executor; + let counter = txn_executor_ref.create_counter_state(); + let counter_ref = &counter; + let txn_factory_ref = &txn_factory; + let _ = futures::stream::iter(accounts.iter().map(|account| async move { + loop { + if let Ok(balance) = txn_executor_ref + .get_account_balance(account.address()) + .await + { + if balance > 2 * txn_factory_ref.get_max_gas_amount() * txn_factory_ref.get_gas_unit_price() { + let txn = account.sign_with_transaction_builder(txn_factory_ref.payload( + aptos_stdlib::aptos_coin_transfer( + coin_source_account.address(), + balance + - txn_factory_ref.get_max_gas_amount() + * txn_factory_ref.get_gas_unit_price(), + 
), + )); + if txn_executor_ref + .execute_transactions_with_counter(&[txn], counter_ref) + .await + .is_ok() + { + break; + } + } else { + break; + } + } + } + Ok::<(), ()>(()) + })) + .buffered(100) + .collect::>() + .await + .into_iter() + .flatten() + .collect::>(); + + Ok(()) +} + +async fn create_worker_accounts( + clients: Vec, + coin_source_account: LocalAccount, + init_txn_factory: TransactionFactory, + num_accounts: usize, + accounts_seed: Option<&str>, + skip_funding_accounts: bool, + coin_source_is_root: bool, + needed_balance_per_account: u64, +) -> Result> { + let account_funding_retries = 5; + let account_funding_retry_interval = Duration::from_secs(3); + + bulk_create_accounts( + &coin_source_account, + &RestApiReliableTransactionSubmitter::new( + clients, + account_funding_retries, + account_funding_retry_interval, + ), + &init_txn_factory, + Box::new(PrivateKeyAccountGenerator), + BulkAccountCreationConfig::new( + DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE, + skip_funding_accounts, + accounts_seed, + coin_source_is_root, + true, + false, + EXPECTED_GAS_PER_TRANSFER, + EXPECTED_GAS_PER_ACCOUNT_CREATE, + ), + num_accounts, + needed_balance_per_account, + ) + .await +} + +struct AccountWork { + account: LocalAccount, + work: Vec, + initial_seq_num: u64, +} + +impl AccountWork { + fn new(account: LocalAccount, work: Vec) -> Self { + let initial_seq_num = account.sequence_number(); + Self { + account, + work, + initial_seq_num, + } + } +} + +pub async fn execute_txn_list>( + accounts: Vec, + clients: Vec, + work: Vec, + single_request_api_batch_size: usize, + parallel_requests_outstanding: usize, + poll_interval: Duration, + txn_factory: TransactionFactory, + builder: B, + detailed_progress: bool, +) -> Result> { + let mut work_chunks = (0..accounts.len()).map(|_| vec![]).collect::>(); + for (i, work) in work.into_iter().enumerate() { + work_chunks[i % accounts.len()].push(work); + } + assert_eq!(work_chunks.len(), accounts.len()); + + let 
accounts_with_work = work_chunks + .into_iter() + .zip(accounts.into_iter()) + .map(|(work, account)| AccountWork::new(account, work)) + .collect::>(); + let txn_factory = &txn_factory; + let builder = &builder; + let clients = &clients; + + let indexer_delay = Arc::new(AtomicI64::new(0)); + + let tracking = Arc::new(Tracking::new(indexer_delay.clone(), detailed_progress)); + let tracking_ref = tracking.as_ref(); + + let tracking_done = spawn_async_tracking(tracking.clone(), Duration::from_secs(2)); + + let _task = tokio::spawn(continuously_update_indexer_delay( + txn_factory.get_chain_id(), + indexer_delay.clone(), + )); + let indexer_delay_ref = &indexer_delay; + + let start_time = Instant::now(); + + join_all( + accounts_with_work + .iter() + .map(|account_with_work| async move { + submit_work_txns( + &account_with_work.account, + account_with_work.initial_seq_num, + &account_with_work.work, + single_request_api_batch_size, + parallel_requests_outstanding, + builder, + txn_factory, + clients, + poll_interval, + tracking_ref, + indexer_delay_ref, + &BackoffConfig::default(), + ) + .await; + }), + ) + .await; + + let elapsed = start_time.elapsed().as_secs_f64(); + tracking_done.store(true, std::sync::atomic::Ordering::Relaxed); + tokio::time::sleep(Duration::from_secs(1)).await; + + warn!("Done executing work, fetching outputs"); + tracking.print_stats(elapsed); + + let progress = Arc::new(AtomicUsize::new(0)); + let done_tracking = spawn_async_tracking(progress.clone(), Duration::from_secs(10)); + let progress_ref = progress.as_ref(); + let out = futures::stream::iter(accounts_with_work.into_iter().map( + |account_with_work| async move { + fetch_work_txn_output( + &account_with_work.account, + account_with_work.initial_seq_num, + &account_with_work.work, + clients, + progress_ref, + ) + .await + }, + )) + .buffered(400) + .collect::>() + .await + .into_iter() + .flatten() + .collect::>(); + + done_tracking.store(true, std::sync::atomic::Ordering::Relaxed); + 
tokio::time::sleep(Duration::from_secs(1)).await; + + for (status, infos) in group_pairs(out.iter().map(|txn| { + ( + txn.1 + .as_ref() + .map_or("missing".to_string(), |t| format!("{:?}", t.info.status())), + txn, + ) + })) + .into_iter() + { + let gas_used = infos + .iter() + .map(|txn| txn.1.as_ref().map_or(0, |t| t.info.gas_used())) + .collect::>(); + info!( + "{:?}: {} txns, gas used: min: {:?}, max: {:?}", + status, + infos.len(), + gas_used.iter().min().unwrap(), + gas_used.iter().max().unwrap(), + ); + } + + let result = out + .iter() + .map(|(data, txn)| builder.success_output(data, txn)) + .collect::>(); + + Ok(result) +} + +fn group_pairs(v: I) -> BTreeMap> +where + A: Ord, + I: IntoIterator, +{ + let mut result = BTreeMap::>::new(); + for (a, b) in v { + result.entry(a).or_default().push(b); + } + result +} + +fn start_sleep_duration() -> Duration { + Duration::from_secs_f64(rand::thread_rng().gen_range(0.0, 5.0)) +} + +struct SingleBackoffConfig { + lower_threshold: f64, + threshold_gap: f64, +} + +impl SingleBackoffConfig { + fn backoff_multiplier(&self, delay: f64) -> f64 { + 1.0 - ((delay - self.lower_threshold).max(0.0) / self.threshold_gap) + .sqrt() + .min(1.0) + + // if delay > self.lower_threshold { + // // the bigger the delay, the more likely we should wait + // // if delay is above lower_threshold + threshold_gap, we completely pause the submission + // if rand::thread_rng().gen_bool( + // ((delay - self.lower_threshold) / self.threshold_gap) + // .sqrt() + // .min(1.0), + // ) { + // return true; + // } + // } + // false + } +} + +struct BackoffConfig { + indexer_backoff: SingleBackoffConfig, + blockchain_backoff: SingleBackoffConfig, +} + +impl Default for BackoffConfig { + fn default() -> Self { + Self { + indexer_backoff: SingleBackoffConfig { + lower_threshold: 2.0, + threshold_gap: 4.0, + }, + blockchain_backoff: SingleBackoffConfig { + lower_threshold: 1.3, + threshold_gap: 2.0, + }, + } + } +} + +async fn submit_work_txns>( + 
account: &LocalAccount, + initial_seq_num: u64, + work: &Vec, + single_request_api_batch_size: usize, + parallel_requests_outstanding: usize, + builder: &B, + txn_factory: &TransactionFactory, + clients: &Vec, + poll_interval: Duration, + tracking: &Tracking, + indexer_delay_ref: &AtomicI64, + backoff_config: &BackoffConfig, +) { + tokio::time::sleep(start_sleep_duration()).await; + + let mut consecutive_rollback = 0; + + let mut indexer_backoffs = 0; + let mut blockchain_backoffs = 0; + + 'outer: loop { + let indexer_delay = indexer_delay_ref.load(std::sync::atomic::Ordering::Relaxed) as f64; + let indexer_backoff_multiplier = backoff_config.indexer_backoff.backoff_multiplier(indexer_delay); + let last_blockchain_latency = tracking.get_last_latency(); + let blockchain_backoff_multiplier = backoff_config.blockchain_backoff.backoff_multiplier(last_blockchain_latency); + + if indexer_backoff_multiplier < 1.0 { + indexer_backoffs += 1; + } + + if blockchain_backoff_multiplier < 1.0 { + blockchain_backoffs += 1; + } + + let cur_parallel_requests_float = parallel_requests_outstanding as f64 * blockchain_backoff_multiplier * indexer_backoff_multiplier; + assert!(cur_parallel_requests_float <= parallel_requests_outstanding as f64); + let cur_parallel_requests = if cur_parallel_requests_float < 1.0 { + let sleep_duration = poll_interval.div_f64(cur_parallel_requests_float.max(0.01)); + println!("Sleeping for {:?}", sleep_duration); + tokio::time::sleep(sleep_duration).await; + 1 + } else { + (cur_parallel_requests_float as usize).max(1) + }; + // println!("cur_parallel_requests {}", cur_parallel_requests); + // let cur_parallel_requests = parallel_requests_outstanding; + + let start_seq_num = account.sequence_number(); + let chunk_start = (start_seq_num - initial_seq_num) as usize; + if chunk_start >= work.len() { + break; + } + let chunk = + &work[chunk_start..(work.len().min(chunk_start + cur_parallel_requests))]; + + let txns = chunk + .iter() + .map(|data| 
builder.build(data, account, txn_factory)) + .collect::>(); + + let client = pick_client(clients); + + let before_submitted_instant = Instant::now(); + + let min_failed = join_all(txns.chunks(single_request_api_batch_size).map( + |batch| async move { + match client.submit_batch_bcs(batch).await { + Err(e) => { + warn!("Error submitting batch: {:?}", e); + Some(batch.first().unwrap().sequence_number()) + }, + Ok(r) => { + let result = r.into_inner(); + if !result.transaction_failures.is_empty() { + warn!("Failed submission: {:?}", result.transaction_failures); + let first_failure = result + .transaction_failures + .iter() + .map(|tf| tf.transaction_index) + .min() + .unwrap(); + Some(batch[first_failure].sequence_number()) + } else { + None + } + }, + } + }, + )) + .await + .into_iter() + .flatten() + .min(); + + if let Some(min_failed) = min_failed { + account.set_sequence_number(min_failed); + if start_seq_num == account.sequence_number() { + tokio::time::sleep(poll_interval).await; + continue; + } + } + + let submitted_time = tracking.submitted(txns.len(), before_submitted_instant); + let end_onchain_ts = txns + .iter() + .map(|txn| txn.expiration_timestamp_secs()) + .max() + .unwrap(); + + let mut max_seen = start_seq_num; + loop { + match get_account_seq_num(client, account.address()).await { + Ok((seq_num, onchain_ts)) => { + if seq_num > max_seen { + consecutive_rollback = 0; + tracking + .committed_succesfully((seq_num - max_seen) as usize, submitted_time); + max_seen = seq_num; + } + + if seq_num == account.sequence_number() { + break; + } + assert!( + seq_num < account.sequence_number(), + "seq_num: {}, account.seq_num: {}", + seq_num, + account.sequence_number() + ); + + if onchain_ts > end_onchain_ts { + consecutive_rollback += 1; + + sample!( + SampleRate::Duration(Duration::from_secs(10)), + warn!("Rolling back account {} seq num from {} to {} (cur fetched {}). {} > {}. 
Consecutive rollback index {}", account.address(), account.sequence_number(), max_seen, seq_num, onchain_ts, end_onchain_ts, consecutive_rollback) + ); + account.set_sequence_number(max_seen); + if let Some(txn) = txns.iter().find(|txn| txn.sequence_number() == max_seen) + { + match client + .get_transaction_by_hash_bcs(txn.clone().committed_hash()) + .await + { + Ok(res) => { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + warn!("Rollback txn status: {:?}", res.into_inner()) + ); + }, + Err(RestError::Api(AptosErrorResponse { + error: + AptosError { + error_code: AptosErrorCode::TransactionNotFound, + .. + }, + .. + })) => { + // no info to show + }, + Err(e) => { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + warn!("Rollback error status: {:?}", e) + ); + }, + } + } + if consecutive_rollback >= 10 { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + warn!( + "Too many consecutive rollbacks. Aborting {}", + account.address() + ) + ); + break 'outer; + } + break; + } + }, + Err(e) => { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + warn!("Error getting account seq num: {:?}", e) + ); + }, + } + + tokio::time::sleep(poll_interval).await; + } + } + if indexer_backoffs > 0 || blockchain_backoffs > 0 { + warn!("Applied {} blockchain and {} indexer backoffs", blockchain_backoffs, indexer_backoffs); + } +} + +async fn fetch_work_txn_output( + account: &LocalAccount, + initial_seq_num: u64, + work: &[T], + clients: &Vec, + progress: &AtomicUsize, +) -> Vec<(T, Option)> { + tokio::time::sleep(start_sleep_duration()).await; + + let mut start = initial_seq_num; + let mut out = vec![]; + loop { + let client = pick_client(clients); + match client + .get_account_transactions_bcs(account.address(), Some(start), None) + .await + { + Ok(transactions) => { + for txn in transactions.inner().iter() { + out.push((work[out.len()].clone(), Some(txn.clone()))); + } + + let len = transactions.inner().len(); + progress.fetch_add(len, 
std::sync::atomic::Ordering::Relaxed); + + start += len as u64; + if start >= account.sequence_number() { + break; + } + if len == 0 { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + error!( + "Account {} seq num {}..{} (work len {}), no more transactions fetched at {}", + account.address(), + initial_seq_num, + account.sequence_number(), + work.len(), + start, + ), + ); + + break; + } + }, + Err(e) => { + sample!( + SampleRate::Duration(Duration::from_secs(1)), + warn!("Error getting account transactions: {:?}", e) + ); + }, + } + } + + while out.len() < work.len() { + out.push((work[out.len()].clone(), None)); + } + + out +} + +pub fn create_sample_addresses(args: CreateSampleAddresses) -> Result<()> { + let sample_address = AccountAddress::from_str_strict( + "0xabcd000000000000000000000000000000000000000000000000000000000000", + )?; + let addresses = (0..args.num_addresses) + .map(|mut i| { + let mut vec = sample_address.into_bytes(); + let mut index = 20; + while i > 0 { + vec[index] = (i % 256) as u8; + i /= 256; + index -= 1; + } + AccountAddress::new(vec).to_standard_string() + }) + .collect::>(); + + std::fs::write(args.output_file, addresses.join("\n"))?; + Ok(()) +} + +pub fn pick_client(clients: &Vec) -> &Client { + clients.choose(&mut rand::thread_rng()).unwrap() +} diff --git a/experimental/bulk-txn-submit/src/event_lookup.rs b/experimental/bulk-txn-submit/src/event_lookup.rs new file mode 100644 index 0000000000000..f2f8afa4a8937 --- /dev/null +++ b/experimental/bulk-txn-submit/src/event_lookup.rs @@ -0,0 +1,82 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{bail, Result}; +use aptos_sdk::types::{account_address::AccountAddress, contract_event::ContractEvent}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DepositMoveStruct { + account: AccountAddress, + amount: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct
AggregatorSnapshotu64MoveStruct { + value: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MintMoveStruct { + collection: AccountAddress, + index: AggregatorSnapshotu64MoveStruct, + token: AccountAddress, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BurnMoveStruct { + collection: AccountAddress, + index: u64, + token: AccountAddress, + previous_owner: AccountAddress, +} + +pub fn get_mint_token_addr(events: &[ContractEvent]) -> Result { + let mint_event: MintMoveStruct = search_single_event_data( + events, + "0000000000000000000000000000000000000000000000000000000000000004::collection::Mint", + )?; + Ok(mint_event.token) +} + +pub fn get_burn_token_addr(events: &[ContractEvent]) -> Result { + let burn_event: BurnMoveStruct = search_single_event_data( + events, + "0000000000000000000000000000000000000000000000000000000000000004::collection::Burn", + )?; + Ok(burn_event.token) +} + +pub fn search_event(events: &[ContractEvent], type_tag: &str) -> Vec { + events + .iter() + .filter(|event| event.type_tag().to_canonical_string() == type_tag) + .cloned() + .collect::>() +} + +pub fn search_single_event_data(events: &[ContractEvent], type_tag: &str) -> Result +where + T: serde::de::DeserializeOwned, +{ + let matching_events = search_event(events, type_tag); + if matching_events.len() != 1 { + bail!( + "Expected 1 event, found: {}, events: {:?}", + matching_events.len(), + events + .iter() + .map(|event| event.type_tag().to_canonical_string()) + .collect::>() + ); + } + let event = matching_events + .first() + .ok_or_else(|| anyhow::anyhow!("No deposit event found"))?; + Ok(bcs::from_bytes::(event.event_data())?) 
+} + +pub fn get_deposit_dst(events: &[ContractEvent]) -> Result { + let deposit_event: DepositMoveStruct = search_single_event_data(events, "0000000000000000000000000000000000000000000000000000000000000001::coin::Deposit<0000000000000000000000000000000000000000000000000000000000000001::aptos_coin::AptosCoin>")?; + Ok(deposit_event.account) +} diff --git a/experimental/bulk-txn-submit/src/indexer.rs b/experimental/bulk-txn-submit/src/indexer.rs new file mode 100644 index 0000000000000..010304abcd6bc --- /dev/null +++ b/experimental/bulk-txn-submit/src/indexer.rs @@ -0,0 +1,83 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use aptos_logger::warn; +use aptos_sdk::types::chain_id::ChainId; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; +use std::sync::{atomic::AtomicI64, Arc}; + +#[derive(Serialize, Deserialize, Debug)] +struct IndexerStatusResponse { + processor_status: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ProcessorStatus { + processor: String, + last_updated: String, + last_success_version: u64, + last_transaction_timestamp: String, +} + +fn indexer_status_url(chain_id: ChainId) -> &'static str { + if chain_id.is_mainnet() { + "https://indexer.mainnet.aptoslabs.com/api/rest/get_latest_processor_status" + } else if chain_id.is_testnet() { + "https://indexer-testnet.staging.gcp.aptosdev.com/api/rest/get_latest_processor_status" + } else { + "https://indexer-devnet.staging.gcp.aptosdev.com/api/rest/get_latest_processor_status" + } +} + +async fn fetch_indexer_delay(chain_id: ChainId) -> Result { + let resp = reqwest::get(indexer_status_url(chain_id)) + .await? 
+ .text() + .await?; + + let parsed: IndexerStatusResponse = serde_json::from_str(&resp)?; + + let timestamps = parsed + .processor_status + .iter() + .map(|status| { + let now_parsed: DateTime = Utc.from_utc_datetime(&NaiveDateTime::parse_from_str( + &status.last_transaction_timestamp, + "%Y-%m-%dT%H:%M:%S%.f", + )?); + Ok(Utc::now().signed_duration_since(now_parsed).num_seconds()) + // }).collect::>>(); + }) + .collect::, chrono::ParseError>>()?; + // info!("Indexer delay by processor: {:?}", timestamps); + Ok(timestamps.into_iter().max().unwrap()) +} + +pub(crate) async fn continuously_update_indexer_delay( + chain_id: ChainId, + delay_state: Arc, +) { + loop { + match fetch_indexer_delay(chain_id).await { + Ok(delay) => { + delay_state.store(delay, std::sync::atomic::Ordering::Relaxed); + }, + Err(e) => { + warn!("fetch_indexer_delay error: {:?}", e); + }, + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} + +#[ignore] +#[tokio::test] +async fn test_fetch_indexer_delay() { + for _ in 0..1000 { + println!("{}", fetch_indexer_delay(ChainId::testnet()).await.unwrap()); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } +} diff --git a/experimental/bulk-txn-submit/src/lib.rs b/experimental/bulk-txn-submit/src/lib.rs new file mode 100644 index 0000000000000..b7fb7b12a31fd --- /dev/null +++ b/experimental/bulk-txn-submit/src/lib.rs @@ -0,0 +1,8 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod coordinator; +pub mod event_lookup; +pub mod indexer; +pub mod metrics; +pub mod workloads; diff --git a/experimental/bulk-txn-submit/src/main.rs b/experimental/bulk-txn-submit/src/main.rs new file mode 100644 index 0000000000000..203b1b102efc2 --- /dev/null +++ b/experimental/bulk-txn-submit/src/main.rs @@ -0,0 +1,133 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{Context, Result}; +use aptos_experimental_bulk_txn_submit::{ + coordinator::{ + 
create_sample_addresses, execute_return_worker_funds, execute_submit, CleanAddresses, + CreateSampleAddresses, SubmitArgs, + }, + workloads::{ + create_account_addresses_work, CreateAndTransferAptSignedTransactionBuilder, + TransferAptSignedTransactionBuilder, + }, +}; +use aptos_logger::{Level, Logger}; +use aptos_sdk::move_types::account_address::AccountAddress; +use aptos_transaction_emitter_lib::Cluster; +use clap::{Parser, Subcommand}; +use rand::{seq::SliceRandom, thread_rng}; +use std::collections::HashSet; + +#[derive(Parser, Debug)] +struct Args { + #[clap(subcommand)] + command: DemoCommand, +} + +#[derive(Subcommand, Debug)] +enum DemoCommand { + Submit(Submit), + CreateSampleAddresses(CreateSampleAddresses), + CleanAddresses(CleanAddresses), +} + +#[derive(Parser, Debug)] +pub struct Submit { + #[clap(flatten)] + submit_args: SubmitArgs, + #[clap(subcommand)] + work_args: WorkTypeSubcommand, +} + +#[derive(Subcommand, Debug)] +pub enum WorkTypeSubcommand { + TransferApt(DestinationsArg), + CreateAndTransferApt(DestinationsArg), + ReturnWorkerFunds, +} + +#[derive(Parser, Debug)] +pub struct DestinationsArg { + #[clap(long)] + destinations_file: String, +} + +#[tokio::main] +pub async fn main() -> Result<()> { + Logger::builder().level(Level::Info).build(); + + let args = Args::parse(); + + match args.command { + DemoCommand::Submit(args) => create_work_and_execute(args).await, + DemoCommand::CreateSampleAddresses(args) => create_sample_addresses(args), + DemoCommand::CleanAddresses(args) => clean_addresses(args), + } +} + +async fn create_work_and_execute(args: Submit) -> Result<()> { + let cluster = Cluster::try_from_cluster_args(&args.submit_args.cluster_args) + .await + .context("Failed to build cluster")?; + let coin_source_account = cluster + .load_coin_source_account(&cluster.random_instance().rest_client()) + .await?; + + match &args.work_args { + WorkTypeSubcommand::TransferApt(destinations) => { + let work = 
create_account_addresses_work(&destinations.destinations_file, false)?; + execute_submit( + work, + args.submit_args, + TransferAptSignedTransactionBuilder, + cluster, + coin_source_account, + false, + ) + .await + }, + WorkTypeSubcommand::CreateAndTransferApt(destinations) => { + let work = create_account_addresses_work(&destinations.destinations_file, false)?; + execute_submit( + work, + args.submit_args, + CreateAndTransferAptSignedTransactionBuilder, + cluster, + coin_source_account, + false, + ) + .await + }, + WorkTypeSubcommand::ReturnWorkerFunds => { + execute_return_worker_funds( + args.submit_args.transaction_factory_args, + args.submit_args.accounts_args, + cluster, + &coin_source_account, + ) + .await + }, + } +} + +fn clean_addresses(args: CleanAddresses) -> Result<()> { + let work = create_account_addresses_work(&args.destinations_file, false)?; + println!("Input: {}", work.len()); + let mut unique = work + .into_iter() + .collect::>() + .into_iter() + .collect::>(); + unique.shuffle(&mut thread_rng()); + println!("Output: {}", unique.len()); + std::fs::write( + args.output_file, + unique + .iter() + .map(AccountAddress::to_standard_string) + .collect::>() + .join("\n"), + )?; + Ok(()) +} diff --git a/experimental/bulk-txn-submit/src/metrics.rs b/experimental/bulk-txn-submit/src/metrics.rs new file mode 100644 index 0000000000000..dfda3fbcd6ee3 --- /dev/null +++ b/experimental/bulk-txn-submit/src/metrics.rs @@ -0,0 +1,206 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::info; +use std::{ + sync::{ + atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize}, + Arc, + }, + time::{Duration, Instant}, +}; + +pub(crate) trait PrintProgress { + fn print_progress(&self, elapsed: Duration); +} + +pub(crate) fn spawn_async_tracking( + tracking: Arc, + interval: Duration, +) -> Arc { + let done = Arc::new(AtomicBool::new(false)); + let done_clone = done.clone(); + tokio::spawn(async move { + let mut previous = 
Instant::now(); + while !done_clone.load(std::sync::atomic::Ordering::Relaxed) { + tokio::time::sleep(interval).await; + let current = Instant::now(); + tracking.print_progress(current - previous); + previous = current; + } + }); + done +} + +impl PrintProgress for AtomicUsize { + fn print_progress(&self, _elapsed: Duration) { + info!( + "Progress: {}", + self.load(std::sync::atomic::Ordering::Relaxed) + ); + } +} + +const LATENCY_PRECISION: f64 = 0.01; + +pub(crate) struct Tracking { + submitted: AtomicUsize, + sum_submission_duration: AtomicU64, + done: AtomicUsize, + sum_latency: AtomicU64, + + last_printed_submitted: AtomicUsize, + last_printed_sum_submission_duration: AtomicU64, + last_printed_done: AtomicUsize, + last_printed_sum_latency: AtomicU64, + + last_printed_latency: AtomicU64, + + indexer_delay: Arc, + detailed: bool, +} + +impl Tracking { + pub fn new(indexer_delay: Arc, detailed: bool) -> Self { + Self { + submitted: AtomicUsize::new(0), + sum_submission_duration: AtomicU64::new(0), + done: AtomicUsize::new(0), + sum_latency: AtomicU64::new(0), + last_printed_submitted: AtomicUsize::new(0), + last_printed_sum_submission_duration: AtomicU64::new(0), + last_printed_done: AtomicUsize::new(0), + last_printed_sum_latency: AtomicU64::new(0), + last_printed_latency: AtomicU64::new(0), + indexer_delay, + detailed, + } + } + + pub fn submitted(&self, num: usize, before_submission: Instant) -> Instant { + self.submitted + .fetch_add(num, std::sync::atomic::Ordering::Relaxed); + let now = Instant::now(); + let submission_time = now - before_submission; + + self.sum_submission_duration.fetch_add( + (submission_time.as_secs_f64() / LATENCY_PRECISION) as u64 * num as u64, + std::sync::atomic::Ordering::Relaxed, + ); + now + } + + pub fn committed_succesfully(&self, num: usize, submitted_time: Instant) { + self.done + .fetch_add(num, std::sync::atomic::Ordering::Relaxed); + self.sum_latency.fetch_add( + (submitted_time.elapsed().as_secs_f64() / LATENCY_PRECISION) 
as u64 * num as u64, + std::sync::atomic::Ordering::Relaxed, + ); + } + + pub fn print_stats(&self, elapsed: f64) { + let submitted = self.submitted.load(std::sync::atomic::Ordering::Relaxed); + let done = self.done.load(std::sync::atomic::Ordering::Relaxed); + let sum_latency = self.sum_latency.load(std::sync::atomic::Ordering::Relaxed); + info!( + "Submitted: {}, Done: {}, Avg latency: {}, Avg TPS: {} (including warm up and checking for committed transactions)", + submitted, + done, + sum_latency as f64 / done as f64 * LATENCY_PRECISION, + done as f64 / elapsed + ); + } + + pub fn get_last_latency(&self) -> f64 { + let last_done = self + .last_printed_done + .load(std::sync::atomic::Ordering::Relaxed); + let cur_done = self.done.load(std::sync::atomic::Ordering::Relaxed); + + let last_sum_latency = self + .last_printed_sum_latency + .load(std::sync::atomic::Ordering::Relaxed); + let cur_sum_latency = self.sum_latency.load(std::sync::atomic::Ordering::Relaxed); + + let committed = cur_done - last_done; + + let last_latency = self + .last_printed_latency + .load(std::sync::atomic::Ordering::Relaxed) as f64 + * LATENCY_PRECISION; + + if committed > 0 { + last_latency.min( + ((cur_sum_latency - last_sum_latency) as f64 / committed as f64) + * LATENCY_PRECISION, + ) + } else { + last_latency + } + } +} + +impl PrintProgress for Tracking { + fn print_progress(&self, elapsed: Duration) { + let cur_submitted = self.submitted.load(std::sync::atomic::Ordering::Relaxed); + let last_submitted = self + .last_printed_submitted + .swap(cur_submitted, std::sync::atomic::Ordering::Relaxed); + + let cur_done = self.done.load(std::sync::atomic::Ordering::Relaxed); + let last_done = self + .last_printed_done + .swap(cur_done, std::sync::atomic::Ordering::Relaxed); + + let cur_sum_latency = self.sum_latency.load(std::sync::atomic::Ordering::Relaxed); + let last_sum_latency = self + .last_printed_sum_latency + .swap(cur_sum_latency, std::sync::atomic::Ordering::Relaxed); + + let 
cur_sum_submission_duration = self + .sum_submission_duration + .load(std::sync::atomic::Ordering::Relaxed); + let last_sum_submission_duration = self.last_printed_sum_submission_duration.swap( + cur_sum_submission_duration, + std::sync::atomic::Ordering::Relaxed, + ); + + let submitted = cur_submitted - last_submitted; + let committed = cur_done - last_done; + + let submission_duration = if submitted > 0 { + ((cur_sum_submission_duration - last_sum_submission_duration) as f64 / submitted as f64) + as u64 + } else { + 0 + }; + let latency = if committed > 0 { + ((cur_sum_latency - last_sum_latency) as f64 / committed as f64) as u64 + } else { + 0 + }; + self.last_printed_latency + .store(latency, std::sync::atomic::Ordering::Relaxed); + let indexer_latency = self + .indexer_delay + .load(std::sync::atomic::Ordering::Relaxed); + let details = if self.detailed { + format!( + ", blockchain latency {:.1}, submission duration {:.1}, indexer latency {}", + latency as f64 * LATENCY_PRECISION, + submission_duration as f64 * LATENCY_PRECISION, + indexer_latency, + ) + } else { + "".to_string() + }; + println!( + "Blockchain: progress: {} done, committed TPS: {:.2}, submitted TPS {:.2}{}", + cur_done, + committed as f32 / elapsed.as_secs_f32(), + submitted as f32 / elapsed.as_secs_f32(), + details, + ); + } +} diff --git a/experimental/bulk-txn-submit/src/workloads.rs b/experimental/bulk-txn-submit/src/workloads.rs new file mode 100644 index 0000000000000..abab28563502b --- /dev/null +++ b/experimental/bulk-txn-submit/src/workloads.rs @@ -0,0 +1,206 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::event_lookup::get_deposit_dst; +use anyhow::{anyhow, Result}; +use aptos_sdk::{ + move_types::account_address::AccountAddress, + rest_client::aptos_api_types::TransactionOnChainData, + transaction_builder::{aptos_stdlib, TransactionFactory}, + types::{ + serde_helper::bcs_utils::bcs_size_of_byte_array, + transaction::{SignedTransaction, 
TransactionPayload}, + LocalAccount, + }, +}; +use rand::{distributions::Alphanumeric, Rng}; +use std::{fs::read_to_string, path::Path}; + +pub trait SignedTransactionBuilder { + fn build( + &self, + data: &T, + account: &LocalAccount, + txn_factory: &TransactionFactory, + ) -> SignedTransaction; + + fn success_output(&self, data: &T, txn_out: &Option) -> String; +} + +pub fn create_account_addresses_work( + destinations_file: &str, + only_success: bool, +) -> Result> { + read_to_string(Path::new(destinations_file))? + .lines() + .filter(|s| !only_success || s.ends_with("\tsuccess")) + .filter_map(|s| s.split('\t').next()) + .filter(|s| !s.is_empty()) + .map(|text| { + AccountAddress::from_str_strict(text) + .map_err(|e| anyhow!("failed to parse {}, {:?}", text, e)) + }) + .collect::, _>>() +} + +fn parse_line_vec(line: &str) -> Result<(AccountAddress, AccountAddress)> { + let mut parts = line.split('\t'); + let first = parts + .next() + .ok_or_else(|| anyhow::anyhow!("No first part"))?; + let second = parts + .next() + .ok_or_else(|| anyhow::anyhow!("No second part"))?; + Ok(( + AccountAddress::from_str_strict(first) + .map_err(|e| anyhow!("failed to parse {}, {:?}", first, e))?, + AccountAddress::from_str_strict(second) + .map_err(|e| anyhow!("failed to parse {}, {:?}", second, e))?, + )) +} + +pub async fn create_account_address_pairs_work( + destinations_file: &str, + only_success: bool, +) -> Result> { + read_to_string(Path::new(destinations_file))? 
+ .lines() + .filter(|s| !only_success || s.ends_with("\tsuccess")) + .map(parse_line_vec) + .collect::, _>>() +} + +pub fn rand_string(len: usize) -> String { + let res = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect(); + assert_eq!( + bcs::serialized_size(&res).unwrap(), + bcs_size_of_byte_array(len) + ); + res +} + +// Example transaction builders: + +pub struct PayloadSignedTransactionBuilder; + +impl SignedTransactionBuilder for PayloadSignedTransactionBuilder { + fn build( + &self, + data: &TransactionPayload, + account: &LocalAccount, + txn_factory: &TransactionFactory, + ) -> SignedTransaction { + account.sign_with_transaction_builder(txn_factory.payload(data.clone())) + } + + fn success_output( + &self, + _data: &TransactionPayload, + txn_out: &Option, + ) -> String { + match txn_out { + Some(_txn_out) => "success", + None => "failure", + } + .to_string() + } +} + +pub struct FixedPayloadSignedTransactionBuilder { + pub payload: TransactionPayload, +} + +impl FixedPayloadSignedTransactionBuilder { + pub fn new(payload: TransactionPayload) -> Self { + Self { payload } + } +} + +impl SignedTransactionBuilder<()> for FixedPayloadSignedTransactionBuilder { + fn build( + &self, + _data: &(), + account: &LocalAccount, + txn_factory: &TransactionFactory, + ) -> SignedTransaction { + account.sign_with_transaction_builder(txn_factory.payload(self.payload.clone())) + } + + fn success_output(&self, _data: &(), txn_out: &Option) -> String { + match txn_out { + Some(_txn_out) => "success", + None => "failure", + } + .to_string() + } +} + +pub struct TransferAptSignedTransactionBuilder; + +impl SignedTransactionBuilder for TransferAptSignedTransactionBuilder { + fn build( + &self, + data: &AccountAddress, + account: &LocalAccount, + txn_factory: &TransactionFactory, + ) -> SignedTransaction { + account.sign_with_transaction_builder( + txn_factory.payload(aptos_stdlib::aptos_coin_transfer(*data, 1)), + ) + } + + fn 
success_output( + &self, + data: &AccountAddress, + txn_out: &Option, + ) -> String { + let (status, dst) = match txn_out { + Some(txn_out) => match get_deposit_dst(&txn_out.events) { + Ok(dst) => { + assert_eq!(&dst, data); + ("success".to_string(), dst.to_standard_string()) + }, + Err(e) => (e.to_string(), data.to_standard_string()), + }, + None => ("missing".to_string(), data.to_standard_string()), + }; + format!("{}\t{}", dst, status) + } +} + +pub struct CreateAndTransferAptSignedTransactionBuilder; + +impl SignedTransactionBuilder for CreateAndTransferAptSignedTransactionBuilder { + fn build( + &self, + data: &AccountAddress, + account: &LocalAccount, + txn_factory: &TransactionFactory, + ) -> SignedTransaction { + account.sign_with_transaction_builder( + txn_factory.payload(aptos_stdlib::aptos_account_transfer(*data, 1)), + ) + } + + fn success_output( + &self, + data: &AccountAddress, + txn_out: &Option, + ) -> String { + let (status, dst) = match txn_out { + Some(txn_out) => match get_deposit_dst(&txn_out.events) { + Ok(dst) => { + assert_eq!(&dst, data); + ("success", dst.to_standard_string()) + }, + Err(_e) => ("error", data.to_standard_string()), + }, + None => ("missing", data.to_standard_string()), + }; + format!("{}\t{}", dst, status) + } +} diff --git a/sdk/src/transaction_builder.rs b/sdk/src/transaction_builder.rs index bb28f20c1c95e..9f5e958fef9b8 100644 --- a/sdk/src/transaction_builder.rs +++ b/sdk/src/transaction_builder.rs @@ -137,6 +137,10 @@ impl TransactionFactory { self.transaction_expiration_time } + pub fn get_chain_id(&self) -> ChainId { + self.chain_id + } + pub fn payload(&self, payload: TransactionPayload) -> TransactionBuilder { self.transaction_builder(payload) }