Skip to content

Commit

Permalink
add parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 20, 2020
1 parent b2c2060 commit 599a7c9
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 10 deletions.
13 changes: 13 additions & 0 deletions Cargo.toml
Expand Up @@ -66,11 +66,16 @@ rand = "0.7"
serde_json = "1.0"
streaming_algorithms = "0.3"
tokio = { version = "0.2", features = ["macros", "time"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3"

[build-dependencies]
rustversion = "1.0"

[patch.crates-io]
lz4 = { git = "https://github.com/alecmocatta/lz4-rs", branch = "wasm" }

[[example]]
name = "cloudfront_logs"
required-features = ["aws"]
Expand Down Expand Up @@ -126,6 +131,10 @@ name = "parquet_dist"
harness = false
required-features = ["parquet"]

[[test]]
name = "parquet_wasm"
required-features = ["parquet"]

[[test]]
name = "csv"
required-features = ["csv"]
Expand All @@ -135,6 +144,10 @@ name = "csv_dist"
harness = false
required-features = ["csv"]

[[test]]
name = "csv_wasm"
required-features = ["csv"]

[[test]]
name = "json"
required-features = ["json"]
Expand Down
2 changes: 1 addition & 1 deletion amadeus-parquet/Cargo.toml
Expand Up @@ -39,7 +39,7 @@ serde_closure = "0.3"
snap = "1.0"
sum = "0.1"
thrift = "0.13"
zstd = "0.4"
zstd = { version = "0.5", features = ["wasm"] }

[dev-dependencies]
rand = "0.7"
Expand Down
5 changes: 2 additions & 3 deletions amadeus-parquet/src/internal/util/hash_util.rs
Expand Up @@ -25,11 +25,10 @@ pub fn hash<T: AsBytes>(data: &T, seed: u32) -> u32 {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
if is_x86_feature_detected!("sse4.2") {
unsafe { crc32_hash(data, seed) }
} else {
murmur_hash2_64a(data, seed as u64) as u32
return unsafe { crc32_hash(data, seed) };
}
}
murmur_hash2_64a(data, seed as u64) as u32
}

const MURMUR_PRIME: u64 = 0xc6a4a7935bd1e995;
Expand Down
34 changes: 31 additions & 3 deletions azure-pipelines.yml
Expand Up @@ -73,6 +73,33 @@ jobs:
parameters:
ordinal: 2
endpoint: alecmocatta
default:
rust_toolchain: nightly
rust_lint_toolchain: nightly-2020-07-12
rust_flags: ''
rust_packages: '-p amadeus-core -p amadeus-derive -p amadeus-parquet -p amadeus-serde -p amadeus-types -p amadeus'
rust_features_clippy: ';parquet;csv;json;parquet csv json'
rust_features: 'parquet csv json'
rust_doc_features: 'parquet csv json'
rust_target_check: ''
rust_target_build: ''
rust_target_run: ''
matrix:
mac:
imageName: 'macos-latest'
rust_target_run: 'wasm32-unknown-unknown'
linux:
imageName: 'ubuntu-latest'
rust_target_run: 'wasm32-unknown-unknown'
# TODO: headless browser fails: driver status: exit code: 1
# windows:
# imageName: 'windows-latest'
# rust_target_run: 'wasm32-unknown-unknown'

- template: rust-n.yml@templates
parameters:
ordinal: 3
endpoint: alecmocatta
default:
rust_toolchain: stable nightly
rust_lint_toolchain: nightly-2020-07-12
Expand All @@ -91,6 +118,7 @@ jobs:
linux:
imageName: 'ubuntu-latest'
rust_target_run: 'wasm32-unknown-unknown'
windows:
imageName: 'windows-latest'
rust_target_run: 'wasm32-unknown-unknown'
# TODO: headless browser fails: driver status: exit code: 1
# windows:
# imageName: 'windows-latest'
# rust_target_run: 'wasm32-unknown-unknown'
7 changes: 4 additions & 3 deletions tests/csv_wasm.rs
@@ -1,3 +1,4 @@
#![cfg(target_arch = "wasm32")]
#![allow(clippy::suspicious_map)]

use std::path::PathBuf;
Expand Down Expand Up @@ -26,7 +27,7 @@ async fn csv() {
let timer = web_sys::window().unwrap().performance().unwrap();
let start = timer.now();

let pool = ThreadPool::new(None).unwrap();
let pool = &ThreadPool::new(None).unwrap();

#[derive(Data, Clone, PartialEq, PartialOrd, Debug)]
struct GameDerived {
Expand All @@ -44,7 +45,7 @@ async fn csv() {
assert_eq!(
rows.par_stream()
.map(|row: Result<_, _>| row.unwrap())
.count(&pool)
.count(pool)
.await,
100_000
);
Expand All @@ -70,7 +71,7 @@ async fn csv() {
let _: GameDerived2 = value.clone().downcast().unwrap();
value
})
.count(&pool)
.count(pool)
.await,
100_000
);
Expand Down
69 changes: 69 additions & 0 deletions tests/parquet_wasm.rs
@@ -0,0 +1,69 @@
#![cfg(target_arch = "wasm32")]
#![allow(clippy::suspicious_map)]

use std::path::PathBuf;
use wasm_bindgen::prelude::*;
use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};

use amadeus::prelude::*;

wasm_bindgen_test_configure!(run_in_browser);

#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}
macro_rules! print {
($($t:tt)*) => (log(&format_args!($($t)*).to_string()));
}
macro_rules! println {
($fmt:expr) => (print!(concat!($fmt, "\n")));
($fmt:expr, $($t:tt)*) => (print!(concat!($fmt, "\n"), $($t)*));
}

#[no_mangle]
pub extern "C" fn malloc(_size: usize) -> *mut std::ffi::c_void {
panic!()
}
#[no_mangle]
pub extern "C" fn free(_ptr: *mut std::ffi::c_void) {
panic!()
}
#[no_mangle]
pub extern "C" fn calloc(_nmemb: usize, _size: usize) -> *mut std::ffi::c_void {
panic!()
}
#[no_mangle]
pub extern "C" fn realloc(_ptr: *mut std::ffi::c_void, _size: usize) -> *mut std::ffi::c_void {
panic!()
}

#[wasm_bindgen_test]
async fn parquet() {
let timer = web_sys::window().unwrap().performance().unwrap();
let start = timer.now();

let pool = &ThreadPool::new(None).unwrap();

let rows = Parquet::<_, Value>::new(vec![
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-ed461019-4a12-46fa-a3f3-246d58f0ee06.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=04/part-00173-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
]).await.unwrap();
assert_eq!(
rows.par_stream()
.map(|row: Result<_, _>| row.unwrap())
.count(pool)
.await,
207_535
);

let elapsed = timer.now() - start;
println!("in {}s", elapsed / 1000.0);
}

0 comments on commit 599a7c9

Please sign in to comment.