Skip to content

Commit

Permalink
Merge pull request #404 from blockworks-foundation/feature/benchrunne…
Browse files Browse the repository at this point in the history
…r-status-from-websocket
  • Loading branch information
Lou-Kamades committed Jul 2, 2024
2 parents 78afa5d + ff82bd6 commit 5cc0173
Show file tree
Hide file tree
Showing 14 changed files with 318 additions and 110 deletions.
97 changes: 87 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ name = "benchnew"
path = "src/benchnew.rs"

[dependencies]

websocket-tungstenite-retry = { git = "https://github.com/grooviegermanikus/websocket-tungstenite-retry.git", tag = "v0.8.0" }
jsonrpsee-types = "0.22.2"

clap = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
Expand Down
19 changes: 13 additions & 6 deletions bench/src/benches/confirmation_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::benches::rpc_interface::{
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
use url::Url;
use solana_lite_rpc_util::obfuscate_rpcurl;

Check warning on line 15 in bench/src/benches/confirmation_rate.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_rate.rs

#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
Expand All @@ -28,6 +30,7 @@ pub struct Metric {
pub async fn confirmation_rate(
payer_path: &Path,
rpc_url: String,
tx_status_websocket_addr: Option<String>,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
txs_per_run: usize,
Expand All @@ -37,16 +40,19 @@ pub async fn confirmation_rate(

assert!(num_of_runs > 0, "num_of_runs must be greater than 0");

Check warning on line 42 in bench/src/benches/confirmation_rate.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_rate.rs
let rpc = Arc::new(RpcClient::new(rpc_url));
info!("RPC: {}", rpc.as_ref().url());
let rpc = Arc::new(RpcClient::new(rpc_url.clone()));
info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url()));

let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:"));
info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr));

let payer: Arc<Keypair> = Arc::new(read_keypair_file(payer_path).unwrap());
info!("Payer: {}", payer.pubkey().to_string());

let mut rpc_results = Vec::with_capacity(num_of_runs);

Check warning on line 52 in bench/src/benches/confirmation_rate.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_rate.rs

for _ in 0..num_of_runs {
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout)
match send_bulk_txs_and_wait(&rpc, Url::parse(&ws_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout)
.await
.context("send bulk tx and wait")
{
Expand All @@ -72,6 +78,7 @@ pub async fn confirmation_rate(

pub async fn send_bulk_txs_and_wait(
rpc: &RpcClient,
tx_status_websocket_addr: Url,
payer: &Keypair,
num_txs: usize,
tx_params: &BenchmarkTransactionParams,
Expand All @@ -87,7 +94,7 @@ pub async fn send_bulk_txs_and_wait(

Check warning on line 94 in bench/src/benches/confirmation_rate.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_rate.rs
trace!("Sending {} transactions in bulk ..", txs.len());
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &txs, max_timeout)
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout)
.await
.context("send and confirm bulk tx")?;
trace!("Done sending {} transaction.", txs.len());
Expand Down Expand Up @@ -127,9 +134,9 @@ pub async fn send_bulk_txs_and_wait(
}
ConfirmationResponseFromRpc::Timeout(elapsed) => {
debug!(
"Signature {} not confirmed after {:.02}ms",
"Signature {} not confirmed after {:.03}s",
tx_sig,
elapsed.as_secs_f32() * 1000.0
elapsed.as_secs_f32()
);
tx_sent += 1;
tx_unconfirmed += 1;
Expand Down
28 changes: 24 additions & 4 deletions bench/src/benches/confirmation_slot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::Path;
use std::str::FromStr;

Check warning on line 2 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused import: `std::str::FromStr`

Check warning on line 2 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused import: `std::str::FromStr`

Check warning on line 2 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `std::str::FromStr`
use std::time::Duration;

use crate::benches::rpc_interface::{

Check warning on line 5 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_slot.rs
Expand All @@ -13,8 +14,10 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::transaction::VersionedTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_sdk::pubkey::Pubkey;

Check warning on line 17 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_slot.rs
use tokio::time::{sleep, Instant};
use url::Url;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;

Check warning on line 20 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused import: `crate::benches::tx_status_websocket_collector::start_tx_status_collector`

Check warning on line 20 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused import: `crate::benches::tx_status_websocket_collector::start_tx_status_collector`

Check warning on line 20 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused import: `crate::benches::tx_status_websocket_collector::start_tx_status_collector`

#[derive(Clone, Copy, Debug, Default)]
pub struct Metric {
Expand Down Expand Up @@ -46,6 +49,8 @@ pub async fn confirmation_slot(
payer_path: &Path,
rpc_a_url: String,
rpc_b_url: String,
tx_status_websocket_addr_a: Option<String>,
tx_status_websocket_addr_b: Option<String>,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
num_of_runs: usize,
Expand All @@ -59,19 +64,32 @@ pub async fn confirmation_slot(
info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url));

Check warning on line 64 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_slot.rs
info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url));

let ws_addr_a = tx_status_websocket_addr_a.unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL");
let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL");

let rpc_a_url =
Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?;
let rpc_b_url =
Url::parse(&rpc_b_url).map_err(|e| anyhow!("Failed to parse RPC B URL: {}", e))?;

let mut rng = create_rng(None);
let payer = read_keypair_file(payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());
let payer_pubkey = payer.pubkey();
info!("Payer: {}", payer_pubkey.to_string());
// let mut ping_thing_tasks = vec![];

// FIXME
// let (tx_status_map, jh_collector) = start_tx_status_collector(Url::parse(&tx_status_websocket_addr).unwrap(), payer.pubkey(), CommitmentConfig::confirmed()).await;

for _ in 0..num_of_runs {
let rpc_a = create_rpc_client(&rpc_a_url);
let rpc_b = create_rpc_client(&rpc_b_url);

let ws_addr_a = ws_addr_a.clone();
let ws_addr_b = ws_addr_b.clone();

// measure network time to reach the respective RPC endpoints,
// used to mitigate the difference in distance by delaying the txn sending
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
Expand All @@ -95,13 +113,13 @@ pub async fn confirmation_slot(
let a_task = tokio::spawn(async move {

Check warning on line 113 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

Diff in /home/runner/work/lite-rpc/lite-rpc/bench/src/benches/confirmation_slot.rs
sleep(Duration::from_secs_f64(a_delay)).await;
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await
send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await
});

let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await
send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await
});

let (a, b) = tokio::join!(a_task, b_task);
Expand Down Expand Up @@ -156,11 +174,13 @@ async fn create_tx(

async fn send_and_confirm_transaction(
rpc: &RpcClient,
tx_status_websocket_addr: Url,
payer_pubkey: Pubkey,
tx: VersionedTransaction,
max_timeout: Duration,
) -> anyhow::Result<ConfirmationResponseFromRpc> {
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?;
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?;
assert_eq!(result_vec.len(), 1, "expected 1 result");
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();

Expand Down
1 change: 1 addition & 0 deletions bench/src/benches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod api_load;
pub mod confirmation_rate;
pub mod confirmation_slot;
pub mod rpc_interface;
mod tx_status_websocket_collector;
Loading

0 comments on commit 5cc0173

Please sign in to comment.