Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions src/client/cancel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use futures::{Async, Future, Poll};
use futures::task::{self, Task};

use common::Never;

use self::lock::Lock;

#[derive(Clone)]
pub struct Cancel {
inner: Arc<Inner>,
}

pub struct Canceled {
inner: Arc<Inner>,
}

struct Inner {
is_canceled: AtomicBool,
task: Lock<Option<Task>>,
}

impl Cancel {
pub fn new() -> (Cancel, Canceled) {
let inner = Arc::new(Inner {
is_canceled: AtomicBool::new(false),
task: Lock::new(None),
});
let inner2 = inner.clone();
(
Cancel {
inner: inner,
},
Canceled {
inner: inner2,
},
)
}

pub fn cancel(&self) {
if !self.inner.is_canceled.swap(true, Ordering::SeqCst) {
if let Some(mut locked) = self.inner.task.try_lock() {
if let Some(task) = locked.take() {
task.notify();
}
}
// if we couldn't take the lock, Canceled was trying to park.
// After parking, it will check is_canceled one last time,
// so we can just stop here.
}
}

pub fn is_canceled(&self) -> bool {
self.inner.is_canceled.load(Ordering::SeqCst)
}
}

impl Future for Canceled {
type Item = ();
type Error = Never;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.inner.is_canceled.load(Ordering::SeqCst) {
Ok(Async::Ready(()))
} else {
if let Some(mut locked) = self.inner.task.try_lock() {
if locked.is_none() {
// it's possible a Cancel just tried to cancel on another thread,
// and we just missed it. Once we have the lock, we should check
// one more time before parking this task and going away.
if self.inner.is_canceled.load(Ordering::SeqCst) {
return Ok(Async::Ready(()));
}
*locked = Some(task::current());
}
Ok(Async::NotReady)
} else {
// if we couldn't take the lock, then a Cancel taken has it.
// The *ONLY* reason is because it is in the process of canceling.
Ok(Async::Ready(()))
}
}
}
}

impl Drop for Canceled {
fn drop(&mut self) {
self.inner.is_canceled.store(true, Ordering::SeqCst);
}
}


// a sub module just to protect unsafety
mod lock {
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

pub struct Lock<T> {
is_locked: AtomicBool,
value: UnsafeCell<T>,
}

impl<T> Lock<T> {
pub fn new(val: T) -> Lock<T> {
Lock {
is_locked: AtomicBool::new(false),
value: UnsafeCell::new(val),
}
}

pub fn try_lock(&self) -> Option<Locked<T>> {
if !self.is_locked.swap(true, Ordering::SeqCst) {
Some(Locked { lock: self })
} else {
None
}
}
}

unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}

pub struct Locked<'a, T: 'a> {
lock: &'a Lock<T>,
}

impl<'a, T> Deref for Locked<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.value.get() }
}
}

impl<'a, T> DerefMut for Locked<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.value.get() }
}
}

impl<'a, T> Drop for Locked<'a, T> {
fn drop(&mut self) {
self.lock.is_locked.store(false, Ordering::SeqCst);
}
}
}
73 changes: 73 additions & 0 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};

use common::Never;
use super::cancel::{Cancel, Canceled};

pub type Callback<U> = oneshot::Sender<::Result<U>>;
pub type Promise<U> = oneshot::Receiver<::Result<U>>;

pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
let (cancel, canceled) = Cancel::new();
let tx = Sender {
cancel: cancel,
inner: tx,
};
let rx = Receiver {
canceled: canceled,
inner: rx,
};
(tx, rx)
}

pub struct Sender<T, U> {
cancel: Cancel,
inner: mpsc::UnboundedSender<(T, Callback<U>)>,
}

impl<T, U> Sender<T, U> {
pub fn is_closed(&self) -> bool {
self.cancel.is_canceled()
}

pub fn cancel(&self) {
self.cancel.cancel();
}

pub fn send(&self, val: T) -> Result<Promise<U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send((val, tx))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
}
}

impl<T, U> Clone for Sender<T, U> {
fn clone(&self) -> Sender<T, U> {
Sender {
cancel: self.cancel.clone(),
inner: self.inner.clone(),
}
}
}

