Skip to content

Commit

Permalink
Merge pull request #856 from hyperium/keep-alive
Browse files Browse the repository at this point in the history
feat(client): implement connection pooling for Client
  • Loading branch information
seanmonstar committed Jul 8, 2016
2 parents 5f273ef + 2904668 commit 220d09f
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 74 deletions.
95 changes: 69 additions & 26 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use std::collections::HashMap;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::sync::mpsc;
use std::thread;
Expand All @@ -24,7 +25,6 @@ pub use self::response::Response;

mod connect;
mod dns;
//mod pool;
mod request;
mod response;

Expand Down Expand Up @@ -116,6 +116,7 @@ impl<H: Send> Client<H> {
loop_.run(Context {
connect_timeout: connect_timeout,
keep_alive: keep_alive,
idle_conns: HashMap::new(),
queue: HashMap::new(),
}).unwrap()
}));
Expand Down Expand Up @@ -332,7 +333,7 @@ impl<H: Handler<T>, T: Transport> http::MessageHandler<T> for Message<H, T> {
struct Context<K, H> {
connect_timeout: Duration,
keep_alive: bool,
// idle: HashMap<K, Vec<Notify>>,
idle_conns: HashMap<K, Vec<http::Control>>,
queue: HashMap<K, Vec<Queued<H>>>,
}

Expand All @@ -352,6 +353,27 @@ impl<K: http::Key, H> Context<K, H> {
}
queued
}

fn conn_response<C>(&mut self, conn: Option<(http::Conn<K, C::Output, Message<H, C::Output>>, Option<Duration>)>, time: rotor::Time)
-> rotor::Response<ClientFsm<C, H>, (C::Key, C::Output)>
where C: Connect<Key=K>, H: Handler<C::Output> {
match conn {
Some((conn, timeout)) => {
//TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream
if conn.is_idle() {
self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new)
.push(conn.control());
}
match timeout {
Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn))
.deadline(time + dur),
None => rotor::Response::ok(ClientFsm::Socket(conn)),
}

}
None => rotor::Response::done()
}
}
}

impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> {
Expand Down Expand Up @@ -414,14 +436,9 @@ where C: Connect,
unreachable!("Connector can never be ready")
},
ClientFsm::Socket(conn) => {
match conn.ready(events, scope) {
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
Some((conn, Some(dur))) => {
rotor::Response::ok(ClientFsm::Socket(conn))
.deadline(scope.now() + dur)
}
None => rotor::Response::done()
}
let res = conn.ready(events, scope);
let now = scope.now();
scope.conn_response(res, now)
}
}
}
Expand Down Expand Up @@ -461,14 +478,9 @@ where C: Connect,
}
}
ClientFsm::Socket(conn) => {
match conn.timeout(scope) {
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
Some((conn, Some(dur))) => {
rotor::Response::ok(ClientFsm::Socket(conn))
.deadline(scope.now() + dur)
}
None => rotor::Response::done()
}
let res = conn.timeout(scope);
let now = scope.now();
scope.conn_response(res, now)
}
}
}
Expand All @@ -478,13 +490,10 @@ where C: Connect,
ClientFsm::Connector(..) => {
self.connect(scope)
},
ClientFsm::Socket(conn) => match conn.wakeup(scope) {
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
Some((conn, Some(dur))) => {
rotor::Response::ok(ClientFsm::Socket(conn))
.deadline(scope.now() + dur)
}
None => rotor::Response::done()
ClientFsm::Socket(conn) => {
let res = conn.wakeup(scope);
let now = scope.now();
scope.conn_response(res, now)
}
}
}
Expand Down Expand Up @@ -513,7 +522,41 @@ where C: Connect,
loop {
match rx.try_recv() {
Ok(Notify::Connect(url, mut handler)) => {
// TODO: check pool for sockets to this domain
// check pool for sockets to this domain
if let Some(key) = connector.key(&url) {
let mut remove_idle = false;
let mut woke_up = false;
if let Some(mut idle) = scope.idle_conns.get_mut(&key) {
while !idle.is_empty() {
let ctrl = idle.remove(0);
// err means the socket has since died
if ctrl.ready(Next::write()).is_ok() {
woke_up = true;
break;
}
}
remove_idle = idle.is_empty();
}
if remove_idle {
scope.idle_conns.remove(&key);
}

if woke_up {
trace!("woke up idle conn for '{}'", url);
let deadline = scope.now() + scope.connect_timeout;
scope.queue.entry(key).or_insert_with(Vec::new).push(Queued {
deadline: deadline,
handler: handler,
url: url
});
continue;
}
} else {
// this connector cannot handle this url anyways
let _ = handler.on_error(io::Error::new(io::ErrorKind::InvalidInput, "invalid url for connector").into());
continue;
}
// no exist connection, call connector
match connector.connect(&url) {
Ok(key) => {
let deadline = scope.now() + scope.connect_timeout;
Expand Down
76 changes: 59 additions & 17 deletions src/http/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
fn interest(&self) -> Reg {
match self.state {
State::Closed => Reg::Remove,
State::Init => {
<H as MessageHandler>::Message::initial_interest().interest()
State::Init { interest, .. } => {
interest.register()
}
State::Http1(Http1 { reading: Reading::Closed, writing: Writing::Closed, .. }) => {
Reg::Remove
Expand Down Expand Up @@ -142,12 +142,12 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {

fn read<F: MessageHandlerFactory<K, T, Output=H>>(&mut self, scope: &mut Scope<F>, state: State<H, T>) -> State<H, T> {
match state {
State::Init => {
State::Init { interest: Next_::Read, .. } => {
let head = match self.parse() {
Ok(head) => head,
Err(::Error::Io(e)) => match e.kind() {
io::ErrorKind::WouldBlock |
io::ErrorKind::Interrupted => return State::Init,
io::ErrorKind::Interrupted => return state,
_ => {
debug!("io error trying to parse {:?}", e);
return State::Closed;
Expand Down Expand Up @@ -219,6 +219,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
}
}
},
State::Init { .. } => {
trace!("on_readable State::{:?}", state);
state
},
State::Http1(mut http1) => {
let next = match http1.reading {
Reading::Init => None,
Expand Down Expand Up @@ -274,7 +278,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
if let Some(next) = next {
s.update(next);
}
trace!("Conn.on_readable State::Http1 completed, new state = {:?}", s);
trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s);

let again = match s {
State::Http1(Http1 { reading: Reading::Body(ref encoder), .. }) => encoder.is_eof(),
Expand All @@ -296,7 +300,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {

fn write<F: MessageHandlerFactory<K, T, Output=H>>(&mut self, scope: &mut Scope<F>, mut state: State<H, T>) -> State<H, T> {
let next = match state {
State::Init => {
State::Init { interest: Next_::Write, .. } => {
// this could be a Client request, which writes first, so pay
// attention to the version written here, which will adjust
// our internal state to Http1 or Http2
Expand Down Expand Up @@ -336,6 +340,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
}
Some(interest)
}
State::Init { .. } => {
trace!("Conn.on_writable State::{:?}", state);
None
}
State::Http1(Http1 { ref mut handler, ref mut writing, ref mut keep_alive, .. }) => {
match *writing {
Writing::Init => {
Expand Down Expand Up @@ -426,7 +434,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {

fn can_read_more(&self) -> bool {
match self.state {
State::Init => false,
State::Init { .. } => false,
_ => !self.buf.is_empty()
}
}
Expand All @@ -435,7 +443,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
debug!("on_error err = {:?}", err);
trace!("on_error state = {:?}", self.state);
let next = match self.state {
State::Init => Next::remove(),
State::Init { .. } => Next::remove(),
State::Http1(ref mut http1) => http1.handler.on_error(err),
State::Closed => Next::remove(),
};
Expand All @@ -461,7 +469,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
fn on_remove(self) {
debug!("on_remove");
match self.state {
State::Init | State::Closed => (),
State::Init { .. } | State::Closed => (),
State::Http1(http1) => http1.handler.on_remove(self.transport),
}
}
Expand All @@ -475,7 +483,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
ctrl: channel::new(notify),
keep_alive_enabled: true,
key: key,
state: State::Init,
state: State::Init {
interest: H::Message::initial_interest().interest,
timeout: None,
},
transport: transport,
}))
}
Expand Down Expand Up @@ -585,10 +596,30 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
self.0.on_remove()
}

pub fn key(&self) -> &K {
&self.0.key
}

pub fn control(&self) -> Control {
Control {
tx: self.0.ctrl.0.clone(),
}
}

pub fn is_idle(&self) -> bool {
if let State::Init { interest: Next_::Wait, .. } = self.0.state {
true
} else {
false
}
}
}

enum State<H: MessageHandler<T>, T: Transport> {
Init,
Init {
interest: Next_,
timeout: Option<Duration>,
},
/// Http1 will only ever use a connection to send and receive a single
/// message at a time. Once a H1 status has been determined, we will either
/// be reading or writing an H1 message, and optionally multiple if
Expand All @@ -606,7 +637,7 @@ enum State<H: MessageHandler<T>, T: Transport> {
impl<H: MessageHandler<T>, T: Transport> State<H, T> {
fn timeout(&self) -> Option<Duration> {
match *self {
State::Init => None,
State::Init { timeout, .. } => timeout,
State::Http1(ref http1) => http1.timeout,
State::Closed => None,
}
Expand All @@ -616,7 +647,10 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
impl<H: MessageHandler<T>, T: Transport> fmt::Debug for State<H, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
State::Init => f.write_str("Init"),
State::Init { interest, timeout } => f.debug_struct("Init")
.field("interest", &interest)
.field("timeout", &timeout)
.finish(),
State::Http1(ref h1) => f.debug_tuple("Http1")
.field(h1)
.finish(),
Expand All @@ -632,10 +666,14 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
let new_state = match (state, next.interest) {
(_, Next_::Remove) => State::Closed,
(State::Closed, _) => State::Closed,
(State::Init, _) => State::Init,
(State::Init { timeout, .. }, e) => State::Init {
interest: e,
timeout: timeout,
},
(State::Http1(http1), Next_::End) => {
let reading = match http1.reading {
Reading::Body(ref decoder) if decoder.is_eof() => {
Reading::Body(ref decoder) |
Reading::Wait(ref decoder) if decoder.is_eof() => {
if http1.keep_alive {
Reading::KeepAlive
} else {
Expand All @@ -646,6 +684,7 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
_ => Reading::Closed,
};
let writing = match http1.writing {
Writing::Wait(encoder) |
Writing::Ready(encoder) => {
if encoder.is_eof() {
if http1.keep_alive {
Expand Down Expand Up @@ -691,8 +730,11 @@ impl<H: MessageHandler<T>, T: Transport> State<H, T> {
};
match (reading, writing) {
(Reading::KeepAlive, Writing::KeepAlive) => {
//http1.handler.on_keep_alive();
State::Init
//XXX keepalive
State::Init {
interest: H::Message::keep_alive_interest().interest,
timeout: None,
}
},
(reading, Writing::Chunk(chunk)) => {
State::Http1(Http1 {
Expand Down
8 changes: 8 additions & 0 deletions src/http/h1/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ impl Http1Message for ServerMessage {
Next::new(Next_::Read)
}

fn keep_alive_interest() -> Next {
Next::new(Next_::Read)
}

fn parse(buf: &[u8]) -> ParseResult<RequestLine> {
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
trace!("Request.parse([Header; {}], [u8; {}])", headers.len(), buf.len());
Expand Down Expand Up @@ -114,6 +118,10 @@ impl Http1Message for ClientMessage {
Next::new(Next_::Write)
}

fn keep_alive_interest() -> Next {
Next::new(Next_::Wait)
}

fn parse(buf: &[u8]) -> ParseResult<RawStatus> {
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
trace!("Response.parse([Header; {}], [u8; {}])", headers.len(), buf.len());
Expand Down

0 comments on commit 220d09f

Please sign in to comment.