Skip to content

Commit

Permalink
Merge pull request #103 from constellation-rs/join
Browse files Browse the repository at this point in the history
Add left & inner join and limited sort
  • Loading branch information
alecmocatta committed Aug 7, 2020
2 parents e118942 + b6d9119 commit f6f11fb
Show file tree
Hide file tree
Showing 60 changed files with 5,014 additions and 124 deletions.
32 changes: 20 additions & 12 deletions Cargo.toml
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.3.7"
version = "0.4.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -12,8 +12,8 @@ Harmonious distributed data processing & analysis in Rust.
parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common crawl
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
repository = "https://github.com/constellation-rs/amadeus"
homepage = "https://github.com/constellation-rs/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
Expand All @@ -36,18 +36,20 @@ bench = ["serde-csv", "once_cell", "arrow-parquet", "rayon"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.7", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.7", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.7", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.7", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.7", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.7", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.7", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.7", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.4.0", path = "amadeus-core" }
amadeus-derive = { version = "=0.4.0", path = "amadeus-derive" }
amadeus-types = { version = "=0.4.0", path = "amadeus-types" }
amadeus-aws = { version = "=0.4.0", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.4.0", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.4.0", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.4.0", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.4.0", path = "amadeus-serde", optional = true }
amadeus-streaming = { version = "=0.4.0", path = "amadeus-streaming" }
async-channel = "1.1"
bincode = { version = "1.3", optional = true }
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
derive-new = "0.5"
event-listener = "=2.3.1" # https://github.com/stjepang/event-listener/issues/9
futures = "0.3"
num_cpus = "1.13"
pin-project = "0.4"
Expand All @@ -72,7 +74,6 @@ doc-comment = "0.3"
either = { version = "1.5", features = ["serde"] }
rand = "0.7"
serde_json = "1.0"
streaming_algorithms = "0.3"
tokio = { version = "0.2", features = ["macros", "time"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
Expand All @@ -81,6 +82,13 @@ wasm-bindgen-test = "0.3"
[build-dependencies]
rustversion = "1.0"

[profile.bench]
codegen-units = 1
debug = 2
incremental = false
lto = true
# panic = "abort" # this is disallowed by cargo currently

[[example]]
name = "cloudfront_logs"
required-features = ["aws"]
Expand Down
10 changes: 5 additions & 5 deletions amadeus-aws/Cargo.toml
@@ -1,15 +1,15 @@
[package]
name = "amadeus-aws"
version = "0.3.7"
version = "0.4.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
keywords = ["amadeus", "data", "aws", "s3", "logs"]
description = """
Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
repository = "https://github.com/constellation-rs/amadeus"
homepage = "https://github.com/constellation-rs/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.7", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.7", path = "../amadeus-types" }
amadeus-core = { version = "=0.4.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.4.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/README.md
@@ -1,3 +1,3 @@
# amadeus-aws

This subcrate of the [`amadeus`](https://github.com/alecmocatta/amadeus) project includes a filesystem backend for S3 and a source for AWS Cloudfront logs.
This subcrate of the [`amadeus`](https://github.com/constellation-rs/amadeus) project includes a filesystem backend for S3 and a source for AWS Cloudfront logs.
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.7")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.4.0")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
10 changes: 5 additions & 5 deletions amadeus-commoncrawl/Cargo.toml
@@ -1,15 +1,15 @@
[package]
name = "amadeus-commoncrawl"
version = "0.3.7"
version = "0.4.0"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <github@deathbyescalator.com>", "Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
keywords = ["amadeus", "data", "commoncrawl", "web", "crawl"]
description = """
Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
repository = "https://github.com/constellation-rs/amadeus"
homepage = "https://github.com/constellation-rs/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.7", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.7", path = "../amadeus-types" }
amadeus-core = { version = "=0.4.0", path = "../amadeus-core" }
amadeus-types = { version = "=0.4.0", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
2 changes: 1 addition & 1 deletion amadeus-commoncrawl/README.md
@@ -1,3 +1,3 @@
# amadeus-commoncrawl

This subcrate of the [`amadeus`](https://github.com/alecmocatta/amadeus) project includes a source for the CommonCrawl datasets.
This subcrate of the [`amadeus`](https://github.com/constellation-rs/amadeus) project includes a source for the CommonCrawl datasets.
2 changes: 1 addition & 1 deletion amadeus-commoncrawl/src/lib.rs
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.7")]
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.4.0")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
9 changes: 5 additions & 4 deletions amadeus-core/Cargo.toml
@@ -1,15 +1,15 @@
[package]
name = "amadeus-core"
version = "0.3.7"
version = "0.4.0"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
keywords = ["amadeus", "distributed", "data-science", "data", "logs"]
description = """
Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
repository = "https://github.com/constellation-rs/amadeus"
homepage = "https://github.com/constellation-rs/amadeus"
documentation = "https://docs.rs/amadeus"
readme = "README.md"
edition = "2018"
Expand All @@ -19,20 +19,21 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-streaming = { version = "=0.4.0", path = "../amadeus-streaming" }
async-trait = "0.1"
derive-new = "0.5"
educe = "0.4"
either = { version = "1.5", features = ["serde"] }
futures = "0.3"
indexmap = { version = "1.5", features = ["serde-1"] }
itertools = "0.9"
multimap = "0.8"
owned_chars = "0.3"
pin-project = "0.4"
rand = "0.7"
replace_with = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.3"
streaming_algorithms = "0.3"
sum = { version = "0.1", features = ["futures", "serde"] }
tokio = { version = "0.2", features = ["blocking", "rt-core"] }
walkdir = "2.2"
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/README.md
@@ -1,3 +1,3 @@
# amadeus-core

This subcrate of the [`amadeus`](https://github.com/alecmocatta/amadeus) project includes fundamental definitions including `DistributedIterator` and `ProcessPool`.
This subcrate of the [`amadeus`](https://github.com/constellation-rs/amadeus) project includes fundamental definitions including `DistributedIterator` and `ProcessPool`.
2 changes: 1 addition & 1 deletion amadeus-core/src/lib.rs
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).

#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.7")]
#![doc(html_root_url = "https://docs.rs/amadeus-core/0.4.0")]
#![cfg_attr(nightly, feature(unboxed_closures))]
#![recursion_limit = "25600"]
#![warn(
Expand Down
32 changes: 32 additions & 0 deletions amadeus-core/src/par_pipe.rs
Expand Up @@ -82,6 +82,28 @@ macro_rules! pipe {
$assert_pipe(Cloned::new(self))
}

#[inline]
fn left_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> LeftJoin<Self, K, V1, V2>
where
K: Eq + Hash + Clone + $send + 'static,
V1: 'static,
V2: Clone + $send + 'static,
Self: $pipe<Input, Output = (K, V1)> + Sized,
{
$assert_pipe(LeftJoin::new(self, right.into_iter().collect()))
}

#[inline]
fn inner_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> InnerJoin<Self, K, V1, V2>
where
K: Eq + Hash + Clone + $send + 'static,
V1: 'static,
V2: Clone + $send + 'static,
Self: $pipe<Input, Output = (K, V1)> + Sized,
{
$assert_pipe(InnerJoin::new(self, right.into_iter().collect()))
}

// #[must_use]
// fn chain<C>(self, chain: C) -> Chain<Self, C::Iter>
// where
Expand Down Expand Up @@ -155,6 +177,16 @@ macro_rules! pipe {
$assert_sink(Histogram::new(self))
}

#[inline]
fn sort_n_by<F>(self, n: usize, cmp: F) -> Sort<Self, F>
where
F: $fns::Fn(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
Self::Output: Clone + $send + 'static,
Self: Sized,
{
$assert_sink(Sort::new(self, cmp, n))
}

#[inline]
fn count(self) -> Count<Self>
where
Expand Down
53 changes: 51 additions & 2 deletions amadeus-core/src/par_sink/sample.rs
@@ -1,10 +1,13 @@
#![allow(clippy::type_complexity)]

use amadeus_streaming::{
HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Sort as SASort, Top
};
use derive_new::new;
use rand::thread_rng;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
use streaming_algorithms::{HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Top};
use serde_closure::traits;
use std::{cmp::Ordering, hash::Hash};

use super::{
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder, SumZeroFolder
Expand Down Expand Up @@ -48,6 +51,52 @@ impl<Item> FolderSync<Item> for SampleUnstableFolder {
}
}

#[derive(new)]
#[must_use]
pub struct Sort<P, F> {
pipe: P,
f: F,
n: usize,
}

impl_par_dist! {
#[cfg_attr(not(nightly), serde_closure::desugar)]
impl<P: ParallelPipe<Item>, F, Item> ParallelSink<Item> for Sort<P, F>
where
F: traits::Fn(&P::Output, &P::Output) -> Ordering + Clone + Send + 'static,
P::Output: Clone + Send + 'static,
{
folder_par_sink!(
SortFolder<F>,
SumZeroFolder<SASort<P::Output, F>>,
self,
SortFolder::new(self.f.clone(), self.n),
SumZeroFolder::new(SASort::new(self.f, self.n))
);
}
}

#[derive(Clone, Serialize, Deserialize, new)]
pub struct SortFolder<F> {
f: F,
n: usize,
}

#[cfg_attr(not(nightly), serde_closure::desugar)]
impl<Item, F> FolderSync<Item> for SortFolder<F>
where
F: traits::Fn(&Item, &Item) -> Ordering + Clone,
{
type Done = SASort<Item, F>;

fn zero(&mut self) -> Self::Done {
SASort::new(self.f.clone(), self.n)
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
state.push(item)
}
}

#[derive(new)]
#[must_use]
pub struct MostFrequent<P> {
Expand Down
15 changes: 8 additions & 7 deletions amadeus-core/src/par_sink/sum.rs
@@ -1,8 +1,8 @@
use derive_new::new;
use educe::Educe;
use replace_with::replace_with_or_abort;
use replace_with::{replace_with, replace_with_or_abort};
use serde::{Deserialize, Serialize};
use std::{iter, marker::PhantomData, mem};
use std::{iter, marker::PhantomData};

use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};

Expand Down Expand Up @@ -43,14 +43,15 @@ where

#[inline(always)]
fn zero(&mut self) -> Self::Done {
iter::empty::<B>().sum()
B::sum(iter::empty::<B>())
}
#[inline(always)]
fn push(&mut self, state: &mut Self::Done, item: Item) {
let zero = iter::empty::<B>().sum();
let left = mem::replace(state, zero);
let right = iter::once(item).sum::<B>();
*state = B::sum(iter::once(left).chain(iter::once(right)));
let default = || B::sum(iter::empty::<B>());
replace_with(state, default, |left| {
let right = iter::once(item).sum::<B>();
B::sum(iter::once(left).chain(iter::once(right)))
})
}
}

Expand Down

0 comments on commit f6f11fb

Please sign in to comment.