Skip to content

Commit

Permalink
add benchmarks
Browse files — browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 30, 2020
1 parent 7dd4127 commit 4d6515b
Show file tree
Hide file tree
Showing 11 changed files with 545 additions and 203 deletions.
23 changes: 20 additions & 3 deletions Cargo.toml
Expand Up @@ -30,6 +30,7 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
bench = ["serde-csv", "once_cell", "arrow-parquet", "rayon"]

[package.metadata.docs.rs]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]
Expand All @@ -47,7 +48,6 @@ async-channel = "1.1"
bincode = { version = "1.3", optional = true }
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
derive-new = "0.5"
doc-comment = "0.3"
futures = "0.3"
num_cpus = "1.13"
pin-project = "0.4"
Expand All @@ -56,16 +56,21 @@ serde_closure = "0.3"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

# Move to dev-dependencies once fixed: https://github.com/rust-lang/cargo/issues/1596
arrow-parquet = { package = "parquet", version = "1.0", default-features = false, features = ["brotli", "flate2", "lz4", "snap"], optional = true }
once_cell = { version = "1.4", optional = true }
rayon = { version = "1.3", optional = true }
serde-csv = { package = "csv", version = "1.0", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Window"] }

[dev-dependencies]
doc-comment = "0.3"
either = { version = "1.5", features = ["serde"] }
once_cell = "1.4"
rand = "0.7"
rayon = "1.3"
serde_json = "1.0"
streaming_algorithms = "0.3"
tokio = { version = "0.2", features = ["macros", "time"] }
Expand Down Expand Up @@ -167,3 +172,15 @@ name = "postgres_dist"
harness = false
required-features = ["postgres"]
test = false # TODO set up postgres on CI

# Benchmark targets live in benches/. Each requires the `bench` feature, which
# pulls in the optional comparison dependencies (serde-csv, once_cell,
# arrow-parquet, rayon); see the `bench` feature and the "Move to
# dev-dependencies once fixed" dependency group earlier in this manifest.
[[bench]]
name = "csv"
required-features = ["bench", "csv"]

[[bench]]
name = "in_memory"
required-features = ["bench"]

[[bench]]
name = "parquet"
required-features = ["bench", "parquet"]
16 changes: 14 additions & 2 deletions amadeus-core/src/pool.rs
Expand Up @@ -20,7 +20,13 @@ pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin
where
F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static,
Fut: Future<Output = T> + 'static,
T: ProcessSend + 'static;
T: ProcessSend + 'static,
{
#[allow(unsafe_code)]
unsafe {
self.spawn_unchecked(work)
}
}

/// # Safety
///
Expand All @@ -40,7 +46,13 @@ pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin {
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = T> + 'static,
T: Send + 'static;
T: Send + 'static,
{
#[allow(unsafe_code)]
unsafe {
self.spawn_unchecked(work)
}
}

/// # Safety
///
Expand Down
8 changes: 4 additions & 4 deletions azure-pipelines.yml
Expand Up @@ -22,17 +22,17 @@ jobs:
rust_toolchain: nightly
rust_lint_toolchain: nightly-2020-07-26
rust_flags: ''
rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json'
rust_features: 'constellation aws commoncrawl parquet postgres csv json'
rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json bench'
rust_features: 'constellation aws commoncrawl parquet postgres csv json bench'
rust_doc_features: 'constellation aws commoncrawl parquet postgres csv json'
rust_target_check: ''
rust_target_build: ''
rust_target_run: ''
matrix:
windows:
imageName: 'windows-latest'
rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json'
rust_features: 'aws commoncrawl parquet postgres csv json'
rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json bench'
rust_features: 'aws commoncrawl parquet postgres csv json bench'
rust_doc_features: 'aws commoncrawl parquet postgres csv json'
rust_target_run: 'x86_64-pc-windows-msvc'
mac:
Expand Down
130 changes: 130 additions & 0 deletions benches/csv.rs
@@ -0,0 +1,130 @@
#![cfg(nightly)]
#![feature(test)]
#![allow(clippy::suspicious_map)]

extern crate test;

use once_cell::sync::Lazy;
use serde::Deserialize;
use std::{fs, future::Future, path::PathBuf};
use test::Bencher;
use tokio::runtime::Runtime;

use amadeus::prelude::*;

// Process-wide multi-threaded Tokio 0.2 runtime, built lazily on first use and
// shared by every benchmark so runtime startup is never part of a measurement.
static RT: Lazy<Runtime> = Lazy::new(|| {
	tokio::runtime::Builder::new()
		.threaded_scheduler()
		.enable_all()
		.build()
		.unwrap()
});
// Shared amadeus thread pool. NOTE(review): `None` presumably selects a
// default thread count — confirm against `ThreadPool::new`'s documentation.
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());

// Typed row for `amadeus-testing/csv/game.csv`, deserialized both through
// amadeus's `Data` derive and through plain serde (the serde readers below use
// `has_headers(false)`, so fields bind positionally).
// NOTE(review): field names `a`..`f` look like positional placeholders rather
// than real column names — confirm against the test data's schema.
#[derive(Data, Clone, Deserialize, PartialEq, PartialOrd, Debug)]
struct GameDerived {
	a: String,
	b: String,
	c: String,
	d: String,
	e: u32,
	f: String,
}

// Same shape as `GameDerived` but with `e: u64` instead of `u32`; used as the
// target of `downcast` in `csv_untyped_downcase`. No `Deserialize` derive —
// this type is only ever produced by downcasting a dynamically-typed `Value`,
// never by serde.
#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived2 {
	a: String,
	b: String,
	c: String,
	d: String,
	e: u64,
	f: String,
}

