Skip to content

Commit

Permalink
Use either instead of PartitionBool for partition_map
Browse files Browse the repository at this point in the history
  • Loading branch information
drewkett committed Sep 8, 2021
1 parent a32eb02 commit db1a3cb
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -6,5 +6,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
either = "1.6"
futures = "0.3"
pin-project-lite = "0.2"
16 changes: 9 additions & 7 deletions src/lib.rs
@@ -1,11 +1,13 @@
mod partition;
mod partition_map;

use futures::Stream;
pub(crate) use partition::Partition;
pub use partition::{FalsePartition, TruePartition};
pub(crate) use partition_map::PartitionMap;
pub use partition_map::{FalsePartitionMap, PartitionBool, TruePartitionMap};
pub use partition_map::{LeftPartitionMap, RightPartitionMap};

pub use either::Either;
use futures::Stream;

pub trait ParititonStream: Stream + Sized {
fn partition<P>(
Expand All @@ -28,15 +30,15 @@ pub trait ParititonStream: Stream + Sized {
self,
predicate: P,
) -> (
TruePartitionMap<Self::Item, T, F, Self, P>,
FalsePartitionMap<Self::Item, T, F, Self, P>,
LeftPartitionMap<Self::Item, T, F, Self, P>,
RightPartitionMap<Self::Item, T, F, Self, P>,
)
where
P: Fn(Self::Item) -> PartitionBool<T, F>,
P: Fn(Self::Item) -> Either<T, F>,
{
let stream = PartitionMap::new(self, predicate);
let true_stream = TruePartitionMap::new(stream.clone());
let false_stream = FalsePartitionMap::new(stream);
let true_stream = LeftPartitionMap::new(stream.clone());
let false_stream = RightPartitionMap::new(stream);
(true_stream, false_stream)
}
}
104 changes: 50 additions & 54 deletions src/partition_map.rs
Expand Up @@ -5,74 +5,70 @@ use std::{
task::{Poll, Waker},
};

use either::Either;
use futures::Stream;
use pin_project_lite::pin_project;

pub enum PartitionBool<T, F> {
True(T),
False(F),
}

pin_project! {
pub(crate) struct PartitionMap<I, T, F, S, P> {
buf_true: Option<T>,
buf_false: Option<F>,
waker_true: Option<Waker>,
waker_false: Option<Waker>,
pub(crate) struct PartitionMap<I, L, R, S, P> {
buf_left: Option<L>,
buf_right: Option<R>,
waker_left: Option<Waker>,
waker_right: Option<Waker>,
#[pin]
stream: S,
predicate: P,
item: PhantomData<I>
}
}

impl<I, T, F, S, P> PartitionMap<I, T, F, S, P>
impl<I, L, R, S, P> PartitionMap<I, L, R, S, P>
where
S: Stream<Item = I>,
P: Fn(I) -> PartitionBool<T, F>,
P: Fn(I) -> Either<L, R>,
{
pub(crate) fn new(stream: S, predicate: P) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self {
buf_false: None,
buf_true: None,
waker_false: None,
waker_true: None,
buf_right: None,
buf_left: None,
waker_right: None,
waker_left: None,
stream,
predicate,
item: PhantomData,
}))
}

