diff --git a/examples/client.rs b/examples/client.rs index a66afb5af2..e453656cce 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -48,7 +48,7 @@ impl hyper::client::Handler for Dump { Err(e) => match e.kind() { io::ErrorKind::WouldBlock => Next::read(), _ => { - println!("ERROR: {}", e); + println!("ERROR:example: {}", e); Next::end() } } @@ -56,7 +56,7 @@ impl hyper::client::Handler for Dump { } fn on_error(&mut self, err: hyper::Error) -> Next { - println!("ERROR: {}", err); + println!("ERROR:example: {}", err); Next::remove() } } diff --git a/src/client/connect.rs b/src/client/connect.rs index fc34f45fc1..19aee61584 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -16,7 +16,7 @@ pub trait Connect { /// Type of Transport to create type Output: Transport; /// The key used to determine if an existing socket can be used. - type Key: Eq + Hash + Clone; + type Key: Eq + Hash + Clone + fmt::Debug; /// Returns the key based off the Url. fn key(&self, &Url) -> Option; /// Connect to a remote address. @@ -96,10 +96,12 @@ impl Connect for HttpConnector { } fn connected(&mut self) -> Option<(Self::Key, io::Result)> { - let (host, addr) = match self.dns.as_ref().expect("dns workers lost").resolved() { + let (host, addrs) = match self.dns.as_ref().expect("dns workers lost").resolved() { Ok(res) => res, Err(_) => return None }; + //TODO: try all addrs + let addr = addrs.and_then(|mut addrs| Ok(addrs.next().unwrap())); debug!("Http::resolved <- ({:?}, {:?})", host, addr); if let Entry::Occupied(mut entry) = self.resolving.entry(host) { let resolved = entry.get_mut().remove(0); diff --git a/src/client/dns.rs b/src/client/dns.rs index 8a3579e612..0c2a2226fb 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -1,6 +1,7 @@ use std::io; -use std::net::{IpAddr, ToSocketAddrs}; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; use std::thread; +use std::vec; use ::spmc; @@ -11,7 +12,19 @@ pub struct Dns { rx: channel::Receiver, } -pub type Answer = (String, io::Result); +pub type Answer = (String, io::Result); + +pub struct IpAddrs { + iter: vec::IntoIter, +} + +impl Iterator for IpAddrs { + type Item = IpAddr; + #[inline] + fn next(&mut self) -> Option { + self.iter.next().map(|addr| addr.ip()) + } +} impl Dns { pub fn new(notify: (channel::Sender, channel::Receiver), threads: usize) -> Dns { @@ -26,7 +39,7 @@ impl Dns { } pub fn resolve>(&self, hostname: T) { - self.tx.send(hostname.into()).expect("Workers all died horribly"); + self.tx.send(hostname.into()).expect("DNS workers all died unexpectedly"); } pub fn resolved(&self) -> Result { @@ -41,9 +54,8 @@ fn work(rx: spmc::Receiver, notify: channel::Sender) { let notify = worker.notify.as_ref().expect("Worker lost notify"); while let Ok(host) = rx.recv() { debug!("resolve {:?}", host); - let res = match (&*host, 80).to_socket_addrs().map(|mut i| i.next()) { - Ok(Some(addr)) => (host, Ok(addr.ip())), - Ok(None) => (host, Err(io::Error::new(io::ErrorKind::Other, "no addresses found"))), + let res = match (&*host, 80).to_socket_addrs().map(|i| IpAddrs{ iter: i }) { + Ok(addrs) => (host, Ok(addrs)), Err(e) => (host, Err(e)) }; diff --git a/src/client/mod.rs b/src/client/mod.rs index fed4505d9c..a7fc701621 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -30,7 +30,6 @@ mod response; /// A Client to make outgoing HTTP requests. pub struct Client { - //handle: Option>, tx: http::channel::Sender>, } @@ -64,16 +63,6 @@ impl Client { pub fn configure() -> Config { Config::default() } - - /*TODO - pub fn http() -> Config { - - } - - pub fn https() -> Config { - - } - */ } impl::Output>> Client { @@ -243,14 +232,6 @@ impl fmt::Display for ClientError { } } -/* -impl Drop for Client { - fn drop(&mut self) { - self.handle.take().map(|handle| handle.join()); - } -} -*/ - /// A trait to react to client events that happen for each message. /// /// Each event handler returns it's desired `Next` action. @@ -338,15 +319,16 @@ struct Context { } impl Context { - fn pop_queue(&mut self, key: &K) -> Queued { + fn pop_queue(&mut self, key: &K) -> Option> { let mut should_remove = false; let queued = { - let mut vec = self.queue.get_mut(key).expect("handler not in queue for key"); - let queued = vec.remove(0); - if vec.is_empty() { - should_remove = true; - } - queued + self.queue.get_mut(key).map(|vec| { + let queued = vec.remove(0); + if vec.is_empty() { + should_remove = true; + } + queued + }) }; if should_remove { self.queue.remove(key); @@ -379,16 +361,22 @@ impl Context { impl, T: Transport> http::MessageHandlerFactory for Context { type Output = Message; - fn create(&mut self, seed: http::Seed) -> Self::Output { + fn create(&mut self, seed: http::Seed) -> Option { let key = seed.key(); - let queued = self.pop_queue(key); - let (url, mut handler) = (queued.url, queued.handler); - handler.on_control(seed.control()); - Message { - handler: handler, - url: Some(url), - _marker: PhantomData, - } + self.pop_queue(key).map(|queued| { + let (url, mut handler) = (queued.url, queued.handler); + handler.on_control(seed.control()); + + Message { + handler: handler, + url: Some(url), + _marker: PhantomData, + } + }) + } + + fn keep_alive_interest(&self) -> Next { + Next::wait() } } @@ -402,6 +390,7 @@ where C: Connect, C::Output: Transport, H: Handler { Connector(C, http::channel::Receiver>), + Connecting((C::Key, C::Output)), Socket(http::Conn>) } @@ -415,6 +404,7 @@ where impl rotor::Machine for ClientFsm where C: Connect, + C::Key: fmt::Debug, C::Output: Transport, H: Handler { type Context = Context; @@ -422,24 +412,47 @@ where C: Connect, fn create(seed: Self::Seed, scope: &mut Scope) -> rotor::Response { rotor_try!(scope.register(&seed.1, EventSet::writable(), PollOpt::level())); - rotor::Response::ok( - ClientFsm::Socket( - http::Conn::new(seed.0, seed.1, scope.notifier()) - .keep_alive(scope.keep_alive) - ) - ) + rotor::Response::ok(ClientFsm::Connecting(seed)) } fn ready(self, events: EventSet, scope: &mut Scope) -> rotor::Response { match self { - ClientFsm::Connector(..) => { - unreachable!("Connector can never be ready") - }, ClientFsm::Socket(conn) => { let res = conn.ready(events, scope); let now = scope.now(); scope.conn_response(res, now) + }, + ClientFsm::Connecting(mut seed) => { + if events.is_error() || events.is_hup() { + if let Some(err) = seed.1.take_socket_error().err() { + debug!("error while connecting: {:?}", err); + scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err))); + rotor::Response::done() + } else { + trace!("connecting is_error, but no socket error"); + rotor::Response::ok(ClientFsm::Connecting(seed)) + } + } else if events.is_writable() { + if scope.queue.contains_key(&seed.0) { + trace!("connected and writable {:?}", seed.0); + rotor::Response::ok( + ClientFsm::Socket( + http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier()) + .keep_alive(scope.keep_alive) + ) + ) + } else { + trace!("connected, but queued handler is gone: {:?}", seed.0); // probably took too long connecting + rotor::Response::done() + } + } else { + // spurious? + rotor::Response::ok(ClientFsm::Connecting(seed)) + } } + ClientFsm::Connector(..) => { + unreachable!("Connector can never be ready") + }, } } @@ -477,6 +490,7 @@ where C: Connect, None => rotor::Response::ok(self) } } + ClientFsm::Connecting(..) => unreachable!(), ClientFsm::Socket(conn) => { let res = conn.timeout(scope); let now = scope.now(); @@ -494,13 +508,15 @@ where C: Connect, let res = conn.wakeup(scope); let now = scope.now(); scope.conn_response(res, now) - } + }, + ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up") } } } impl ClientFsm where C: Connect, + C::Key: fmt::Debug, C::Output: Transport, H: Handler { fn connect(self, scope: &mut rotor::Scope<::Context>) -> rotor::Response::Seed> { @@ -508,14 +524,13 @@ where C: Connect, ClientFsm::Connector(mut connector, rx) => { if let Some((key, res)) = connector.connected() { match res { - Ok(socket) => { - trace!("connected"); + Ok(mut socket) => { + trace!("connected, err = {:?}", socket.take_socket_error()); return rotor::Response::spawn(ClientFsm::Connector(connector, rx), (key, socket)); }, Err(e) => { trace!("connected error = {:?}", e); - let mut queued = scope.pop_queue(&key); - let _ = queued.handler.on_error(::Error::Io(e)); + scope.pop_queue(&key).map(|mut queued| queued.handler.on_error(::Error::Io(e))); } } } diff --git a/src/http/conn.rs b/src/http/conn.rs index 9d727e72b0..0324e12691 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -163,7 +163,10 @@ impl> ConnInner { Ok(decoder) => { trace!("decoder = {:?}", decoder); let keep_alive = self.keep_alive_enabled && head.should_keep_alive(); - let mut handler = scope.create(Seed(&self.key, &self.ctrl.0)); + let mut handler = match scope.create(Seed(&self.key, &self.ctrl.0)) { + Some(handler) => handler, + None => unreachable!() + }; let next = handler.on_incoming(head, &self.transport); trace!("handler.on_incoming() -> {:?}", next); @@ -276,7 +279,7 @@ impl> ConnInner { }; let mut s = State::Http1(http1); if let Some(next) = next { - s.update(next); + s.update(next, &**scope); } trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s); @@ -304,7 +307,13 @@ impl> ConnInner { // this could be a Client request, which writes first, so pay // attention to the version written here, which will adjust // our internal state to Http1 or Http2 - let mut handler = scope.create(Seed(&self.key, &self.ctrl.0)); + let mut handler = match scope.create(Seed(&self.key, &self.ctrl.0)) { + Some(handler) => handler, + None => { + trace!("could not create handler {:?}", self.key); + return State::Closed; + } + }; let mut head = http::MessageHead::default(); let interest = handler.on_outgoing(&mut head); if head.version == HttpVersion::Http11 { @@ -427,7 +436,7 @@ impl> ConnInner { }; if let Some(next) = next { - state.update(next); + state.update(next, &**scope); } state } @@ -439,7 +448,7 @@ impl> ConnInner { } } - fn on_error(&mut self, err: ::Error) { + fn on_error(&mut self, err: ::Error, factory: &F) where F: MessageHandlerFactory { debug!("on_error err = {:?}", err); trace!("on_error state = {:?}", self.state); let next = match self.state { @@ -447,7 +456,7 @@ impl> ConnInner { State::Http1(ref mut http1) => http1.handler.on_error(err), State::Closed => Next::remove(), }; - self.state.update(next); + self.state.update(next, factory); } fn on_readable(&mut self, scope: &mut Scope) @@ -477,15 +486,15 @@ impl> ConnInner { } impl> Conn { - pub fn new(key: K, transport: T, notify: rotor::Notifier) -> Conn { + pub fn new(key: K, transport: T, next: Next, notify: rotor::Notifier) -> Conn { Conn(Box::new(ConnInner { buf: Buffer::new(), ctrl: channel::new(notify), keep_alive_enabled: true, key: key, state: State::Init { - interest: H::Message::initial_interest().interest, - timeout: None, + interest: next.interest, + timeout: next.timeout, }, transport: transport, })) @@ -506,7 +515,7 @@ impl> Conn { trace!("is_error, but not socket error"); // spurious? }, - Err(e) => self.0.on_error(e.into()) + Err(e) => self.0.on_error(e.into(), &**scope) } } @@ -570,7 +579,7 @@ impl> Conn { }, Err(e) => { trace!("error reregistering: {:?}", e); - self.0.on_error(e.into()); + self.0.on_error(e.into(), &**scope); None } } @@ -580,7 +589,7 @@ impl> Conn { where F: MessageHandlerFactory { while let Ok(next) = self.0.ctrl.1.try_recv() { trace!("woke up with {:?}", next); - self.0.state.update(next); + self.0.state.update(next, &**scope); } self.ready(EventSet::readable() | EventSet::writable(), scope) } @@ -588,7 +597,7 @@ impl> Conn { pub fn timeout(mut self, scope: &mut Scope) -> Option<(Self, Option)> where F: MessageHandlerFactory { //TODO: check if this was a spurious timeout? - self.0.on_error(::Error::Timeout); + self.0.on_error(::Error::Timeout, &**scope); self.ready(EventSet::none(), scope) } @@ -660,7 +669,7 @@ impl, T: Transport> fmt::Debug for State { } impl, T: Transport> State { - fn update(&mut self, next: Next) { + fn update(&mut self, next: Next, factory: &F) where F: MessageHandlerFactory, K: Key { let timeout = next.timeout; let state = mem::replace(self, State::Closed); let new_state = match (state, next.interest) { @@ -730,10 +739,10 @@ impl, T: Transport> State { }; match (reading, writing) { (Reading::KeepAlive, Writing::KeepAlive) => { - //XXX keepalive + let next = factory.keep_alive_interest(); State::Init { - interest: H::Message::keep_alive_interest().interest, - timeout: None, + interest: next.interest, + timeout: next.timeout, } }, (reading, Writing::Chunk(chunk)) => { @@ -848,6 +857,10 @@ impl, T: Transport> State { } }; let new_state = match new_state { + State::Init { interest, .. } => State::Init { + timeout: timeout, + interest: interest, + }, State::Http1(mut http1) => { http1.timeout = timeout; State::Http1(http1) @@ -943,22 +956,13 @@ impl<'a, K: Key + 'a> Seed<'a, K> { pub trait MessageHandlerFactory { type Output: MessageHandler; - fn create(&mut self, seed: Seed) -> Self::Output; -} + fn create(&mut self, seed: Seed) -> Option; -impl MessageHandlerFactory for F -where F: FnMut(Seed) -> H, - K: Key, - H: MessageHandler, - T: Transport { - type Output = H; - fn create(&mut self, seed: Seed) -> H { - self(seed) - } + fn keep_alive_interest(&self) -> Next; } -pub trait Key: Eq + Hash + Clone {} -impl Key for T {} +pub trait Key: Eq + Hash + Clone + fmt::Debug {} +impl Key for T {} #[cfg(test)] mod tests { diff --git a/src/http/h1/parse.rs b/src/http/h1/parse.rs index 489342e5f2..ac08d8d9d3 100644 --- a/src/http/h1/parse.rs +++ b/src/http/h1/parse.rs @@ -4,7 +4,7 @@ use std::io::Write; use httparse; use header::{self, Headers, ContentLength, TransferEncoding}; -use http::{MessageHead, RawStatus, Http1Message, ParseResult, Next, ServerMessage, ClientMessage, Next_, RequestLine}; +use http::{MessageHead, RawStatus, Http1Message, ParseResult, ServerMessage, ClientMessage, RequestLine}; use http::h1::{Encoder, Decoder}; use method::Method; use status::StatusCode; @@ -27,14 +27,6 @@ impl Http1Message for ServerMessage { type Incoming = RequestLine; type Outgoing = StatusCode; - fn initial_interest() -> Next { - Next::new(Next_::Read) - } - - fn keep_alive_interest() -> Next { - Next::new(Next_::Read) - } - fn parse(buf: &[u8]) -> ParseResult { let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); @@ -113,15 +105,6 @@ impl Http1Message for ClientMessage { type Incoming = RawStatus; type Outgoing = RequestLine; - - fn initial_interest() -> Next { - Next::new(Next_::Write) - } - - fn keep_alive_interest() -> Next { - Next::new(Next_::Wait) - } - fn parse(buf: &[u8]) -> ParseResult { let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len()); diff --git a/src/http/mod.rs b/src/http/mod.rs index b9fe7c5cb8..3d974e716e 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -280,13 +280,9 @@ pub enum ClientMessage {} pub trait Http1Message { type Incoming; type Outgoing: Default; - //TODO: replace with associated const when stable - fn initial_interest() -> Next; - fn keep_alive_interest() -> Next; fn parse(bytes: &[u8]) -> ParseResult; fn decoder(head: &MessageHead) -> ::Result; fn encode(head: MessageHead, dst: &mut Vec) -> h1::Encoder; - } /// Used to signal desired events when working with asynchronous IO. diff --git a/src/server/mod.rs b/src/server/mod.rs index 2457b586f0..49af79e730 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -167,8 +167,12 @@ struct Context { impl, T: Transport> http::MessageHandlerFactory<(), T> for Context { type Output = message::Message; - fn create(&mut self, seed: http::Seed<()>) -> Self::Output { - message::Message::new(self.factory.create(seed.control())) + fn create(&mut self, seed: http::Seed<()>) -> Option { + Some(message::Message::new(self.factory.create(seed.control()))) + } + + fn keep_alive_interest(&self) -> Next { + Next::read() } } @@ -191,7 +195,7 @@ where A: Accept, rotor_try!(scope.register(&seed, EventSet::readable(), PollOpt::level())); rotor::Response::ok( ServerFsm::Conn( - http::Conn::new((), seed, scope.notifier()) + http::Conn::new((), seed, Next::read(), scope.notifier()) .keep_alive(scope.keep_alive) ) )