Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the file trait more portable #89

Merged: 5 commits were merged on Jul 20, 2020.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 25 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.3.4"
version = "0.3.5"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down Expand Up @@ -35,14 +35,14 @@ json = ["amadeus-serde", "amadeus-derive/serde"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.4", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.4", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.4", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.4", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.4", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.4", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.4", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.4", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.3.5", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.5", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.5", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.5", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.5", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.5", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.5", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.5", path = "amadeus-serde", optional = true }
async-channel = "1.1"
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
derive-new = "0.5"
Expand All @@ -55,13 +55,21 @@ serde_closure = "0.3"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Window"] }

[dev-dependencies]
either = { version = "1.5", features = ["serde"] }
rand = "0.7"
serde_json = "1.0"
streaming_algorithms = "0.3"
tokio = { version = "0.2", features = ["macros", "time"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3"

[build-dependencies]
rustversion = "1.0"

Expand Down Expand Up @@ -120,6 +128,10 @@ name = "parquet_dist"
harness = false
required-features = ["parquet"]

[[test]]
name = "parquet_wasm"
required-features = ["parquet"]

[[test]]
name = "csv"
required-features = ["csv"]
Expand All @@ -129,6 +141,10 @@ name = "csv_dist"
harness = false
required-features = ["csv"]

[[test]]
name = "csv_wasm"
required-features = ["csv"]

[[test]]
name = "json"
required-features = ["json"]
Expand Down
6 changes: 3 additions & 3 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.3.4"
version = "0.3.5"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.4", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.4", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
Expand Down
18 changes: 10 additions & 8 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{future, future::LocalBoxFuture, FutureExt};
use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -227,17 +227,17 @@ impl S3Page {
impl Page for S3Page {
type Error = IoError;

fn len(&self) -> u64 {
self.inner.len
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
future::ready(Ok(self.inner.len)).boxed_local()
}
fn set_len(&self, _len: u64) -> Result<(), Self::Error> {
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
let self_ = S3Page {
inner: self.inner.clone(),
};
Box::pin(async move {
let len = len.min(usize::try_from(self_.inner.len.saturating_sub(offset)).unwrap());
let mut buf_ = vec![0; len].into_boxed_slice();
let mut buf = &mut *buf_;
let len: u64 = len.try_into().unwrap();
Expand Down Expand Up @@ -279,7 +279,9 @@ impl Page for S3Page {
Ok(buf_)
})
}
fn write(&self, _offset: u64, _buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, _offset: u64, _buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
}
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.4")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.5")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
6 changes: 3 additions & 3 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-commoncrawl"
version = "0.3.4"
version = "0.3.5"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <github@deathbyescalator.com>", "Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.4", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.4", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
2 changes: 1 addition & 1 deletion amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.4")]
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.5")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
9 changes: 8 additions & 1 deletion amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-core"
version = "0.3.4"
version = "0.3.5"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down Expand Up @@ -33,8 +33,15 @@ serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.3"
streaming_algorithms = "0.3"
sum = { version = "0.1", features = ["futures", "serde"] }
tokio = "0.2"
walkdir = "2.2"
widestring = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Blob", "Response", "Window"] }

[build-dependencies]
rustversion = "1.0"
65 changes: 30 additions & 35 deletions amadeus-core/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
mod local;

use async_trait::async_trait;
use futures::{future::BoxFuture, ready};
use futures::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use std::{
convert::TryFrom, error::Error, ffi, fmt, future::Future, io, pin::Pin, sync::Arc, task::{Context, Poll}
Expand Down Expand Up @@ -208,14 +208,16 @@ pub trait Partition: Clone + fmt::Debug + ProcessSend + 'static {
async fn pages(self) -> Result<Vec<Self::Page>, Self::Error>;
}
#[allow(clippy::len_without_is_empty)]
#[async_trait]
pub trait Page: Send {
pub trait Page {
type Error: Error + Clone + PartialEq + Into<io::Error> + ProcessSend + 'static;

fn len(&self) -> u64;
fn set_len(&self, len: u64) -> Result<(), Self::Error>;
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>>;
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>>;
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>>;
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>>;
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>>;

fn reader(self) -> Reader<Self>
where
Expand All @@ -225,43 +227,43 @@ pub trait Page: Send {
}
}

#[async_trait]
impl<T: ?Sized> Page for &T
where
T: Page + Sync,
T: Page,
{
type Error = T::Error;

fn len(&self) -> u64 {
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
(**self).len()
}
fn set_len(&self, len: u64) -> Result<(), Self::Error> {
(**self).set_len(len)
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
(**self).read(offset, len)
}
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
(**self).write(offset, buf)
}
}
#[async_trait]
impl<T: ?Sized> Page for Arc<T>
where
T: Page + Sync,
T: Page,
{
type Error = T::Error;

fn len(&self) -> u64 {
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
(**self).len()
}
fn set_len(&self, len: u64) -> Result<(), Self::Error> {
(**self).set_len(len)
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
(**self).read(offset, len)
}
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
(**self).write(offset, buf)
}
}
Expand All @@ -274,8 +276,7 @@ where
#[pin]
page: P,
#[pin]
pending: Option<BoxFuture<'static, Result<Box<[u8]>, P::Error>>>,
pending_len: Option<usize>,
pending: Option<LocalBoxFuture<'static, Result<Box<[u8]>, P::Error>>>,
offset: u64,
}
#[allow(clippy::len_without_is_empty)]
Expand All @@ -287,13 +288,9 @@ where
Self {
page,
pending: None,
pending_len: None,
offset: 0,
}
}
pub fn len(&self) -> u64 {
self.page.len()
}
}
impl<P> futures::io::AsyncRead for Reader<P>
where
Expand All @@ -305,22 +302,20 @@ where
let mut self_ = self.project();
if self_.pending.is_none() {
let start = *self_.offset;
let len = usize::try_from((self_.page.len() - start).min(buf.len() as u64)).unwrap();
let len = buf.len();
let len = len.min(PAGE_SIZE);
let pending = self_.page.read(start, len);
*self_.pending = Some(pending);
*self_.pending_len = Some(len);
}
let ret = ready!(self_.pending.as_mut().as_pin_mut().unwrap().poll(cx));
*self_.pending = None;
let len = self_.pending_len.take().unwrap();
let ret = ret
.map(|buf_| {
buf[..len].copy_from_slice(&buf_);
len
buf[..buf_.len()].copy_from_slice(&buf_);
buf_.len()
})
.map_err(Into::into);
*self_.offset += u64::try_from(len).unwrap();
*self_.offset += u64::try_from(ret.as_ref().ok().cloned().unwrap_or(0)).unwrap();
Poll::Ready(ret)
}
}
Expand Down