Skip to content

Commit

Permalink
Simplify concurrency / iterations model
Browse files Browse the repository at this point in the history
  • Loading branch information
fcsonline committed Jul 3, 2020
1 parent a9efb99 commit 7ab694b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 42 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -164,9 +164,9 @@ cargo build --release

This is the list of all features supported by the current version of `drill`:

- **Multi thread:** run your benchmarks setting as many concurrent threads as you want.
- **Concurrency:** run your benchmarks choosing the number of concurrent iterations.
- **Multi iterations:** specify the number of iterations you want to run the benchmark.
- **Ramp-up:** specify the amount of time it will take `drill` to start all threads.
- **Ramp-up:** specify the amount of time it will take `drill` to start all iterations.
- **Delay:** introduce controlled delay between requests. Example: [assigns.yml](./example/assigns.yml)
- **Dynamic urls:** execute requests with dynamic interpolations in the url, like `/api/users/{{ item }}`
- **Dynamic headers:** execute requests with dynamic headers. Example: [headers.yml](./example/headers.yml)
Expand Down
6 changes: 3 additions & 3 deletions SYNTAX.md
Expand Up @@ -25,10 +25,10 @@ plan:

### Benchmark main properties

- `concurrency`: Number of concurrent iterations. (Optional, default: 1)
- `base`: Base url for all relative URL's in your plan. (Optional)
- `iterations`: Number of loops each thread is going to do (Optional, default: 1)
- `rampup`: Amount of time it will take to start all threads. (Optional)
- `iterations`: Number of iterations that are going to be executed (Optional, default: 1)
- `concurrency`: Number of concurrent iterations. (Optional, default: max)
- `rampup`: Amount of time it will take to start all iterations. (Optional)
- `plan`: List of items to do in your benchmark. (Required)

#### Plan items
Expand Down
4 changes: 2 additions & 2 deletions example/throughput.yml
@@ -1,8 +1,8 @@
---

concurrency: 50
concurrency: 50000
base: 'http://localhost:9000'
iterations: 1000
iterations: 50000

plan:
- name: Fetch users
Expand Down
58 changes: 29 additions & 29 deletions src/benchmark.rs
Expand Up @@ -2,6 +2,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time;

use futures::stream::{self, StreamExt};

use serde_json::{json, Value};
use tokio::{runtime, time::delay_for};

Expand All @@ -19,74 +21,72 @@ pub type Context = HashMap<String, Value>;
pub type Reports = Vec<Report>;
pub type Pool = HashMap<String, Client>;