fn poll_next_true(
fn poll_next_left(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<T>> {
) -> std::task::Poll<Option<L>> {
let this = self.project();
// There should only ever be one waker calling the function
if this.waker_true.is_none() {
*this.waker_true = Some(cx.waker().clone());
if this.waker_left.is_none() {
*this.waker_left = Some(cx.waker().clone());
}
if let Some(item) = this.buf_true.take() {
if let Some(item) = this.buf_left.take() {
// There was already a value in the buffer. Return that value
return Poll::Ready(Some(item));
}
if this.buf_false.is_some() {
if this.buf_right.is_some() {
// There is a value available for the other stream. Wake that
// stream if possible and return pending since we can't store
// multiple values for a stream
if let Some(waker) = this.waker_false {
if let Some(waker) = this.waker_right {
waker.wake_by_ref();
}
return Poll::Pending;
}
match this.stream.poll_next(cx) {
Poll::Ready(Some(item)) => {
match (this.predicate)(item) {
PartitionBool::True(true_item) => Poll::Ready(Some(true_item)),
PartitionBool::False(false_item) => {
Either::Left(left_item) => Poll::Ready(Some(left_item)),
Either::Right(right_item) => {
// This value is not what we wanted. Store it and notify other partition task if it exists
let _ = this.buf_false.replace(false_item);
if let Some(waker) = this.waker_false {
let _ = this.buf_right.replace(right_item);
if let Some(waker) = this.waker_right {
waker.wake_by_ref();
}
Poll::Pending
Expand All @@ -84,40 +80,40 @@ where
}
}

fn poll_next_false(
fn poll_next_right(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<F>> {
) -> std::task::Poll<Option<R>> {
let this = self.project();
// I think there should only ever be one waker calling the function
if this.waker_false.is_none() {
*this.waker_false = Some(cx.waker().clone());
if this.waker_right.is_none() {
*this.waker_right = Some(cx.waker().clone());
}
if let Some(item) = this.buf_false.take() {
if let Some(item) = this.buf_right.take() {
// There was already a value in the buffer. Return that value
return Poll::Ready(Some(item));
}
if this.buf_true.is_some() {
if this.buf_left.is_some() {
// There is a value available for the other stream. Wake that
// stream if possible and return pending since we can't store
// multiple values for a stream
if let Some(waker) = this.waker_true {
if let Some(waker) = this.waker_left {
waker.wake_by_ref();
}
return Poll::Pending;
}
match this.stream.poll_next(cx) {
Poll::Ready(Some(item)) => {
match (this.predicate)(item) {
PartitionBool::True(true_item) => {
Either::Left(left_item) => {
// This value is not what we wanted. Store it and notify other partition task if it exists
let _ = this.buf_true.replace(true_item);
if let Some(waker) = this.waker_true {
let _ = this.buf_left.replace(left_item);
if let Some(waker) = this.waker_left {
waker.wake_by_ref();
}
Poll::Pending
}
PartitionBool::False(false_item) => Poll::Ready(Some(false_item)),
Either::Right(right_item) => Poll::Ready(Some(right_item)),
}
}
Poll::Ready(None) => Poll::Ready(None),
Expand All @@ -127,31 +123,31 @@ where
}

pin_project! {
pub struct TruePartitionMap<I, T, F, S, P> {
pub struct LeftPartitionMap<I, L, R, S, P> {
#[pin]
stream: Arc<Mutex<PartitionMap<I, T, F, S, P>>>,
stream: Arc<Mutex<PartitionMap<I, L, R, S, P>>>,
}
}

impl<I, T, F, S, P> TruePartitionMap<I, T, F, S, P> {
pub(crate) fn new(stream: Arc<Mutex<PartitionMap<I, T, F, S, P>>>) -> Self {
impl<I, L, R, S, P> LeftPartitionMap<I, L, R, S, P> {
pub(crate) fn new(stream: Arc<Mutex<PartitionMap<I, L, R, S, P>>>) -> Self {
Self { stream }
}
}

impl<I, T, F, S, P> Stream for TruePartitionMap<I, T, F, S, P>
impl<I, L, R, S, P> Stream for LeftPartitionMap<I, L, R, S, P>
where
S: Stream<Item = I> + Unpin,
P: Fn(I) -> PartitionBool<T, F>,
P: Fn(I) -> Either<L, R>,
{
type Item = T;
type Item = L;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let response = if let Ok(mut guard) = this.stream.try_lock() {
PartitionMap::poll_next_true(Pin::new(&mut guard), cx)
PartitionMap::poll_next_left(Pin::new(&mut guard), cx)
} else {
cx.waker().wake_by_ref();
Poll::Pending
Expand All @@ -161,31 +157,31 @@ where
}

pin_project! {
pub struct FalsePartitionMap<I, T, F, S, P> {
pub struct RightPartitionMap<I, L, R, S, P> {
#[pin]
stream: Arc<Mutex<PartitionMap<I, T, F , S, P>>>,
stream: Arc<Mutex<PartitionMap<I, L, R , S, P>>>,
}
}

impl<I, T, F, S, P> FalsePartitionMap<I, T, F, S, P> {
pub(crate) fn new(stream: Arc<Mutex<PartitionMap<I, T, F, S, P>>>) -> Self {
impl<I, L, R, S, P> RightPartitionMap<I, L, R, S, P> {
pub(crate) fn new(stream: Arc<Mutex<PartitionMap<I, L, R, S, P>>>) -> Self {
Self { stream }
}
}

impl<I, T, F, S, P> Stream for FalsePartitionMap<I, T, F, S, P>
impl<I, L, R, S, P> Stream for RightPartitionMap<I, L, R, S, P>
where
S: Stream<Item = I> + Unpin,
P: Fn(I) -> PartitionBool<T, F>,
P: Fn(I) -> Either<L, R>,
{
type Item = F;
type Item = R;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let response = if let Ok(mut guard) = this.stream.try_lock() {
PartitionMap::poll_next_false(Pin::new(&mut guard), cx)
PartitionMap::poll_next_right(Pin::new(&mut guard), cx)
} else {
cx.waker().wake_by_ref();
Poll::Pending
Expand Down

0 comments on commit db1a3cb

Please sign in to comment.