/// Read the CSV through amadeus's typed `Csv` source, deserializing each row
/// straight into `GameDerived`, and count the rows on the shared pool.
#[bench]
fn csv_typed(b: &mut Bencher) {
	let file = "amadeus-testing/csv/game.csv"; // ~2,600,000 bytes, 100,000 rows
	run(b, file, || async {
		let source = Csv::<_, GameDerived>::new(vec![PathBuf::from(file)])
			.await
			.unwrap();
		let count = source
			.par_stream()
			.map(|row: Result<_, _>| row.unwrap())
			.count(&*POOL)
			.await;
		assert_eq!(count, 100_000);
	})
}

/// Baseline for `csv_typed`: same file, same target struct, but read with the
/// plain `csv` crate's serde deserializer on a single thread.
#[bench]
fn csv_typed_serde(b: &mut Bencher) {
	let file = "amadeus-testing/csv/game.csv"; // ~2,600,000 bytes, 100,000 rows
	run(b, file, || async {
		let mut reader = serde_csv::ReaderBuilder::new()
			.has_headers(false)
			.from_path(file)
			.unwrap();
		let count = reader.deserialize::<GameDerived>().count();
		assert_eq!(count, 100_000);
	});
}

/// Read the CSV through amadeus's source into dynamically-typed `Value` rows
/// (no compile-time schema) and count them on the shared pool.
#[bench]
fn csv_untyped(b: &mut Bencher) {
	let file = "amadeus-testing/csv/game.csv"; // ~2,600,000 bytes, 100,000 rows
	run(b, file, || async {
		let source = Csv::<_, Value>::new(vec![PathBuf::from(file)])
			.await
			.unwrap();
		let count = source
			.par_stream()
			.map(|row: Result<_, _>| row.unwrap())
			.count(&*POOL)
			.await;
		assert_eq!(count, 100_000);
	})
}

/// Baseline for `csv_untyped`: iterate the raw `StringRecord`s of the plain
/// `csv` crate on a single thread, without deserializing into any struct.
#[bench]
fn csv_untyped_serde(b: &mut Bencher) {
	let file = "amadeus-testing/csv/game.csv"; // ~2,600,000 bytes, 100,000 rows
	run(b, file, || async {
		let mut reader = serde_csv::ReaderBuilder::new()
			.has_headers(false)
			.from_path(file)
			.unwrap();
		let count = reader.records().count();
		assert_eq!(count, 100_000);
	});
}