async fn run_iterations(benchmark: Arc<Benchmark>, config: Arc<Config>, concurrency: i64) -> Vec<Report> {
let delay = config.rampup / config.concurrency;
delay_for(time::Duration::new((delay * concurrency) as u64, 0)).await;

let mut global_reports = Vec::new();
async fn run_iteration(benchmark: Arc<Benchmark>, config: Arc<Config>, iteration: i64) -> Vec<Report> {
let delay = config.rampup / config.iterations;
delay_for(time::Duration::new((delay * iteration) as u64, 0)).await;

let mut pool: Pool = Pool::new();
let mut context: Context = Context::new();
let mut reports: Vec<Report> = Vec::new();

for iteration in 0..config.iterations {
let mut context: Context = Context::new();
let mut reports: Vec<Report> = Vec::new();
let mut pool: Pool = Pool::new(); // TODO: Share pool between all iterations

context.insert("iteration".to_string(), json!(iteration.to_string()));
context.insert("base".to_string(), json!(config.base.to_string()));

for item in benchmark.iter() {
item.execute(&mut context, &mut reports, &mut pool, &config).await;
}
context.insert("iteration".to_string(), json!(iteration.to_string()));
context.insert("base".to_string(), json!(config.base.to_string()));

global_reports.push(reports);
for item in benchmark.iter() {
item.execute(&mut context, &mut reports, &mut pool, &config).await;
}

global_reports.concat()
reports
}

fn join<S: ToString>(l: Vec<S>, sep: &str) -> String {
l.iter().fold("".to_string(),
|a,b| if !a.is_empty() {a+sep} else {a} + &b.to_string()
)
l.iter().fold(
"".to_string(),
|a,b| if !a.is_empty() {a+sep} else {a} + &b.to_string()
)
}

pub fn execute(benchmark_path: &str, report_path_option: Option<&str>, relaxed_interpolations: bool, no_check_certificate: bool, quiet: bool, nanosec: bool) -> Result<Vec<Vec<Report>>, Vec<Vec<Report>>> {
let config = Arc::new(Config::new(benchmark_path, relaxed_interpolations, no_check_certificate, quiet, nanosec));

if report_path_option.is_some() {
println!("{}: {}. Ignoring {} and {} properties...", "Report mode".yellow(), "on".purple(), "threads".yellow(), "iterations".yellow());
println!("{}: {}. Ignoring {} and {} properties...", "Report mode".yellow(), "on".purple(), "concurrency".yellow(), "iterations".yellow());
} else {
println!("{} {}", "Threads".yellow(), config.threads.to_string().purple());
println!("{} {}", "Concurrency".yellow(), config.concurrency.to_string().purple());
println!("{} {}", "Iterations".yellow(), config.iterations.to_string().purple());
println!("{} {}", "Rampup".yellow(), config.rampup.to_string().purple());
}

println!("{} {}", "Base URL".yellow(), config.base.purple());
println!();

let threads = config.threads;
let threads = std::cmp::min(num_cpus::get(), config.concurrency as usize);
let mut rt = runtime::Builder::new().threaded_scheduler().enable_all().core_threads(threads).max_threads(threads).build().unwrap();
rt.block_on(async {
let mut list: Vec<Box<(dyn Runnable + Sync + Send)>> = Vec::new();

include::expand_from_filepath(benchmark_path, &mut list, Some("plan"));

let list_arc = Arc::new(list);
let mut children = vec![];

if let Some(report_path) = report_path_option {
let reports = run_iterations(list_arc.clone(), config, 0).await;
let reports = run_iteration(list_arc.clone(), config, 0).await;

writer::write_file(report_path, join(reports, ""));

Ok(Vec::new())
} else {
for index in 0..config.concurrency {
let children = (0..config.iterations).map(|iteration| {
let list_clone = list_arc.clone();
let config_clone = config.clone();
children.push(tokio::spawn(async move { run_iterations(list_clone, config_clone, index).await }));
}
let list_reports: Vec<Vec<Report>> = futures::future::join_all(children).await.into_iter().map(|x| x.unwrap()).collect();

run_iteration(list_clone, config_clone, iteration)
});

let buffered = stream::iter(children).buffer_unordered(config.concurrency as usize);
let list_reports: Vec<Vec<Report>> = buffered.collect::<Vec<_>>().await;

Ok(list_reports)
}
})
Expand Down
10 changes: 5 additions & 5 deletions src/config.rs
Expand Up @@ -4,14 +4,12 @@ use crate::benchmark::Context;
use crate::interpolator;
use crate::reader;

const NCONCURRENCY: i64 = 1;
const NITERATIONS: i64 = 1;
const NRAMPUP: i64 = 0;

pub struct Config {
pub base: String,
pub concurrency: i64,
pub threads: usize,
pub iterations: i64,
pub relaxed_interpolations: bool,
pub no_check_certificate: bool,
Expand All @@ -30,16 +28,18 @@ impl Config {
let context: Context = Context::new();
let interpolator = interpolator::Interpolator::new(&context);

let concurrency = read_i64_configuration(config_doc, &interpolator, "concurrency", NCONCURRENCY);
let threads = std::cmp::min(num_cpus::get(), concurrency as usize);
let iterations = read_i64_configuration(config_doc, &interpolator, "iterations", NITERATIONS);
let concurrency = read_i64_configuration(config_doc, &interpolator, "concurrency", iterations);
let rampup = read_i64_configuration(config_doc, &interpolator, "rampup", NRAMPUP);
let base = read_str_configuration(config_doc, &interpolator, "base", "");

if concurrency > iterations {
panic!("The concurrency can not be higher than the number of iterations")
}

Config {
base,
concurrency,
threads,
iterations,
relaxed_interpolations,
no_check_certificate,
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Expand Up @@ -142,7 +142,6 @@ fn show_stats(list_reports: &[Vec<Report>], stats_option: bool, nanosec: bool, d
let requests_per_second = global_stats.total_requests as f64 / duration;

println!();
println!("{:width2$} {}", "Concurrency Level".yellow(), list_reports.len().to_string().purple(), width2 = 25);
println!("{:width2$} {} {}", "Time taken for tests".yellow(), format!("{:.1}", duration).purple(), "seconds".purple(), width2 = 25);
println!("{:width2$} {}", "Total requests".yellow(), global_stats.total_requests.to_string().purple(), width2 = 25);
println!("{:width2$} {}", "Successful requests".yellow(), global_stats.successful_requests.to_string().purple(), width2 = 25);
Expand Down

0 comments on commit 7ab694b

Please sign in to comment.