pub struct Receiver<T, U> {
canceled: Canceled,
inner: mpsc::UnboundedReceiver<(T, Callback<U>)>,
}

impl<T, U> Stream for Receiver<T, U> {
type Item = (T, Callback<U>);
type Error = Never;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Async::Ready(()) = self.canceled.poll()? {
return Ok(Async::Ready(None));
}
self.inner.poll()
.map_err(|()| unreachable!("mpsc never errors"))
}
}

//TODO: Drop for Receiver should consume inner
78 changes: 33 additions & 45 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! HTTP Client

use std::cell::{Cell, RefCell};
use std::cell::Cell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use futures::{Future, Poll, Stream};
use futures::future::{self, Executor};
use futures::{Async, Future, Poll, Stream};
use futures::future::{self, Either, Executor};
#[cfg(feature = "compat")]
use http;
use tokio::reactor::Handle;
Expand All @@ -28,7 +28,10 @@ pub use self::connect::{HttpConnector, Connect};

use self::background::{bg, Background};

mod cancel;
mod connect;
//TODO(easy): move cancel and dispatch into common instead
pub(crate) mod dispatch;
mod dns;
mod pool;
#[cfg(feature = "compat")]
Expand Down Expand Up @@ -189,20 +192,16 @@ where C: Connect,
head.headers.set_pos(0, host);
}

use futures::Sink;
use futures::sync::{mpsc, oneshot};

let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.and_then(move |io| {
// 1 extra slot for possible Close message
let (tx, rx) = mpsc::channel(1);
let (tx, rx) = dispatch::channel();
let tx = HyperClient {
tx: RefCell::new(tx),
tx: tx,
should_close: Cell::new(true),
};
let pooled = pool.pooled(pool_key, tx);
Expand All @@ -225,33 +224,26 @@ where C: Connect,
});

let resp = race.and_then(move |client| {
use proto::dispatch::ClientMsg;

let (callback, rx) = oneshot::channel();
client.should_close.set(false);

match client.tx.borrow_mut().start_send(ClientMsg::Request(head, body, callback)) {
Ok(_) => (),
Err(e) => match e.into_inner() {
ClientMsg::Request(_, _, callback) => {
error!("pooled connection was not ready, this is a hyper bug");
let err = io::Error::new(
io::ErrorKind::BrokenPipe,
"pool selected dead connection",
);
let _ = callback.send(Err(::Error::Io(err)));
},
_ => unreachable!("ClientMsg::Request was just sent"),
match client.tx.send((head, body)) {
Ok(rx) => {
client.should_close.set(false);
Either::A(rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
Err(_) => {
error!("pooled connection was not ready, this is a hyper bug");
let err = io::Error::new(
io::ErrorKind::BrokenPipe,
"pool selected dead connection",
);
Either::B(future::err(::Error::Io(err)))
}
}

rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});

FutureResponse(Box::new(resp))
Expand All @@ -276,13 +268,8 @@ impl<C, B> fmt::Debug for Client<C, B> {
}

struct HyperClient<B> {
// A sentinel that is usually always true. If this is dropped
// while true, this will try to shutdown the dispatcher task.
//
// This should be set to false whenever it is checked out of the
// pool and successfully used to send a request.
should_close: Cell<bool>,
tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>,
tx: dispatch::Sender<proto::dispatch::ClientMsg<B>, ::Response>,
}

impl<B> Clone for HyperClient<B> {
Expand All @@ -296,18 +283,19 @@ impl<B> Clone for HyperClient<B> {

impl<B> self::pool::Ready for HyperClient<B> {
fn poll_ready(&mut self) -> Poll<(), ()> {
self.tx
.borrow_mut()
.poll_ready()
.map_err(|_| ())
if self.tx.is_closed() {
Err(())
} else {
Ok(Async::Ready(()))
}
}
}

impl<B> Drop for HyperClient<B> {
fn drop(&mut self) {
if self.should_close.get() {
self.should_close.set(false);
let _ = self.tx.borrow_mut().try_send(proto::dispatch::ClientMsg::Close);
self.tx.cancel();
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub use self::str::ByteStr;

mod str;

pub enum Never {}
Loading