From 0b5c2b8da62b798b182bc31eb3406f0eb040cb5f Mon Sep 17 00:00:00 2001 From: "Herman J. Radtke III" Date: Fri, 14 Jul 2017 08:22:19 -0700 Subject: [PATCH] Include support for read and write timeouts --- Cargo.lock | 14 ++++++ Cargo.toml | 9 ++-- README.md | 4 ++ examples/client.rs | 54 +++++++++++++++++++++++ src/lib.rs | 107 +++++++++++++++++++++++++++++++++------------ 5 files changed, 157 insertions(+), 31 deletions(-) create mode 100644 examples/client.rs diff --git a/Cargo.lock b/Cargo.lock index 518d37e..a4cf414 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,7 @@ dependencies = [ "native-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -424,6 +425,18 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-io-timeout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-proto" version = "0.1.1" @@ -538,6 +551,7 @@ dependencies = [ "checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520" "checksum tokio-core 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6a20ba4738d283cac7495ca36e045c80c2a8df3e05dd0909b17a06646af5a7ed" "checksum tokio-io 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c2c3ce9739f7387a0fa65b5421e81feae92e04d603f008898f4257790ce8c2db" +"checksum tokio-io-timeout 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c23bb67562a1fafa11947d444fdab978b949961ab656e2b13f7c9d531850bd6f" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" "checksum tokio-tls 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d88e411cac1c87e405e4090be004493c5d8072a370661033b1a64ea205ec2e13" diff --git a/Cargo.toml b/Cargo.toml index 8ff64d8..300e43c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,12 +4,13 @@ version = "0.1.0" authors = ["Herman J. Radtke III "] [dependencies] -futures = "0.1.11" -hyper = "0.11.0" +futures = "0.1" +hyper = "0.11" tokio-core = "0.1" tokio-io = "0.1" -tokio-service = "0.1.0" +tokio-service = "0.1" +tokio-io-timeout = "0.1" [dev-dependencies] native-tls = "0.1" -hyper-tls = "0.1.1" +hyper-tls = "0.1" diff --git a/README.md b/README.md index 7427551..86fea72 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,8 @@ At the time this crate was created, hyper does not support timeouts. There is a There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This connector wraps around `HttpConnector` or `HttpsConnector` values and provides timeouts. +**Note:** Because of the way `tokio_proto::ClientProto` works, a read or write timeout will return a _broken pipe_ error. + ## Usage First, add this to your `Cargo.toml`: @@ -28,6 +30,8 @@ Next, add this to your crate: extern crate hyper_timeout; ``` +See the [client example](./examples/client.rs) for a working example. + ## License Licensed under either of diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 0000000..1151451 --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,54 @@ +extern crate futures; +extern crate tokio_core; +extern crate hyper; +extern crate hyper_tls; +extern crate hyper_timeout; + +use std::env; +use std::time::Duration; + +use futures::Future; +use futures::stream::Stream; + +use hyper::Client; + +//use hyper::client::HttpConnector; +use hyper_tls::HttpsConnector; + +use hyper_timeout::TimeoutConnector; + +fn main() { + + let url = match env::args().nth(1) { + Some(url) => url, + None => { + println!("Usage: client "); + return; + } + }; + + let url = url.parse::().unwrap(); + + let mut core = tokio_core::reactor::Core::new().unwrap(); + let handle = core.handle(); + + // This example uses `HttpsConnector`, but you can also use the default hyper `HttpConnector` + //let connector = HttpConnector::new(4, &handle); + let connector = HttpsConnector::new(4, &handle).unwrap(); + let mut tm = TimeoutConnector::new(connector, &handle); + tm.set_connect_timeout(Some(Duration::from_secs(5))); + tm.set_read_timeout(Some(Duration::from_secs(5))); + tm.set_write_timeout(Some(Duration::from_secs(5))); + let client = Client::configure().connector(tm).build(&handle); + + let get = client.get(url).and_then(|res| { + println!("Response: {}", res.status()); + println!("Headers: \n{}", res.headers()); + + res.body().concat2() + }); + + let got = core.run(get).unwrap(); + let output = String::from_utf8_lossy(&got); + println!("{}", output); +} diff --git a/src/lib.rs b/src/lib.rs index d075457..2366225 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ extern crate futures; extern crate tokio_core; extern crate tokio_io; extern crate tokio_service; +extern crate tokio_io_timeout; extern crate hyper; use std::time::Duration; @@ -12,6 +13,7 @@ use futures::future::{Either, Future}; use tokio_core::reactor::{Handle, Timeout}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; +use tokio_io_timeout::TimeoutStream; use hyper::client::Connect; @@ -23,46 +25,94 @@ pub struct TimeoutConnector { /// Handle to be used to set the timeout within tokio's core handle: Handle, /// Amount of time to wait connecting - connect_timeout: Duration, + connect_timeout: Option, + /// Amount of time to wait reading response + read_timeout: Option, + /// Amount of time to wait writing request + write_timeout: Option, } impl TimeoutConnector { /// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait - pub fn new(connector: T, handle: &Handle, timeout: Duration) -> Self { + pub fn new(connector: T, handle: &Handle) -> Self { TimeoutConnector { connector: connector, handle: handle.clone(), - connect_timeout: timeout, + connect_timeout: None, + read_timeout: None, + write_timeout: None, } } + + /// Set the timeout for connecting to a URL. + /// + /// Default is no timeout. + #[inline] + pub fn set_connect_timeout(&mut self, val: Option) { + self.connect_timeout = val; + } + + /// Set the timeout for the response. + /// + /// Default is no timeout. + #[inline] + pub fn set_read_timeout(&mut self, val: Option) { + self.read_timeout = val; + } + + /// Set the timeout for the request. + /// + /// Default is no timeout. + #[inline] + pub fn set_write_timeout(&mut self, val: Option) { + self.write_timeout = val; + } } impl Service for TimeoutConnector - where T: Service + 'static, - T::Response: AsyncRead + AsyncWrite, - T::Future: Future, +where + T: Service + 'static, + T::Response: AsyncRead + AsyncWrite, + T::Future: Future, { type Request = T::Request; - type Response = T::Response; + type Response = TimeoutStream; type Error = T::Error; - type Future = Box>; + type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { + let handle = self.handle.clone(); + let read_timeout = self.read_timeout.clone(); + let write_timeout = self.write_timeout.clone(); let connecting = self.connector.call(req); - let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap(); - - Box::new(connecting.select2(timeout).then(|res| { - match res { - Ok(Either::A((io, _))) => Ok(io), - Ok(Either::B((_, _))) => { - Err(io::Error::new( - io::ErrorKind::TimedOut, - "Client timed out while connecting" - )) - } - Err(Either::A((e, _))) => Err(e), - Err(Either::B((e, _))) => Err(e), + + if self.connect_timeout.is_none() { + return Box::new(connecting.map(move |io| { + let mut tm = TimeoutStream::new(io, &handle); + tm.set_read_timeout(read_timeout); + tm.set_write_timeout(write_timeout); + tm + })); + } + + let connect_timeout = self.connect_timeout.expect("Connect timeout should be set"); + let timeout = Timeout::new(connect_timeout, &self.handle).unwrap(); + + Box::new(connecting.select2(timeout).then(move |res| match res { + Ok(Either::A((io, _))) => { + let mut tm = TimeoutStream::new(io, &handle); + tm.set_read_timeout(read_timeout); + tm.set_write_timeout(write_timeout); + Ok(tm) } + Ok(Either::B((_, _))) => { + Err(io::Error::new( + io::ErrorKind::TimedOut, + "Client timed out while connecting", + )) + } + Err(Either::A((e, _))) => Err(e), + Err(Either::B((e, _))) => Err(e), })) } } @@ -80,12 +130,15 @@ mod tests { let mut core = Core::new().unwrap(); // 10.255.255.1 is a not a routable IP address let url = "http://10.255.255.1".parse().unwrap(); - let connector = TimeoutConnector::new( - HttpConnector::new(1, &core.handle()), - &core.handle(), - Duration::from_millis(1) - ); + let mut connector = + TimeoutConnector::new(HttpConnector::new(1, &core.handle()), &core.handle()); + connector.set_connect_timeout(Some(Duration::from_millis(1))); - assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut); + match core.run(connector.connect(url)) { + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::TimedOut); + } + _ => panic!("Expected timeout error"), + } } }