Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ version = "0.1.0"
authors = ["Herman J. Radtke III <herman@hermanradtke.com>"]

[dependencies]
futures = "0.1.11"
hyper = "0.11.0"
futures = "0.1"
hyper = "0.11"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-service = "0.1.0"
tokio-service = "0.1"
tokio-io-timeout = "0.1"

[dev-dependencies]
native-tls = "0.1"
hyper-tls = "0.1.1"
hyper-tls = "0.1"
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ At the time this crate was created, hyper does not support timeouts. There is a

There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This connector wraps around `HttpConnector` or `HttpsConnector` values and provides timeouts.

**Note:** Because of the way `tokio_proto::ClientProto` works, a read or write timeout will return a _broken pipe_ error.

## Usage

First, add this to your `Cargo.toml`:
Expand All @@ -28,6 +30,8 @@ Next, add this to your crate:
extern crate hyper_timeout;
```

See the [client example](./examples/client.rs) for a working example.

## License

Licensed under either of
Expand Down
54 changes: 54 additions & 0 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
extern crate futures;
extern crate tokio_core;
extern crate hyper;
extern crate hyper_tls;
extern crate hyper_timeout;

use std::env;
use std::time::Duration;

use futures::Future;
use futures::stream::Stream;

use hyper::Client;

//use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;

use hyper_timeout::TimeoutConnector;

fn main() {

let url = match env::args().nth(1) {
Some(url) => url,
None => {
println!("Usage: client <url>");
return;
}
};

let url = url.parse::<hyper::Uri>().unwrap();

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();

// This example uses `HttpsConnector`, but you can also use the default hyper `HttpConnector`
//let connector = HttpConnector::new(4, &handle);
let connector = HttpsConnector::new(4, &handle).unwrap();
let mut tm = TimeoutConnector::new(connector, &handle);
tm.set_connect_timeout(Some(Duration::from_secs(5)));
tm.set_read_timeout(Some(Duration::from_secs(5)));
tm.set_write_timeout(Some(Duration::from_secs(5)));
let client = Client::configure().connector(tm).build(&handle);

let get = client.get(url).and_then(|res| {
println!("Response: {}", res.status());
println!("Headers: \n{}", res.headers());

res.body().concat2()
});

let got = core.run(get).unwrap();
let output = String::from_utf8_lossy(&got);
println!("{}", output);
}
107 changes: 80 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_service;
extern crate tokio_io_timeout;
extern crate hyper;

use std::time::Duration;
Expand All @@ -12,6 +13,7 @@ use futures::future::{Either, Future};
use tokio_core::reactor::{Handle, Timeout};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use tokio_io_timeout::TimeoutStream;

use hyper::client::Connect;

Expand All @@ -23,46 +25,94 @@ pub struct TimeoutConnector<T> {
/// Handle to be used to set the timeout within tokio's core
handle: Handle,
/// Amount of time to wait connecting
connect_timeout: Duration,
connect_timeout: Option<Duration>,
/// Amount of time to wait reading response
read_timeout: Option<Duration>,
/// Amount of time to wait writing request
write_timeout: Option<Duration>,
}

impl<T: Connect> TimeoutConnector<T> {
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
pub fn new(connector: T, handle: &Handle, timeout: Duration) -> Self {
pub fn new(connector: T, handle: &Handle) -> Self {
TimeoutConnector {
connector: connector,
handle: handle.clone(),
connect_timeout: timeout,
connect_timeout: None,
read_timeout: None,
write_timeout: None,
}
}

/// Set the timeout for connecting to a URL.
///
/// Default is no timeout.
#[inline]
pub fn set_connect_timeout(&mut self, val: Option<Duration>) {
self.connect_timeout = val;
}

/// Set the timeout for the response.
///
/// Default is no timeout.
#[inline]
pub fn set_read_timeout(&mut self, val: Option<Duration>) {
self.read_timeout = val;
}

/// Set the timeout for the request.
///
/// Default is no timeout.
#[inline]
pub fn set_write_timeout(&mut self, val: Option<Duration>) {
self.write_timeout = val;
}
}

impl<T> Service for TimeoutConnector<T>
where T: Service<Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
where
T: Service<Error = io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error = io::Error>,
{
type Request = T::Request;
type Response = T::Response;
type Response = TimeoutStream<T::Response>;
type Error = T::Error;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let handle = self.handle.clone();
let read_timeout = self.read_timeout.clone();
let write_timeout = self.write_timeout.clone();
let connecting = self.connector.call(req);
let timeout = Timeout::new(self.connect_timeout, &self.handle).unwrap();

Box::new(connecting.select2(timeout).then(|res| {
match res {
Ok(Either::A((io, _))) => Ok(io),
Ok(Either::B((_, _))) => {
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting"
))
}
Err(Either::A((e, _))) => Err(e),
Err(Either::B((e, _))) => Err(e),

if self.connect_timeout.is_none() {
return Box::new(connecting.map(move |io| {
let mut tm = TimeoutStream::new(io, &handle);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
tm
}));
}

let connect_timeout = self.connect_timeout.expect("Connect timeout should be set");
let timeout = Timeout::new(connect_timeout, &self.handle).unwrap();

Box::new(connecting.select2(timeout).then(move |res| match res {
Ok(Either::A((io, _))) => {
let mut tm = TimeoutStream::new(io, &handle);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok(tm)
}
Ok(Either::B((_, _))) => {
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting",
))
}
Err(Either::A((e, _))) => Err(e),
Err(Either::B((e, _))) => Err(e),
}))
}
}
Expand All @@ -80,12 +130,15 @@ mod tests {
let mut core = Core::new().unwrap();
// 10.255.255.1 is a not a routable IP address
let url = "http://10.255.255.1".parse().unwrap();
let connector = TimeoutConnector::new(
HttpConnector::new(1, &core.handle()),
&core.handle(),
Duration::from_millis(1)
);
let mut connector =
TimeoutConnector::new(HttpConnector::new(1, &core.handle()), &core.handle());
connector.set_connect_timeout(Some(Duration::from_millis(1)));

assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::TimedOut);
match core.run(connector.connect(url)) {
Err(e) => {
assert_eq!(e.kind(), io::ErrorKind::TimedOut);
}
_ => panic!("Expected timeout error"),
}
}
}