Skip to content

Commit

Permalink
feat(server): change default dispatcher
Browse files Browse the repository at this point in the history
- Deprecates the `no_proto` configuration on `Server`. It is always
  enabled.
- Deprecates all pieces related to tokio-proto.
- Makes the tokio-proto crate optional, and the `server-proto` feature
  can be used to completely remove the dependency. It is enabled by
  default.
  • Loading branch information
seanmonstar committed Dec 29, 2017
1 parent 0892cb2 commit 6ade21a
Show file tree
Hide file tree
Showing 15 changed files with 124 additions and 118 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
Expand Up @@ -8,8 +8,9 @@ matrix:
env: FEATURES="--features nightly"
- rust: beta
- rust: stable
env: HYPER_DOCS=1
- rust: stable
env: HYPER_NO_PROTO=1
env: FEATURES="--no-default-features"
- rust: stable
env: FEATURES="--features compat"
- rust: 1.17.0
Expand Down Expand Up @@ -37,7 +38,7 @@ addons:


after_success:
- '[ $TRAVIS_RUST_VERSION = stable ] &&
- '[ "$HYPER_DOCS" = "1" ] &&
LOCAL="~/.local" && export PATH=$LOCAL/bin:$PATH &&
wget https://github.com/SimonKagstrom/kcov/archive/master.tar.gz &&
tar xzf master.tar.gz && mkdir kcov-master/build && cd kcov-master/build &&
Expand All @@ -51,7 +52,7 @@ after_success:
fi;
done &&
kcov --coveralls-id=$TRAVIS_JOB_ID --merge target/cov target/cov/*'
- '[ $TRAVIS_PULL_REQUEST = false ] && [ $TRAVIS_RUST_VERSION = stable ] &&
- '[ $TRAVIS_PULL_REQUEST = false ] && [ "$HYPER_DOCS" = "1" ] &&
{ [ "$TRAVIS_TAG" != "" ] || [ "$TRAVIS_BRANCH" == "master" ]; } &&
./.travis/docs.sh'

Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -32,7 +32,7 @@ percent-encoding = "1.0"
relay = "0.1"
time = "0.1"
tokio-core = "0.1.6"
tokio-proto = "0.1"
tokio-proto = { version = "0.1", optional = true }
tokio-service = "0.1"
tokio-io = "0.1"
unicase = "2.0"
Expand All @@ -48,4 +48,4 @@ default = ["server-proto"]
nightly = []
raw_status = []
compat = [ "http" ]
server-proto = []
server-proto = ["tokio-proto"]
3 changes: 1 addition & 2 deletions examples/hello.rs
Expand Up @@ -19,8 +19,7 @@ fn main() {
.with_body(PHRASE))
}));

let mut server = Http::new().bind(&addr, new_service).unwrap();
server.no_proto();
let server = Http::new().bind(&addr, new_service).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
3 changes: 1 addition & 2 deletions examples/params.rs
Expand Up @@ -99,8 +99,7 @@ fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:1337".parse().unwrap();

let mut server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
server.no_proto();
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
3 changes: 1 addition & 2 deletions examples/send_file.rs
Expand Up @@ -135,8 +135,7 @@ fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:1337".parse().unwrap();

let mut server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
server.no_proto();
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
3 changes: 1 addition & 2 deletions examples/server.rs
Expand Up @@ -47,8 +47,7 @@ fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:1337".parse().unwrap();

let mut server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
server.no_proto();
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
2 changes: 1 addition & 1 deletion src/client/mod.rs
Expand Up @@ -198,7 +198,7 @@ where C: Connect,
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
handle.spawn(dispatch.map_err(|err| error!("client connection error: {}", err)));
pooled
})
};
Expand Down
17 changes: 7 additions & 10 deletions src/lib.rs
Expand Up @@ -32,6 +32,7 @@ extern crate relay;
extern crate time;
extern crate tokio_core as tokio;
#[macro_use] extern crate tokio_io;
#[cfg(feature = "tokio-proto")]
extern crate tokio_proto;
extern crate tokio_service;
extern crate unicase;
Expand All @@ -55,17 +56,13 @@ pub use proto::RawStatus;

macro_rules! feat_server_proto {
($($i:item)*) => ($(
#[cfg_attr(
not(feature = "server-proto"),
deprecated(
since="0.11.7",
note="server-proto was recently added to default features, but you have disabled default features. A future version will remove these types if the server-proto feature is not enabled."
)
)]
#[cfg_attr(
not(feature = "server-proto"),
allow(deprecated)
#[cfg(feature = "server-proto")]
#[deprecated(
since="0.11.11",
note="All usage of the tokio-proto crate is going away."
)]
#[doc(hidden)]
#[allow(deprecated)]
$i
)*)
}
Expand Down
9 changes: 9 additions & 0 deletions src/mock.rs
Expand Up @@ -87,14 +87,23 @@ impl<T> AsyncIo<T> {
}

impl AsyncIo<Buf> {
#[cfg(feature = "tokio-proto")]
//TODO: fix proto::conn::tests to not use tokio-proto API,
//and then this cfg flag go away
pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<Buf> {
AsyncIo::new(Buf::wrap(buf.into()), bytes)
}

#[cfg(feature = "tokio-proto")]
//TODO: fix proto::conn::tests to not use tokio-proto API,
//and then this cfg flag go away
pub fn new_eof() -> AsyncIo<Buf> {
AsyncIo::new(Buf::wrap(Vec::new().into()), 1)
}

#[cfg(feature = "tokio-proto")]
//TODO: fix proto::conn::tests to not use tokio-proto API,
//and then this cfg flag go away
pub fn flushed(&self) -> bool {
self.flushed
}
Expand Down
96 changes: 58 additions & 38 deletions src/proto/body.rs
@@ -1,11 +1,13 @@
use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures::sync::{mpsc, oneshot};
#[cfg(feature = "tokio-proto")]
use tokio_proto;
use std::borrow::Cow;

use super::Chunk;

#[cfg(feature = "tokio-proto")]
pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

Expand All @@ -16,33 +18,36 @@ pub struct Body(Inner);

#[derive(Debug)]
enum Inner {
#[cfg(feature = "tokio-proto")]
Tokio(TokioBody),
Hyper {
close_tx: oneshot::Sender<()>,
Chan {
close_tx: oneshot::Sender<bool>,
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
}
},
Once(Option<Chunk>),
Empty,
}

//pub(crate)
#[derive(Debug)]
pub struct ChunkSender {
close_rx: oneshot::Receiver<()>,
close_rx: oneshot::Receiver<bool>,
close_rx_check: bool,
tx: BodySender,
}

impl Body {
/// Return an empty body stream
#[inline]
pub fn empty() -> Body {
Body(Inner::Tokio(TokioBody::empty()))
Body(Inner::Empty)
}

/// Return a body stream with an associated sender half
#[inline]
pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) {
let (tx, rx) = TokioBody::pair();
let rx = Body(Inner::Tokio(rx));
(tx, rx)
let (tx, rx) = channel();
(tx.tx, rx)
}
}

Expand All @@ -60,13 +65,16 @@ impl Stream for Body {
#[inline]
fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.0 {
#[cfg(feature = "tokio-proto")]
Inner::Tokio(ref mut rx) => rx.poll(),
Inner::Hyper { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Inner::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
Async::Ready(Some(Err(err))) => Err(err),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
},
Inner::Once(ref mut val) => Ok(Async::Ready(val.take())),
Inner::Empty => Ok(Async::Ready(None)),
}
}
}
Expand All @@ -78,9 +86,10 @@ pub fn channel() -> (ChunkSender, Body) {

let tx = ChunkSender {
close_rx: close_rx,
close_rx_check: true,
tx: tx,
};
let rx = Body(Inner::Hyper {
let rx = Body(Inner::Chan {
close_tx: close_tx,
rx: rx,
});
Expand All @@ -90,9 +99,16 @@ pub fn channel() -> (ChunkSender, Body) {

impl ChunkSender {
pub fn poll_ready(&mut self) -> Poll<(), ()> {
match self.close_rx.poll() {
Ok(Async::Ready(())) | Err(_) => return Err(()),
Ok(Async::NotReady) => (),
if self.close_rx_check {
match self.close_rx.poll() {
Ok(Async::Ready(true)) | Err(_) => return Err(()),
Ok(Async::Ready(false)) => {
// needed to allow converting into a plain mpsc::Receiver
// if it has been, the tx will send false to disable this check
self.close_rx_check = false;
}
Ok(Async::NotReady) => (),
}
}

self.tx.poll_ready().map_err(|_| ())
Expand All @@ -107,63 +123,67 @@ impl ChunkSender {
}
}

// deprecate soon, but can't really deprecate trait impls
#[doc(hidden)]
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
#[inline]
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
match b.0 {
Inner::Tokio(b) => b,
Inner::Hyper { close_tx, rx } => {
warn!("converting hyper::Body into a tokio_proto Body is deprecated");
::std::mem::forget(close_tx);
rx.into()
feat_server_proto! {
impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
match b.0 {
Inner::Tokio(b) => b,
Inner::Chan { close_tx, rx } => {
// disable knowing if the Rx gets dropped, since we cannot
// pass this tx along.
let _ = close_tx.send(false);
rx.into()
},
Inner::Once(Some(chunk)) => TokioBody::from(chunk),
Inner::Once(None) |
Inner::Empty => TokioBody::empty(),
}
}
}
}

// deprecate soon, but can't really deprecate trait impls
#[doc(hidden)]
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
#[inline]
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
Body(Inner::Tokio(tokio_body))
impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
Body(Inner::Tokio(tokio_body))
}
}
}

impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body {
#[inline]
fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body {
TokioBody::from(src).into()
let (tx, _) = oneshot::channel();
Body(Inner::Chan {
close_tx: tx,
rx: src,
})
}
}

impl From<Chunk> for Body {
#[inline]
fn from (chunk: Chunk) -> Body {
TokioBody::from(chunk).into()
Body(Inner::Once(Some(chunk)))
}
}

impl From<Bytes> for Body {
#[inline]
fn from (bytes: Bytes) -> Body {
Body::from(TokioBody::from(Chunk::from(bytes)))
Body::from(Chunk::from(bytes))
}
}

impl From<Vec<u8>> for Body {
#[inline]
fn from (vec: Vec<u8>) -> Body {
Body::from(TokioBody::from(Chunk::from(vec)))
Body::from(Chunk::from(vec))
}
}

impl From<&'static [u8]> for Body {
#[inline]
fn from (slice: &'static [u8]) -> Body {
Body::from(TokioBody::from(Chunk::from(slice)))
Body::from(Chunk::from(slice))
}
}

Expand All @@ -180,14 +200,14 @@ impl From<Cow<'static, [u8]>> for Body {
impl From<String> for Body {
#[inline]
fn from (s: String) -> Body {
Body::from(TokioBody::from(Chunk::from(s.into_bytes())))
Body::from(Chunk::from(s.into_bytes()))
}
}

impl From<&'static str> for Body {
#[inline]
fn from(slice: &'static str) -> Body {
Body::from(TokioBody::from(Chunk::from(slice.as_bytes())))
Body::from(Chunk::from(slice.as_bytes()))
}
}

Expand Down

0 comments on commit 6ade21a

Please sign in to comment.