Skip to content

Commit

Permalink
Allow backoff to not penalize all workloads in mixed mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sadhansood committed Sep 12, 2022
1 parent f0404c8 commit ba61fa6
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 41 deletions.
144 changes: 130 additions & 14 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sui_benchmark::workloads::workload::get_latest;
use sui_benchmark::workloads::workload::CombinationWorkload;
use sui_benchmark::workloads::workload::Payload;
use sui_benchmark::workloads::workload::Workload;
use sui_benchmark::workloads::workload::WorkloadInfo;
use sui_benchmark::workloads::workload::WorkloadType;
use sui_config::gateway::GatewayConfig;
use sui_config::Config;
Expand Down Expand Up @@ -99,6 +100,12 @@ struct Opts {
/// Whether or not to download TXes during follow
#[clap(long, global = true)]
pub download_txes: bool,
/// Run in disjoint_mode when we don't want different workloads
/// to interfere with each other. This mode should be used when
/// we don't want backoff to penalize all workloads even if only
/// one (or some) is slow.
#[clap(long, parse(try_from_str), default_value = "true", global = true)]
pub disjoint_mode: bool,
}

#[derive(Debug, Clone, Parser, Eq, PartialEq, EnumString)]
Expand Down Expand Up @@ -139,12 +146,15 @@ pub enum RunSpec {
},
}

fn make_workload(
fn make_combination_workload(
target_qps: u64,
num_workers: u64,
in_flight_ratio: u64,
primary_gas_id: ObjectID,
primary_gas_account_owner: SuiAddress,
primary_gas_account_keypair: Arc<AccountKeyPair>,
opts: &Opts,
) -> Box<dyn Workload<dyn Payload>> {
) -> WorkloadInfo {
let mut workloads = HashMap::<WorkloadType, (u32, Box<dyn Workload<dyn Payload>>)>::new();
match opts.run_spec {
RunSpec::Bench {
Expand Down Expand Up @@ -176,7 +186,70 @@ fn make_workload(
}
}
}
CombinationWorkload::new_boxed(workloads)
let workload = CombinationWorkload::new_boxed(workloads);
WorkloadInfo {
target_qps,
num_workers,
max_in_flight_ops: in_flight_ratio * target_qps,
workload,
}
}

/// Builds a `WorkloadInfo` for the shared-counter workload, sized to a
/// fraction (`weight`) of the overall benchmark budget.
///
/// `target_qps` and `num_workers` are scaled by `weight`; the worker count is
/// clamped to at least 1 so any non-zero QPS share always gets a worker.
/// Returns `None` when the scaled QPS (or the derived in-flight cap) rounds
/// down to zero, i.e. this workload was effectively given no budget.
fn make_shared_counter_workload(
    weight: f32,
    target_qps: u64,
    num_workers: u64,
    in_flight_ratio: u64,
    primary_gas_id: ObjectID,
    owner: SuiAddress,
    keypair: Arc<AccountKeyPair>,
) -> Option<WorkloadInfo> {
    let target_qps = (weight * target_qps as f32) as u64;
    let num_workers = std::cmp::max(1, (weight * num_workers as f32) as u64);
    // Both operands are already u64, so no `as u64` cast is needed here.
    let max_in_flight_ops = target_qps * in_flight_ratio;
    if target_qps == 0 || max_in_flight_ops == 0 {
        None
    } else {
        let workload = SharedCounterWorkload::new_boxed(primary_gas_id, owner, keypair, None);
        Some(WorkloadInfo {
            target_qps,
            num_workers,
            max_in_flight_ops,
            workload,
        })
    }
}

/// Builds a `WorkloadInfo` for the transfer-object workload, sized to a
/// fraction (`weight`) of the overall benchmark budget.
///
/// Unlike the shared-counter workload, this one is guaranteed at least
/// `min_qps` — the caller passes in the QPS left over after the other
/// workloads were sized, so the per-workload budgets still sum to the overall
/// target. Returns `None` when the resulting QPS (or the derived in-flight
/// cap) is zero.
fn make_transfer_object_workload(
    weight: f32,
    target_qps: u64,
    min_qps: u64,
    num_workers: u64,
    in_flight_ratio: u64,
    num_transfer_accounts: u64,
    primary_gas_id: &ObjectID,
    owner: SuiAddress,
    keypair: Arc<AccountKeyPair>,
) -> Option<WorkloadInfo> {
    let target_qps = std::cmp::max(min_qps, (weight * target_qps as f32) as u64);
    let num_workers = std::cmp::max(1, (weight * num_workers as f32) as u64);
    // Both operands are already u64, so no `as u64` cast is needed here.
    let max_in_flight_ops = target_qps * in_flight_ratio;
    if target_qps == 0 || max_in_flight_ops == 0 {
        None
    } else {
        let workload = TransferObjectWorkload::new_boxed(
            num_transfer_accounts,
            *primary_gas_id,
            owner,
            keypair,
        );
        Some(WorkloadInfo {
            target_qps,
            num_workers,
            max_in_flight_ops,
            workload,
        })
    }
}

#[tokio::main]
Expand Down Expand Up @@ -362,23 +435,66 @@ async fn main() -> Result<()> {
AuthAggMetrics::new(&registry),
SafeClientMetrics::new(&registry),
);
let mut workload = make_workload(primary_gas_id, owner, keypair, &opts);
workload.init(&aggregator).await;
let driver = match opts.run_spec {
match opts.run_spec {
RunSpec::Bench {
target_qps,
num_workers,
in_flight_ratio,
stat_collection_interval,
shared_counter,
transfer_object,
..
} => BenchDriver::new(
target_qps,
in_flight_ratio,
num_workers,
stat_collection_interval,
),
};
driver.run(workload, aggregator, &registry).await
} => {
let workloads = if !opts.disjoint_mode {
let mut combination_workload = make_combination_workload(
target_qps,
num_workers,
in_flight_ratio,
primary_gas_id,
owner,
keypair,
&opts,
);
combination_workload.workload.init(&aggregator).await;
vec![combination_workload]
} else {
let mut workloads = vec![];
let shared_counter_weight =
shared_counter as f32 / (shared_counter + transfer_object) as f32;
let mut total_qps = 0;
if let Some(mut shared_counter_workload) = make_shared_counter_workload(
shared_counter_weight,
target_qps,
num_workers,
in_flight_ratio,
primary_gas_id,
owner,
keypair.clone(),
) {
shared_counter_workload.workload.init(&aggregator).await;
total_qps += shared_counter_workload.target_qps;
workloads.push(shared_counter_workload);
}
if let Some(mut transfer_object_workload) = make_transfer_object_workload(
shared_counter_weight,
target_qps,
target_qps - total_qps,
num_workers,
in_flight_ratio,
opts.num_transfer_accounts,
&primary_gas_id,
owner,
keypair,
) {
transfer_object_workload.workload.init(&aggregator).await;
workloads.push(transfer_object_workload);
}
workloads
};
let driver = BenchDriver::new(stat_collection_interval);
driver.run(workloads, aggregator, &registry).await
}
}
})
});
let joined = handle.join();
Expand Down
67 changes: 44 additions & 23 deletions crates/sui-benchmark/src/drivers/bench_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::OnceCell;