/// Read dynamically-typed `Value` rows, then downcast each one into the static
/// `GameDerived2` struct — measures the cost of untyped reading plus a
/// per-row runtime conversion.
#[bench]
fn csv_untyped_downcase(b: &mut Bencher) {
	let file = "amadeus-testing/csv/game.csv"; // ~2,600,000 bytes, 100,000 rows
	run(b, file, || async {
		let source = Csv::<_, Value>::new(vec![PathBuf::from(file)])
			.await
			.unwrap();
		let count = source
			.par_stream()
			.map(|row: Result<_, _>| {
				// Convert the dynamic row to the concrete struct; discard it.
				let _game: GameDerived2 = row.unwrap().downcast().unwrap();
			})
			.count(&*POOL)
			.await;
		assert_eq!(count, 100_000);
	})
}

/// Drive an async benchmark body on the shared runtime. Throughput is reported
/// as bytes of `file` processed per iteration.
fn run<F>(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F)
where
	F: Future<Output = ()>,
{
	b.bytes = fs::metadata(file).unwrap().len();
	RT.enter(|| {
		// Build the thread pool eagerly so its startup isn't timed.
		Lazy::force(&POOL);
		b.iter(|| RT.handle().block_on(task()));
	})
}
72 changes: 72 additions & 0 deletions benches/in_memory.rs
@@ -0,0 +1,72 @@
#![cfg(nightly)]
#![feature(test)]
#![allow(clippy::suspicious_map)]

extern crate test;

use once_cell::sync::Lazy;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{future::Future, mem};
use test::Bencher;
use tokio::runtime::Runtime;

use amadeus::prelude::*;

// Process-wide multi-threaded Tokio 0.2 runtime, built lazily on first use and
// shared by every benchmark so runtime startup is never part of a measurement.
static RT: Lazy<Runtime> = Lazy::new(|| {
	tokio::runtime::Builder::new()
		.threaded_scheduler()
		.enable_all()
		.build()
		.unwrap()
});
// Shared amadeus thread pool. NOTE(review): `None` presumably selects a
// default thread count — confirm against `ThreadPool::new`'s documentation.
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None).unwrap());

/// Sum 2^28 u32s (1 GiB of input) through amadeus's parallel stream on the
/// shared thread pool, checking the total against Gauss's closed form.
#[bench]
fn vec(b: &mut Bencher) {
	let rows: Vec<u32> = (0..1u32 << 28).collect();
	let len = rows.len() as u64;
	// Sum of 0..len is len*(len-1)/2.
	let expected = len * (len - 1) / 2;
	let bytes = len * mem::size_of::<u32>() as u64;
	run(b, bytes, || async {
		let total = rows
			.par_stream()
			.map(|x| u64::from(x))
			.sum::<_, u64>(&*POOL)
			.await;
		assert_eq!(total, expected);
	})
}

/// Single-threaded baseline: sum the same 2^28 u32s with a plain std iterator.
#[bench]
fn iter(b: &mut Bencher) {
	let rows: Vec<u32> = (0..1u32 << 28).collect();
	let len = rows.len() as u64;
	// Sum of 0..len is len*(len-1)/2.
	let expected = len * (len - 1) / 2;
	let bytes = len * mem::size_of::<u32>() as u64;
	run(b, bytes, || async {
		let total: u64 = rows.iter().copied().map(u64::from).sum();
		assert_eq!(total, expected);
	});
}

/// Parallel baseline: sum the same 2^28 u32s with rayon's `par_iter`.
#[bench]
fn rayon(b: &mut Bencher) {
	let rows: Vec<u32> = (0..1u32 << 28).collect();
	let len = rows.len() as u64;
	// Sum of 0..len is len*(len-1)/2.
	let expected = len * (len - 1) / 2;
	let bytes = len * mem::size_of::<u32>() as u64;
	run(b, bytes, || async {
		let total: u64 = rows.par_iter().map(|&x| u64::from(x)).sum();
		assert_eq!(total, expected);
	});
}

/// Drive an async benchmark body on the shared runtime. Throughput is reported
/// as `bytes` of input processed per iteration.
fn run<F>(b: &mut Bencher, bytes: u64, mut task: impl FnMut() -> F)
where
	F: Future<Output = ()>,
{
	b.bytes = bytes;
	RT.enter(|| {
		// Build the thread pool eagerly so its startup isn't timed.
		Lazy::force(&POOL);
		b.iter(|| RT.handle().block_on(task()));
	})
}

0 comments on commit 4d6515b

Please sign in to comment.