From 4d6515b4c1aabe3ad1b7317c11fd2370caba143d Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Thu, 30 Jul 2020 14:17:54 +0100 Subject: [PATCH] add benchmarks --- Cargo.toml | 23 ++++- amadeus-core/src/pool.rs | 16 +++- azure-pipelines.yml | 8 +- benches/csv.rs | 130 ++++++++++++++++++++++++++++ benches/in_memory.rs | 72 ++++++++++++++++ benches/parquet.rs | 123 ++++++++++++++++++++++++++ tests/csv.rs | 4 +- tests/csv_dist.rs | 4 +- tests/csv_wasm.rs | 4 +- tests/parquet.rs | 182 +++++++++++++++++++-------------------- tests/parquet_dist.rs | 182 +++++++++++++++++++-------------------- 11 files changed, 545 insertions(+), 203 deletions(-) create mode 100644 benches/csv.rs create mode 100644 benches/in_memory.rs create mode 100644 benches/parquet.rs diff --git a/Cargo.toml b/Cargo.toml index 22cc07db..d0cd9b54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"] postgres = ["amadeus-postgres", "amadeus-derive/postgres"] csv = ["amadeus-serde", "amadeus-derive/serde"] json = ["amadeus-serde", "amadeus-derive/serde"] +bench = ["serde-csv", "once_cell", "arrow-parquet", "rayon"] [package.metadata.docs.rs] features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"] @@ -47,7 +48,6 @@ async-channel = "1.1" bincode = { version = "1.3", optional = true } constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true } derive-new = "0.5" -doc-comment = "0.3" futures = "0.3" num_cpus = "1.13" pin-project = "0.4" @@ -56,16 +56,21 @@ serde_closure = "0.3" serde_traitobject = { version = "0.2", optional = true } tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] } +# Move to dev-dependencies once fixed: https://github.com/rust-lang/cargo/issues/1596 +arrow-parquet = { package = "parquet", version = "1.0", default-features = false, features = ["brotli", "flate2", "lz4", "snap"], optional = true } +once_cell = { version = "1.4", optional = true } +rayon = { version = "1.3", optional = true } +serde-csv = { package = "csv", version = "1.0", optional = true } + [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Window"] } [dev-dependencies] +doc-comment = "0.3" either = { version = "1.5", features = ["serde"] } -once_cell = "1.4" rand = "0.7" -rayon = "1.3" serde_json = "1.0" streaming_algorithms = "0.3" tokio = { version = "0.2", features = ["macros", "time"] } @@ -167,3 +172,15 @@ name = "postgres_dist" harness = false required-features = ["postgres"] test = false # TODO set up postgres on CI + +[[bench]] +name = "csv" +required-features = ["bench", "csv"] + +[[bench]] +name = "in_memory" +required-features = ["bench"] + +[[bench]] +name = "parquet" +required-features = ["bench", "parquet"] diff --git a/amadeus-core/src/pool.rs b/amadeus-core/src/pool.rs index a7a60616..6e6c3a63 100644 --- a/amadeus-core/src/pool.rs +++ b/amadeus-core/src/pool.rs @@ -20,7 +20,13 @@ pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin where F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static, Fut: Future + 'static, - T: ProcessSend + 'static; + T: ProcessSend + 'static, + { + #[allow(unsafe_code)] + unsafe { + self.spawn_unchecked(work) + } + } /// # Safety /// @@ -40,7 +46,13 @@ pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin { where F: FnOnce() -> Fut + Send + 'static, Fut: Future + 'static, - T: Send + 'static; + T: Send + 'static, + { + #[allow(unsafe_code)] + unsafe { + self.spawn_unchecked(work) + } + } /// # Safety /// diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d265da45..8fcf189a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -22,8 +22,8 @@ jobs: rust_toolchain: nightly rust_lint_toolchain: nightly-2020-07-26 rust_flags: '' - rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json' - rust_features: 'constellation aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json bench' + rust_features: 'constellation aws commoncrawl parquet postgres csv json bench' rust_doc_features: 'constellation aws commoncrawl parquet postgres csv json' rust_target_check: '' rust_target_build: '' @@ -31,8 +31,8 @@ jobs: matrix: windows: imageName: 'windows-latest' - rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json' - rust_features: 'aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json bench' + rust_features: 'aws commoncrawl parquet postgres csv json bench' rust_doc_features: 'aws commoncrawl parquet postgres csv json' rust_target_run: 'x86_64-pc-windows-msvc' mac: diff --git a/benches/csv.rs b/benches/csv.rs new file mode 100644 index 00000000..f9188fef --- /dev/null +++ b/benches/csv.rs @@ -0,0 +1,130 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use once_cell::sync::Lazy; +use serde::Deserialize; +use std::{fs, future::Future, path::PathBuf}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[derive(Data, Clone, Deserialize, PartialEq, PartialOrd, Debug)] +struct GameDerived { + a: String, + b: String, + c: String, + d: String, + e: u32, + f: String, +} + +#[derive(Data, Clone, PartialEq, PartialOrd, Debug)] +struct GameDerived2 { + a: String, + b: String, + c: String, + d: String, + e: u64, + f: String, +} + +#[bench] +fn csv_typed(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +#[bench] +fn csv_typed_serde(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let mut rows = serde_csv::ReaderBuilder::new() + .has_headers(false) + .from_path(file) + .unwrap(); + assert_eq!(rows.deserialize::().count(), 100_000); + }); +} + +#[bench] +fn csv_untyped(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, Value>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +#[bench] +fn csv_untyped_serde(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let mut rows = serde_csv::ReaderBuilder::new() + .has_headers(false) + .from_path(file) + .unwrap(); + assert_eq!(rows.records().count(), 100_000); + }); +} + +#[bench] +fn csv_untyped_downcase(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, Value>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| { + let _: GameDerived2 = row.unwrap().downcast().unwrap(); + }) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +fn run(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = fs::metadata(file).unwrap().len(); + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/benches/in_memory.rs b/benches/in_memory.rs new file mode 100644 index 00000000..ddea7636 --- /dev/null +++ b/benches/in_memory.rs @@ -0,0 +1,72 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use once_cell::sync::Lazy; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::{future::Future, mem}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[bench] +fn vec(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!( + rows.par_stream() + .map(|x| x as u64) + .sum::<_, u64>(&*POOL) + .await, + sum + ); + }) +} + +#[bench] +fn iter(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!(rows.iter().map(|&x| x as u64).sum::(), sum); + }); +} + +#[bench] +fn rayon(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!(rows.par_iter().map(|&x| x as u64).sum::(), sum); + }); +} + +fn run(b: &mut Bencher, bytes: u64, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = bytes; + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/benches/parquet.rs b/benches/parquet.rs new file mode 100644 index 00000000..57d325d9 --- /dev/null +++ b/benches/parquet.rs @@ -0,0 +1,123 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use arrow_parquet::file::reader::{FileReader, SerializedFileReader}; +use once_cell::sync::Lazy; +use std::{fs, fs::File, future::Future, path::PathBuf}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[derive(Data, Clone, PartialEq, Debug)] +struct TenKayVeeTwo { + binary_field: List, + int32_field: i32, + int64_field: i64, + boolean_field: bool, + float_field: f32, + double_field: f64, + flba_field: List, // [u8;1024], + int96_field: DateTime, +} + +#[derive(Data, Clone, PartialEq, Debug)] +struct StockSimulated { + bp1: Option, + bp2: Option, + bp3: Option, + bp4: Option, + bp5: Option, + bs1: Option, + bs2: Option, + bs3: Option, + bs4: Option, + bs5: Option, + ap1: Option, + ap2: Option, + ap3: Option, + ap4: Option, + ap5: Option, + as1: Option, + as2: Option, + as3: Option, + as4: Option, + as5: Option, + valid: Option, + __index_level_0__: Option, +} + +#[bench] +fn parquet_10k(b: &mut Bencher) { + let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes + run(b, file, || async { + let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from(file)) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 10_000 + ); + }) +} + +#[bench] +fn parquet_stock(b: &mut Bencher) { + let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes + run(b, file, || async { + let rows = Parquet::<_, StockSimulated>::new(PathBuf::from(file)) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 42_000 + ); + }) +} + +#[bench] +fn parquet_10k_arrow(b: &mut Bencher) { + let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes + run(b, file, || async { + let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap(); + assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 10_000); + }) +} + +#[bench] +fn parquet_stock_arrow(b: &mut Bencher) { + let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes + run(b, file, || async { + let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap(); + assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 42_000); + }) +} + +fn run(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = fs::metadata(file).unwrap().len(); + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/tests/csv.rs b/tests/csv.rs index 987d98dd..b1b56a40 100644 --- a/tests/csv.rs +++ b/tests/csv.rs @@ -20,7 +20,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -41,7 +41,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/csv_dist.rs b/tests/csv_dist.rs index 73a08769..3d214555 100644 --- a/tests/csv_dist.rs +++ b/tests/csv_dist.rs @@ -45,7 +45,7 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -66,7 +66,7 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/csv_wasm.rs b/tests/csv_wasm.rs index f2bcf27c..4c51d47b 100644 --- a/tests/csv_wasm.rs +++ b/tests/csv_wasm.rs @@ -56,7 +56,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -77,7 +77,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/parquet.rs b/tests/parquet.rs index 758eb082..c48da012 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -133,9 +133,9 @@ async fn parquet() { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -146,9 +146,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -169,9 +169,9 @@ async fn parquet() { __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -182,9 +182,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -202,9 +202,9 @@ async fn parquet() { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -215,9 +215,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -255,11 +255,10 @@ async fn parquet() { int96_field: DateTime, } - let rows = Parquet::<_, TenKayVeeTwo>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -268,9 +267,9 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, TenKayVeeTwoDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -281,11 +280,9 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { @@ -328,9 +325,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -341,9 +338,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -354,9 +351,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -401,9 +398,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -414,9 +411,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -427,9 +424,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -474,9 +471,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -487,9 +484,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -500,9 +497,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -555,9 +552,9 @@ async fn parquet() { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -568,9 +565,9 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -581,9 +578,9 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -610,9 +607,9 @@ async fn parquet() { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -623,9 +620,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -636,9 +633,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -704,9 +701,9 @@ async fn parquet() { f: String, } - let rows = Parquet::<_, Nonnullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -717,9 +714,9 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -730,9 +727,9 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -777,9 +774,9 @@ async fn parquet() { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -790,9 +787,9 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, NullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -803,9 +800,9 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -826,9 +823,9 @@ async fn parquet() { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -839,9 +836,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, NullsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -852,9 +849,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -877,9 +874,9 @@ async fn parquet() { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(vec![PathBuf::from( + let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -890,9 +887,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, RepeatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -903,9 +900,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -930,9 +927,9 @@ async fn parquet() { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -943,9 +940,9 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -956,9 +953,9 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -1001,11 +998,10 @@ async fn parquet() { __index_level_0__: Option, } - let rows = Parquet::<_, CommitsDerived>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -1014,11 +1010,9 @@ async fn parquet() { 14_444 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { diff --git a/tests/parquet_dist.rs b/tests/parquet_dist.rs index 9a851255..5ef4a762 100644 --- a/tests/parquet_dist.rs +++ b/tests/parquet_dist.rs @@ -138,9 +138,9 @@ async fn run(pool: &P) -> Duration { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -151,9 +151,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -174,9 +174,9 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -187,9 +187,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -207,9 +207,9 @@ async fn run(pool: &P) -> Duration { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -220,9 +220,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -260,11 +260,10 @@ async fn run(pool: &P) -> Duration { int96_field: DateTime, } - let rows = Parquet::<_, TenKayVeeTwo>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -273,9 +272,9 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, TenKayVeeTwoDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -286,11 +285,9 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { @@ -333,9 +330,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -346,9 +343,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -359,9 +356,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -406,9 +403,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -419,9 +416,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -432,9 +429,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -479,9 +476,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -492,9 +489,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -505,9 +502,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -560,9 +557,9 @@ async fn run(pool: &P) -> Duration { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -573,9 +570,9 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -586,9 +583,9 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -615,9 +612,9 @@ async fn run(pool: &P) -> Duration { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -628,9 +625,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -641,9 +638,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -709,9 +706,9 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Parquet::<_, Nonnullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -722,9 +719,9 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -735,9 +732,9 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -782,9 +779,9 @@ async fn run(pool: &P) -> Duration { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -795,9 +792,9 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, NullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -808,9 +805,9 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -831,9 +828,9 @@ async fn run(pool: &P) -> Duration { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -844,9 +841,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, NullsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -857,9 +854,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -882,9 +879,9 @@ async fn run(pool: &P) -> Duration { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(vec![PathBuf::from( + let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -895,9 +892,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, RepeatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -908,9 +905,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -935,9 +932,9 @@ async fn run(pool: &P) -> Duration { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -948,9 +945,9 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -961,9 +958,9 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -1006,11 +1003,10 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = Parquet::<_, CommitsDerived>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -1019,11 +1015,9 @@ async fn run(pool: &P) -> Duration { 14_444 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value {