use crate::drivers::driver::Driver;
use crate::workloads::workload::Payload;
use crate::workloads::workload::Workload;
use crate::workloads::workload::WorkloadInfo;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -112,49 +112,71 @@ async fn print_start_benchmark() {
.await;
}

pub struct BenchDriver {
pub num_requests_per_worker: u64,
pub num_workers: u64,
/// Per-task execution unit of the benchmark: each `BenchWorker` is driven by
/// one spawned task in `BenchDriver::run`, with its own QPS budget so a slow
/// workload cannot trigger backoff on the others.
pub struct BenchWorker {
    /// Size of this worker's payload pool — its share of the workload's
    /// `max_in_flight_ops`.
    pub num_requests: u64,
    /// QPS this worker alone is responsible for (the workload's target QPS
    /// divided among its workers; remainders go to the last worker).
    pub target_qps: u64,
    /// Pre-built test payloads the worker submits.
    pub payload: Vec<Box<dyn Payload>>,
}

/// Drives a set of workloads. Rate and concurrency limits now live on each
/// `WorkloadInfo`, so the driver itself only carries the stat-collection
/// cadence.
pub struct BenchDriver {
    /// Interval between statistics-collection ticks, in seconds (converted to
    /// micros in `run`).
    pub stat_collection_interval: u64,
}

impl BenchDriver {
pub fn new(
target_qps: u64,
in_flight_ratio: u64,
num_workers: u64,
stat_collection_interval: u64,
) -> BenchDriver {
let max_in_flight_ops = target_qps as usize * in_flight_ratio as usize;
pub fn new(stat_collection_interval: u64) -> BenchDriver {
BenchDriver {
num_requests_per_worker: max_in_flight_ops as u64 / num_workers,
num_workers,
target_qps,
stat_collection_interval,
}
}
/// Splits a workload's in-flight and QPS budgets evenly across its workers.
///
/// Each worker gets `floor(budget / num_workers)`; the final worker also
/// absorbs both remainders so the per-worker shares sum exactly to the
/// workload's totals. Workers whose share rounds down to zero requests or
/// zero QPS are dropped entirely.
pub async fn make_workers(
    &self,
    workload_info: &WorkloadInfo,
    aggregator: &AuthorityAggregator<NetworkAuthorityClient>,
) -> Vec<BenchWorker> {
    let worker_count = workload_info.num_workers;
    let base_requests = workload_info.max_in_flight_ops / worker_count;
    let base_qps = workload_info.target_qps / worker_count;
    let mut workers = Vec::new();
    for idx in 0..worker_count {
        let is_last = idx + 1 == worker_count;
        // The last worker picks up whatever the even split left over.
        let num_requests = if is_last {
            base_requests + workload_info.max_in_flight_ops % worker_count
        } else {
            base_requests
        };
        let target_qps = if is_last {
            base_qps + workload_info.target_qps % worker_count
        } else {
            base_qps
        };
        if num_requests == 0 || target_qps == 0 {
            continue;
        }
        let payload = workload_info
            .workload
            .make_test_payloads(num_requests, aggregator)
            .await;
        workers.push(BenchWorker {
            num_requests,
            target_qps,
            payload,
        });
    }
    workers
}
}

#[async_trait]
impl Driver<()> for BenchDriver {
async fn run(
&self,
workload: Box<dyn Workload<dyn Payload>>,
workloads: Vec<WorkloadInfo>,
aggregator: AuthorityAggregator<NetworkAuthorityClient>,
registry: &Registry,
) -> Result<(), anyhow::Error> {
let mut tasks = Vec::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let request_delay_micros = (1_000_000 * self.num_workers) / self.target_qps;
let mut bench_workers = vec![];
for workload in workloads.iter() {
bench_workers.extend(self.make_workers(workload, &aggregator).await);
}
let num_workers = bench_workers.len() as u64;
let stat_delay_micros = 1_000_000 * self.stat_collection_interval;
let barrier = Arc::new(Barrier::new(self.num_workers as usize));
let metrics = Arc::new(BenchMetrics::new(registry));
for i in 0..self.num_workers {
eprintln!("Starting worker: {}", i);
let mut free_pool = workload
.make_test_payloads(self.num_requests_per_worker, &aggregator)
.await;
let barrier = Arc::new(Barrier::new(num_workers as usize));
for (i, worker) in bench_workers.into_iter().enumerate() {
let request_delay_micros = 1_000_000 / worker.target_qps;
let mut free_pool = worker.payload;
let tx_cloned = tx.clone();
let cloned_barrier = barrier.clone();
let metrics_cloned = metrics.clone();
Expand Down Expand Up @@ -330,7 +352,6 @@ impl Driver<()> for BenchDriver {
tasks.push(runner);
}

let num_workers = self.num_workers;
tasks.push(tokio::spawn(async move {
let mut stat_collection: BTreeMap<usize, Stats> = BTreeMap::new();
let mut counter = 0;
Expand Down
7 changes: 3 additions & 4 deletions crates/sui-benchmark/src/drivers/driver.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::workloads::workload::Payload;
use crate::workloads::workload::Workload;
use async_trait::async_trait;
use prometheus::Registry;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::NetworkAuthorityClient;

use crate::workloads::workload::WorkloadInfo;

#[async_trait]
pub trait Driver<T> {
async fn run(
&self,
workload: Box<dyn Workload<dyn Payload>>,
workload: Vec<WorkloadInfo>,
aggregator: AuthorityAggregator<NetworkAuthorityClient>,
registry: &Registry,
) -> Result<T, anyhow::Error>;
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-benchmark/src/workloads/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,10 @@ impl CombinationWorkload {
Box::new(CombinationWorkload { workloads })
}
}

/// A workload bundled with its own rate and concurrency budget, so each
/// workload can be throttled independently (disjoint mode) instead of all
/// workloads sharing one global limit and backing off together.
pub struct WorkloadInfo {
    /// Total queries-per-second target for this workload.
    pub target_qps: u64,
    /// Number of worker tasks this workload's budget is split across.
    pub num_workers: u64,
    /// Cap on concurrently in-flight operations for this workload.
    pub max_in_flight_ops: u64,
    /// The workload implementation that builds and owns the test payloads.
    pub workload: Box<dyn Workload<dyn Payload>>,
}

0 comments on commit ba61fa6

Please sign in to comment.