From 599a7c9be153de7bdbbbcf8b079e5d6eb3950e0b Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Sun, 19 Jul 2020 18:48:12 +0100 Subject: [PATCH] add parquet --- Cargo.toml | 13 ++++ amadeus-parquet/Cargo.toml | 2 +- .../src/internal/util/hash_util.rs | 5 +- azure-pipelines.yml | 34 ++++++++- tests/csv_wasm.rs | 7 +- tests/parquet_wasm.rs | 69 +++++++++++++++++++ 6 files changed, 120 insertions(+), 10 deletions(-) create mode 100644 tests/parquet_wasm.rs diff --git a/Cargo.toml b/Cargo.toml index 91b22769..03a7472f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,11 +66,16 @@ rand = "0.7" serde_json = "1.0" streaming_algorithms = "0.3" tokio = { version = "0.2", features = ["macros", "time"] } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = "0.3" [build-dependencies] rustversion = "1.0" +[patch.crates-io] +lz4 = { git = "https://github.com/alecmocatta/lz4-rs", branch = "wasm" } + [[example]] name = "cloudfront_logs" required-features = ["aws"] @@ -126,6 +131,10 @@ name = "parquet_dist" harness = false required-features = ["parquet"] +[[test]] +name = "parquet_wasm" +required-features = ["parquet"] + [[test]] name = "csv" required-features = ["csv"] @@ -135,6 +144,10 @@ name = "csv_dist" harness = false required-features = ["csv"] +[[test]] +name = "csv_wasm" +required-features = ["csv"] + [[test]] name = "json" required-features = ["json"] diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index c82d060b..05c51d1f 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -39,7 +39,7 @@ serde_closure = "0.3" snap = "1.0" sum = "0.1" thrift = "0.13" -zstd = "0.4" +zstd = { version = "0.5", features = ["wasm"] } [dev-dependencies] rand = "0.7" diff --git a/amadeus-parquet/src/internal/util/hash_util.rs b/amadeus-parquet/src/internal/util/hash_util.rs index 82f3bcef..7d0de64e 100644 --- a/amadeus-parquet/src/internal/util/hash_util.rs +++ b/amadeus-parquet/src/internal/util/hash_util.rs @@ -25,11 +25,10 @@ pub fn hash(data: &T, seed: u32) -> u32 { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] { if is_x86_feature_detected!("sse4.2") { - unsafe { crc32_hash(data, seed) } - } else { - murmur_hash2_64a(data, seed as u64) as u32 + return unsafe { crc32_hash(data, seed) }; } } + murmur_hash2_64a(data, seed as u64) as u32 } const MURMUR_PRIME: u64 = 0xc6a4a7935bd1e995; diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 32952f64..39f73607 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -73,6 +73,33 @@ jobs: parameters: ordinal: 2 endpoint: alecmocatta + default: + rust_toolchain: nightly + rust_lint_toolchain: nightly-2020-07-12 + rust_flags: '' + rust_packages: '-p amadeus-core -p amadeus-derive -p amadeus-parquet -p amadeus-serde -p amadeus-types -p amadeus' + rust_features_clippy: ';parquet;csv;json;parquet csv json' + rust_features: 'parquet csv json' + rust_doc_features: 'parquet csv json' + rust_target_check: '' + rust_target_build: '' + rust_target_run: '' + matrix: + mac: + imageName: 'macos-latest' + rust_target_run: 'wasm32-unknown-unknown' + linux: + imageName: 'ubuntu-latest' + rust_target_run: 'wasm32-unknown-unknown' + # TODO: headless browser fails: driver status: exit code: 1 + # windows: + # imageName: 'windows-latest' + # rust_target_run: 'wasm32-unknown-unknown' + + - template: rust-n.yml@templates + parameters: + ordinal: 3 + endpoint: alecmocatta default: rust_toolchain: stable nightly rust_lint_toolchain: nightly-2020-07-12 @@ -91,6 +118,7 @@ jobs: linux: imageName: 'ubuntu-latest' rust_target_run: 'wasm32-unknown-unknown' - windows: - imageName: 'windows-latest' - rust_target_run: 'wasm32-unknown-unknown' + # TODO: headless browser fails: driver status: exit code: 1 + # windows: + # imageName: 'windows-latest' + # rust_target_run: 'wasm32-unknown-unknown' diff --git a/tests/csv_wasm.rs b/tests/csv_wasm.rs index 705ecb78..b2864f68 100644 --- a/tests/csv_wasm.rs +++ b/tests/csv_wasm.rs @@ -1,3 +1,4 @@ +#![cfg(target_arch = "wasm32")] #![allow(clippy::suspicious_map)] use std::path::PathBuf; @@ -26,7 +27,7 @@ async fn csv() { let timer = web_sys::window().unwrap().performance().unwrap(); let start = timer.now(); - let pool = ThreadPool::new(None).unwrap(); + let pool = &ThreadPool::new(None).unwrap(); #[derive(Data, Clone, PartialEq, PartialOrd, Debug)] struct GameDerived { @@ -44,7 +45,7 @@ async fn csv() { assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) - .count(&pool) + .count(pool) .await, 100_000 ); @@ -70,7 +71,7 @@ async fn csv() { let _: GameDerived2 = value.clone().downcast().unwrap(); value }) - .count(&pool) + .count(pool) .await, 100_000 ); diff --git a/tests/parquet_wasm.rs b/tests/parquet_wasm.rs new file mode 100644 index 00000000..3658f4c9 --- /dev/null +++ b/tests/parquet_wasm.rs @@ -0,0 +1,69 @@ +#![cfg(target_arch = "wasm32")] +#![allow(clippy::suspicious_map)] + +use std::path::PathBuf; +use wasm_bindgen::prelude::*; +use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + +use amadeus::prelude::*; + +wasm_bindgen_test_configure!(run_in_browser); + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} +macro_rules! print { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())); +} +macro_rules! println { + ($fmt:expr) => (print!(concat!($fmt, "\n"))); + ($fmt:expr, $($t:tt)*) => (print!(concat!($fmt, "\n"), $($t)*)); +} + +#[no_mangle] +pub extern "C" fn malloc(_size: usize) -> *mut std::ffi::c_void { + panic!() +} +#[no_mangle] +pub extern "C" fn free(_ptr: *mut std::ffi::c_void) { + panic!() +} +#[no_mangle] +pub extern "C" fn calloc(_nmemb: usize, _size: usize) -> *mut std::ffi::c_void { + panic!() +} +#[no_mangle] +pub extern "C" fn realloc(_ptr: *mut std::ffi::c_void, _size: usize) -> *mut std::ffi::c_void { + panic!() +} + +#[wasm_bindgen_test] +async fn parquet() { + let timer = web_sys::window().unwrap().performance().unwrap(); + let start = timer.now(); + + let pool = &ThreadPool::new(None).unwrap(); + + let rows = Parquet::<_, Value>::new(vec![ + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-ed461019-4a12-46fa-a3f3-246d58f0ee06.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=04/part-00173-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), + PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"), + ]).await.unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(pool) + .await, + 207_535 + ); + + let elapsed = timer.now() - start; + println!("in {}s", elapsed / 1000.0); +}