Skip to content

Commit

Permalink
Finish implementing collect
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed May 2, 2016
1 parent 03d60c8 commit 440ab52
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 109 deletions.
203 changes: 145 additions & 58 deletions src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,79 +3,83 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use cell::AtomicCell;
use {Future, Callback, PollResult, PollError};
use {Future, Callback, PollResult, PollError, IntoFuture};
use util;

pub struct Collect<I> where I: IntoIterator, I::Item: Future {
pub struct Collect<I> where I: IntoIterator, I::Item: IntoFuture {
state: State<I>,
}

enum State<I> where I: IntoIterator, I::Item: Future {
enum State<I> where I: IntoIterator, I::Item: IntoFuture {
Local {
cur: Option<I::Item>,
cur: Option<<I::Item as IntoFuture>::Future>,
remaining: I::IntoIter,
result: Vec<<I::Item as Future>::Item>,
result: Option<Vec<<I::Item as IntoFuture>::Item>>,
},
Scheduled(Arc<Scheduled<I>>),
Canceled,
Done,
}

struct Scheduled<I> where I: IntoIterator, I::Item: Future {
struct Scheduled<I> where I: IntoIterator, I::Item: IntoFuture {
state: AtomicUsize,
slot: AtomicCell<Option<I::Item>>,
slot: AtomicCell<Option<<I::Item as IntoFuture>::Future>>,
}

const EMPTY: usize = 0;
const FULL: usize = 1;
const CANCEL: usize = 2;
const CANCELED: usize = !0;

pub fn collect<I>(i: I) -> Collect<I>
where I: IntoIterator,
I::Item: Future,
I::Item: IntoFuture,
I::IntoIter: Send + 'static,
{
let mut i = i.into_iter();
Collect {
state: State::Local {
cur: i.next(),
cur: i.next().map(IntoFuture::into_future),
remaining: i,
result: Vec::new(),
result: Some(Vec::new()),
},
}
}

impl<I> Future for Collect<I>
where I: IntoIterator + Send + 'static,
I::IntoIter: Send + 'static,
I::Item: Future,
I::Item: IntoFuture,
{
type Item = Vec<<I::Item as Future>::Item>;
type Error = <I::Item as Future>::Error;
type Item = Vec<<I::Item as IntoFuture>::Item>;
type Error = <I::Item as IntoFuture>::Error;

fn poll(&mut self) -> Option<PollResult<Self::Item, Self::Error>> {
let (cur, remaining, result) = match self.state {
State::Local { ref mut cur, ref mut remaining, ref mut result } => {
(cur, remaining, result)
}
State::Canceled => return Some(Err(PollError::Canceled)),
State::Scheduled(..) => return Some(Err(util::reused())),
State::Scheduled(..) |
State::Done => return Some(Err(util::reused())),
};
loop {
match cur.as_mut().map(Future::poll) {
Some(Some(Ok(i))) => {
result.push(i);
*cur = remaining.next();
result.as_mut().unwrap().push(i);
*cur = remaining.next().map(IntoFuture::into_future);
}
Some(Some(Err(e))) => {
for mut item in remaining {
item.cancel();
for item in remaining {
item.into_future().cancel();
}
return Some(Err(e))
}
Some(None) => return None,

// TODO: two poll() calls should probably panic the second
None => return Some(Ok(mem::replace(result, Vec::new()))),
None => {
match result.take() {
Some(vec) => return Some(Ok(vec)),
None => return Some(Err(util::reused())),
}
}
}
}
}
Expand All @@ -86,7 +90,7 @@ impl<I> Future for Collect<I>
// }
// let remaining = try!(util::opt2poll(self.remaining.as_mut()));
// let mut err = None;
// for mut item in self.cur.take().into_iter().chain(remaining) {
// for item in self.cur.take().into_iter().chain(remaining) {
// if err.is_some() {
// item.cancel();
// } else {
Expand All @@ -104,71 +108,154 @@ impl<I> Future for Collect<I>

fn cancel(&mut self) {
match self.state {
State::Local { ref mut cur, ref mut remaining, ref mut result } => {
State::Local { ref mut cur, ref mut remaining, .. } => {
if let Some(mut cur) = cur.take() {
cur.cancel();
}
for mut future in remaining {
future.cancel();
for future in remaining {
future.into_future().cancel();
}
}
State::Scheduled(ref s) => {
loop {}
}
State::Canceled => return,
State::Scheduled(ref s) => s.cancel(),
State::Canceled | State::Done => return,
}
self.state = State::Canceled;
}

fn schedule<G>(&mut self, g: G)
where G: FnOnce(PollResult<Self::Item, Self::Error>) + Send + 'static
{
let state = match mem::replace(&mut self.state, State::Canceled) {
State::Local { cur, remaining, result } => (cur, remaining, result),
let state = match mem::replace(&mut self.state, State::Done) {
State::Local { cur, remaining, result } => {
(cur, remaining, result)
}
State::Canceled => return g(Err(PollError::Canceled)),
State::Scheduled(s) => {
self.state = State::Scheduled(s);
return g(Err(util::reused()))
}
State::Done => return g(Err(util::reused())),
};
let (cur, remaining, result) = state;
let mut cur = match cur {
let result = match result {
Some(result) => result,
None => return g(Err(util::reused())),
};
let cur = match cur {
Some(cur) => cur,
None => return g(Ok(result)),
};
let state = Arc::new(Scheduled {
state: AtomicUsize::new(EMPTY),
state: AtomicUsize::new(result.len()),
slot: AtomicCell::new(None),
});
let state2 = state.clone();
cur.schedule(move |res| {
drop((remaining, result));
// let item = match res {
// Ok(i) => i,
// Err(e) => {
// for mut future in remaining {
// future.cancel();
// }
// return g(Err(e))
// }
// };
// result.push(item);
// Collect {
// cur: remaining.next(),
// remaining: Some(remaining),
// result: result,
// }.schedule(g);
});
Scheduled::run(&state, cur, remaining, result, g);

*state2.slot.borrow().expect("couldn't pick up slot") = Some(cur);

self.state = State::Scheduled(state2);
self.state = State::Scheduled(state);
}

fn schedule_boxed(&mut self, cb: Box<Callback<Self::Item, Self::Error>>) {
self.schedule(|r| cb.call(r))
}
}

impl<I> Scheduled<I> where I: IntoIterator, I::Item: Future {
impl<I> Scheduled<I>
where I: IntoIterator + Send + 'static,
I::Item: IntoFuture,
I::IntoIter: Send + 'static
{
fn run<G>(state: &Arc<Scheduled<I>>,
mut future: <I::Item as IntoFuture>::Future,
remaining: I::IntoIter,
result: Vec<<I::Item as IntoFuture>::Item>,
g: G)
where G: FnOnce(PollResult<Vec<<I::Item as IntoFuture>::Item>,
<I::Item as IntoFuture>::Error>) + Send + 'static
{
let state2 = state.clone();
let nth = result.len() + 1;
future.schedule(move |res| {
Scheduled::finish(&state2, res, remaining, result, g)
});

match state.state.compare_and_swap(nth - 1, nth, Ordering::SeqCst) {
// Someone canceled in this window, we shouldn't continue so we
// cancel the future.
CANCELED => future.cancel(),

// We have successfully moved forward. Store our future for someone
// else to cancel. Lock acquisition can fail for two reasons:
//
// 1. Our future has finished, and someone else moved the world
// forward, and we're racing with them.
// 2. Someone is canceling this future, and they're trying to see if
// a future is listed.
//
// We treat both cases by canceling our own future (primarily for
// case 2). Canceling in case 1 won't have any affect as the future
// is already finished.
//
// After we store our future, we check the state one last time to
// ensure that if a cancellation came in while we were storing the
// future we actually cancel the future.
n if n <= nth - 1 => {
match state.slot.borrow() {
Some(mut slot) => *slot = Some(future),
None => return future.cancel(),
}

if state.state.load(Ordering::SeqCst) == CANCELED {
let f = state.slot.borrow().and_then(|mut f| f.take());
if let Some(mut f) = f {
f.cancel();
}
}
}

// Ok, looks like the world has moved on beyond us and we're no
// longer needed (our fiture finished)
_ => {}
}
}

fn finish<G>(state: &Arc<Scheduled<I>>,
res: PollResult<<I::Item as IntoFuture>::Item,
<I::Item as IntoFuture>::Error>,
mut remaining: I::IntoIter,
mut result: Vec<<I::Item as IntoFuture>::Item>,
g: G)
where G: FnOnce(PollResult<Vec<<I::Item as IntoFuture>::Item>,
<I::Item as IntoFuture>::Error>) + Send + 'static
{
match res {
Ok(item) => result.push(item),
Err(e) => {
for f in remaining {
f.into_future().cancel();
}
return g(Err(e))
}
}
match remaining.next() {
Some(f) => Scheduled::run(state, f.into_future(), remaining, result, g),
None => return g(Ok(result)),
}
}

fn cancel(&self) {
// Store CANCELED to indicate that we're done for good. Any future calls
// to `run` above will see this and cancel futures, so we just need to
// handle the case that there's a future stored already.
//
// We acquire the lock, and if a future is in there we cancel it. Note
// that acquiring the lock can fail if someone else is already storing a
// new future in it. We're guaranteed, however, that when they unlock
// the lock they'll check the state again and see CANCELED, then
// cancelling their future.
self.state.store(CANCELED, Ordering::SeqCst);
let f = self.slot.borrow().and_then(|mut f| f.take());
if let Some(mut f) = f {
f.cancel();
}
}
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub use or_else::OrElse;
pub use select::Select;
pub use then::Then;

// mod collect;
// pub use collect::{collect, Collect};
mod collect;
pub use collect::{collect, Collect};

// streams
// pub mod stream;
Expand Down
58 changes: 9 additions & 49 deletions tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,6 @@ fn test_empty() {
assert_empty(|| empty().then(|a| a));
}

// #[test]
// fn test_collect() {
// let f_ok1: FutureResult<i32, i32> = Ok(1).into_future();
// let f_ok2: FutureResult<i32, i32> = Ok(2).into_future();
// let f_ok3: FutureResult<i32, i32> = Ok(3).into_future();
// let f_err1: FutureResult<i32, i32> = Err(1).into_future();
//
// assert_eq!(get(collect(vec![f_ok1, f_ok2, f_ok3])), Ok(vec![1, 2, 3]));
// assert_eq!(get(collect(vec![f_ok1, f_err1, f_ok3])), Err(1));
// }

#[test]
fn test_finished() {
assert_done(|| finished(1), ok(1));
Expand All @@ -202,44 +191,6 @@ fn flatten() {
assert_empty(|| empty::<i32, u32>().map(finished).flatten());
}

// #[test]
// fn collect_progress() {
// let (p1, c1) = promise::pair::<i32, i32>();
// let (p2, c2) = promise::pair::<i32, i32>();
// let f = collect(vec![p1, p2]);
// let f = assert_empty(f);
// c1.finish(1);
// let f = assert_empty(assert_empty(f));
// c2.finish(2);
// assert_eq!(f.await(), Ok(vec![1, 2]));
//
// let (p1, c1) = promise::pair::<i32, i32>();
// let (p2, c2) = promise::pair::<i32, i32>();
// let (tx, rx) = channel();
// collect(vec![p1, p2]).schedule(move |r| tx.send(r).unwrap());
// assert!(rx.try_recv().is_err());
// c1.finish(1);
// assert!(rx.try_recv().is_err());
// c2.finish(2);
// assert_eq!(rx.recv().unwrap(), Ok(vec![1, 2]));
//
// let (p1, c1) = promise::pair::<i32, i32>();
// let (p2, c2) = promise::pair::<i32, i32>();
// let (tx, rx) = channel();
// collect(vec![p1, p2]).schedule(move |r| tx.send(r).unwrap());
// assert!(rx.try_recv().is_err());
// c1.finish(1);
// assert!(rx.try_recv().is_err());
// c2.fail(2);
// assert_eq!(rx.recv().unwrap(), Err(2));
//
// let (p1, c1) = promise::pair::<i32, i32>();
// let (p2, _c2) = promise::pair::<i32, i32>();
// let f = collect(vec![p1, p2]);
// c1.fail(1);
// assert_eq!(get(f), Err(1));
// }

#[test]
fn smoke_promise() {
assert_done(|| {
Expand Down Expand Up @@ -376,3 +327,12 @@ fn cancel_propagates() {
let mut f = promise::<i32, u32>().0.map_err(|_| panic!());
assert_cancel(f.poll().unwrap());
}

#[test]
fn collect_collects() {
assert_done(|| collect(vec![f_ok(1), f_ok(2)]), Ok(vec![1, 2]));
assert_done(|| collect(vec![f_ok(1)]), Ok(vec![1]));
assert_done(|| collect(Vec::<Result<i32, u32>>::new()), Ok(vec![]));

// TODO: needs more tests
}

0 comments on commit 440ab52

Please sign in to comment.