Permalink
Browse files

feat(server): Rewrite the accept loop into a custom thread pool.

This is a modified and specialized thread pool meant for
managing an acceptor in a multi-threaded way. A single handler
is provided which will be invoked on each stream.

Unlike the old thread pool, this returns a join guard which
will block until the acceptor closes, enabling friendly behavior
for the listening guard.

The task pool itself is also faster as it only pays for message passing
if sub-threads panic. In the optimistic case where there are few panics,
this saves using channels for any other communication.

This improves performance by around 15%, all the way to 105k req/sec
on my machine, which usually gets about 90k.

BREAKING_CHANGE: server::Listening::await is removed.
  • Loading branch information...
1 parent f554c09 commit 3528fb9b015a0959268452d5b42d5544c7b98a6a @reem reem committed Feb 14, 2015
Showing with 158 additions and 80 deletions.
  1. +2 −1 examples/hello.rs
  2. +1 −2 examples/server.rs
  3. +7 −3 src/lib.rs
  4. +95 −0 src/server/acceptor.rs
  5. +53 −74 src/server/mod.rs
View
@@ -13,6 +13,7 @@ fn hello(_: Request, res: Response) {
}
fn main() {
- hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000).listen(hello).unwrap();
+ let _listening = hyper::Server::http(Ipv4Addr(127, 0, 0, 1), 3000)
+ .listen(hello).unwrap();
println!("Listening on http://127.0.0.1:3000");
}
View
@@ -51,7 +51,6 @@ fn echo(mut req: Request, mut res: Response) {
fn main() {
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337);
- let mut listening = server.listen(echo).unwrap();
+ let _guard = server.listen(echo).unwrap();
println!("Listening on http://127.0.0.1:1337");
- listening.await();
}
View
@@ -1,5 +1,5 @@
#![feature(core, collections, hash, io, os, path, std_misc,
- slicing_syntax, box_syntax)]
+ slicing_syntax, box_syntax, unsafe_destructor)]
#![deny(missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![cfg_attr(test, feature(alloc, test))]
@@ -130,12 +130,16 @@ extern crate "rustc-serialize" as serialize;
extern crate time;
extern crate url;
extern crate openssl;
-#[macro_use] extern crate log;
-#[cfg(test)] extern crate test;
extern crate "unsafe-any" as uany;
extern crate cookie;
extern crate unicase;
+#[macro_use]
+extern crate log;
+
+#[cfg(test)]
+extern crate test;
+
pub use std::old_io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr, Port};
pub use mimewrapper::mime;
pub use url::Url;
@@ -0,0 +1,95 @@
+use std::thread::{Thread, JoinGuard};
+use std::sync::Arc;
+use std::sync::mpsc;
+use net::NetworkAcceptor;
+
+pub struct AcceptorPool<A: NetworkAcceptor> {
+ acceptor: A
+}
+
+impl<A: NetworkAcceptor> AcceptorPool<A> {
+ /// Create a thread pool to manage the acceptor.
+ pub fn new(acceptor: A) -> AcceptorPool<A> {
+ AcceptorPool { acceptor: acceptor }
+ }
+
+ /// Runs the acceptor pool. Blocks until the acceptors are closed.
+ ///
+ /// ## Panics
+ ///
+ /// Panics if threads == 0.
+ pub fn accept<F: Fn(A::Stream) + Send + Sync>(self,
+ work: F,
+ threads: usize) -> JoinGuard<'static, ()> {
+ assert!(threads != 0, "Can't accept on 0 threads.");
+
+ // Replace with &F when Send changes land.
+ let work = Arc::new(work);
+
+ let (super_tx, supervisor_rx) = mpsc::channel();
+
+ let spawn =
+ move || spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
+
+ // Go
+ for _ in 0..threads { spawn() }
+
+ // Spawn the supervisor
+ Thread::scoped(move || for () in supervisor_rx.iter() { spawn() })
+ }
+}
+
+fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
+where A: NetworkAcceptor,
+ F: Fn(<A as NetworkAcceptor>::Stream) + Send + Sync {
+ use std::old_io::EndOfFile;
+
+ Thread::spawn(move || {
+ let sentinel = Sentinel::new(supervisor, ());
+
+ loop {
+ match acceptor.accept() {
+ Ok(stream) => work(stream),
+ Err(ref e) if e.kind == EndOfFile => {
+ debug!("Server closed.");
+ sentinel.cancel();
+ return;
+ },
+
+ Err(e) => {
+ error!("Connection failed: {}", e);
+ }
+ }
+ }
+ });
+}
+
+struct Sentinel<T: Send> {
+ value: Option<T>,
+ supervisor: mpsc::Sender<T>,
+ active: bool
+}
+
+impl<T: Send> Sentinel<T> {
+ fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
+ Sentinel {
+ value: Some(data),
+ supervisor: channel,
+ active: true
+ }
+ }
+
+ fn cancel(mut self) { self.active = false; }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Sentinel<T> {
+ fn drop(&mut self) {
+ // If we were cancelled, get out of here.
+ if !self.active { return; }
+
+ // Respawn ourselves
+ let _ = self.supervisor.send(self.value.take().unwrap());
+ }
+}
+
View
@@ -1,10 +1,8 @@
//! HTTP Server
-use std::old_io::{Listener, EndOfFile, BufferedReader, BufferedWriter};
+use std::old_io::{Listener, BufferedReader, BufferedWriter};
use std::old_io::net::ip::{IpAddr, Port, SocketAddr};
use std::os;
-use std::sync::{Arc, TaskPool};
-use std::thread::{Builder, JoinGuard};
-
+use std::thread::JoinGuard;
pub use self::request::Request;
pub use self::response::Response;
@@ -19,9 +17,13 @@ use net::{NetworkListener, NetworkStream, NetworkAcceptor,
HttpAcceptor, HttpListener};
use version::HttpVersion::{Http10, Http11};
+use self::acceptor::AcceptorPool;
+
pub mod request;
pub mod response;
+mod acceptor;
+
/// A server can listen on a TCP socket.
///
/// Once listening, it will create a `Request`/`Response` pair for each
@@ -71,71 +73,14 @@ S: NetworkStream + Clone + Send> Server<L> {
let acceptor = try!(self.listener.listen((self.ip, self.port)));
let socket = try!(acceptor.socket_name());
- let mut captured = acceptor.clone();
- let guard = Builder::new().name("hyper acceptor".to_string()).scoped(move || {
- let handler = Arc::new(handler);
- debug!("threads = {:?}", threads);
- let pool = TaskPool::new(threads);
- for conn in captured.incoming() {
- match conn {
- Ok(mut stream) => {
- debug!("Incoming stream");
- let handler = handler.clone();
- pool.execute(move || {
- let addr = match stream.peer_name() {
- Ok(addr) => addr,
- Err(e) => {
- error!("Peer Name error: {:?}", e);
- return;
- }
- };
- let mut rdr = BufferedReader::new(stream.clone());
- let mut wrt = BufferedWriter::new(stream);
-
- let mut keep_alive = true;
- while keep_alive {
- let mut res = Response::new(&mut wrt);
- let req = match Request::new(&mut rdr, addr) {
- Ok(req) => req,
- Err(e@HttpIoError(_)) => {
- debug!("ioerror in keepalive loop = {:?}", e);
- return;
- }
- Err(e) => {
- //TODO: send a 400 response
- error!("request error = {:?}", e);
- return;
- }
- };
-
- keep_alive = match (req.version, req.headers.get::<Connection>()) {
- (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
- (Http11, Some(conn)) if conn.contains(&Close) => false,
- _ => true
- };
- res.version = req.version;
- handler.handle(req, res);
- debug!("keep_alive = {:?}", keep_alive);
- }
-
- });
- },
- Err(ref e) if e.kind == EndOfFile => {
- debug!("server closed");
- break;
- },
- Err(e) => {
- error!("Connection failed: {}", e);
- continue;
- }
- }
- }
- });
+ debug!("threads = {:?}", threads);
+ let pool = AcceptorPool::new(acceptor.clone());
+ let work = move |stream| handle_connection(stream, &handler);
Ok(Listening {
- acceptor: acceptor,
- guard: Some(guard),
+ _guard: pool.accept(work, threads),
socket: socket,
+ acceptor: acceptor
})
}
@@ -146,22 +91,56 @@ S: NetworkStream + Clone + Send> Server<L> {
}
+fn handle_connection<S, H>(mut stream: S, handler: &H)
+where S: NetworkStream + Clone, H: Handler {
+ debug!("Incoming stream");
+ let addr = match stream.peer_name() {
+ Ok(addr) => addr,
+ Err(e) => {
+ error!("Peer Name error: {:?}", e);
+ return;
+ }
+ };
+
+ let mut rdr = BufferedReader::new(stream.clone());
+ let mut wrt = BufferedWriter::new(stream);
+
+ let mut keep_alive = true;
+ while keep_alive {
+ let mut res = Response::new(&mut wrt);
+ let req = match Request::new(&mut rdr, addr) {
+ Ok(req) => req,
+ Err(e@HttpIoError(_)) => {
+ debug!("ioerror in keepalive loop = {:?}", e);
+ return;
+ }
+ Err(e) => {
+ //TODO: send a 400 response
+ error!("request error = {:?}", e);
+ return;
+ }
+ };
+
+ keep_alive = match (req.version, req.headers.get::<Connection>()) {
+ (Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
+ (Http11, Some(conn)) if conn.contains(&Close) => false,
+ _ => true
+ };
+ res.version = req.version;
+ handler.handle(req, res);
+ debug!("keep_alive = {:?}", keep_alive);
+ }
+}
+
/// A listening server, which can later be closed.
pub struct Listening<A = HttpAcceptor> {
acceptor: A,
- guard: Option<JoinGuard<'static, ()>>,
+ _guard: JoinGuard<'static, ()>,
/// The socket addresses that the server is bound to.
pub socket: SocketAddr,
}
impl<A: NetworkAcceptor> Listening<A> {
- /// Causes the current thread to wait for this listening to complete.
- pub fn await(&mut self) {
- if let Some(guard) = self.guard.take() {
- let _ = guard.join();
- }
- }
-
/// Stop the server from listening to its socket address.
pub fn close(&mut self) -> HttpResult<()> {
debug!("closing server");

0 comments on commit 3528fb9

Please sign in to comment.