diff --git a/examples/hello.rs b/examples/hello.rs index a9e45d3cd3..c01eff7d3a 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -5,7 +5,7 @@ extern crate pretty_env_logger; //extern crate num_cpus; use hyper::header::{ContentLength, ContentType}; -use hyper::server::{Server, Service, Request, Response}; +use hyper::server::{Http, Service, Request, Response}; static PHRASE: &'static [u8] = b"Hello World!"; @@ -31,9 +31,7 @@ impl Service for Hello { fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:3000".parse().unwrap(); - let _server = Server::standalone(|tokio| { - Server::http(&addr, tokio)? - .handle(|| Ok(Hello), tokio) - }).unwrap(); - println!("Listening on http://{}", addr); + let server = Http::new().bind(&addr, || Ok(Hello)).unwrap(); + println!("Listening on http://{}", server.local_addr().unwrap()); + server.run().unwrap(); } diff --git a/examples/server.rs b/examples/server.rs index f0578c3328..c404039b1a 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,8 +7,7 @@ extern crate log; use hyper::{Get, Post, StatusCode}; use hyper::header::ContentLength; -use hyper::server::{Server, Service, Request, Response}; - +use hyper::server::{Http, Service, Request, Response}; static INDEX: &'static [u8] = b"Try POST /echo"; @@ -48,10 +47,8 @@ impl Service for Echo { fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:1337".parse().unwrap(); - let (listening, server) = Server::standalone(|tokio| { - Server::http(&addr, tokio)? - .handle(|| Ok(Echo), tokio) - }).unwrap(); - println!("Listening on http://{}", listening); - server.run(); + + let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); + println!("Listening on http://{}", server.local_addr().unwrap()); + server.run().unwrap(); } diff --git a/src/server/mod.rs b/src/server/mod.rs index c2ae861b96..87f8f6da1d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,23 +2,26 @@ //! //! A `Server` is created to listen on a port, parse HTTP requests, and hand //! them off to a `Service`. + +use std::cell::RefCell; use std::fmt; use std::io; -use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::net::SocketAddr; +use std::rc::{Rc, Weak}; +use std::time::Duration; -use futures::{Future, Map}; -use futures::stream::{Stream}; -use futures::sync::oneshot; +use futures::future; +use futures::task::{self, Task}; +use futures::{Future, Map, Stream, Poll, Async}; use tokio::io::Io; +use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::TcpListener; -use tokio::reactor::{Core, Handle}; use tokio_proto::BindServer; use tokio_proto::streaming::Message; use tokio_proto::streaming::pipeline::ServerProto; pub use tokio_service::{NewService, Service}; -pub use self::accept::Accept; pub use self::request::Request; pub use self::response::Response; @@ -27,204 +30,103 @@ use http; mod request; mod response; -type HttpIncoming = ::tokio::net::Incoming; - -/// A Server that can accept incoming network requests. -#[derive(Debug)] -pub struct Server { - accepter: A, - addr: SocketAddr, +/// An instance of the HTTP protocol, and implementation of tokio-proto's +/// `ServerProto` trait. +/// +/// This structure is used to create instances of `Server` or to spawn off tasks +/// which handle a connection to an HTTP server. Each instance of `Http` can be +/// configured with various protocol-level options such as keepalive. +#[derive(Debug, Clone)] +pub struct Http { keep_alive: bool, - //idle_timeout: Option, - //max_sockets: usize, } -impl Server { - /// Creates a new Server from a Stream of Ios. - /// - /// The addr is the socket address the accepter is listening on. - pub fn new(accepter: A, addr: SocketAddr) -> Server { - Server { - accepter: accepter, - addr: addr, +/// An instance of a server created through `Http::bind`. +/// +/// This server is intended as a convenience for creating a TCP listener on an +/// address and then serving TCP connections accepted with the service provided. +pub struct Server { + protocol: Http, + new_service: S, + core: Core, + listener: TcpListener, + shutdown_timeout: Duration, +} + +impl Http { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Http { + Http { keep_alive: true, - //idle_timeout: Some(Duration::from_secs(75)), - //max_sockets: 4096, } } /// Enables or disables HTTP keep-alive. /// /// Default is true. - pub fn keep_alive(mut self, val: bool) -> Server { + pub fn keep_alive(&mut self, val: bool) -> &mut Self { self.keep_alive = val; self } - /* - /// Sets how long an idle connection will be kept before closing. - /// - /// Default is 75 seconds. - pub fn idle_timeout(mut self, val: Option) -> Server { - self.idle_timeout = val; - self - } - */ - - /* - /// Sets the maximum open sockets for this Server. + /// Bind the provided `addr` and return a server ready to handle + /// connections. /// - /// Default is 4096, but most servers can handle much more than this. - pub fn max_sockets(mut self, val: usize) -> Server { - self.max_sockets = val; - self - } - */ -} - -impl Server { - /// Creates a new HTTP server config listening on the provided address. - pub fn http(addr: &SocketAddr, handle: &Handle) -> ::Result> { - let listener = try!(StdTcpListener::bind(addr)); - let addr = try!(listener.local_addr()); - let listener = try!(TcpListener::from_listener(listener, &addr, handle)); - Ok(Server::new(listener.incoming(), addr)) - } -} - - -/* -impl Server> { - /// Creates a new server config that will handle `HttpStream`s over SSL. + /// This method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided as well, creating a new service per + /// connection. /// - /// You can use any SSL implementation, as long as it implements `hyper::net::Ssl`. - pub fn https(addr: &SocketAddr, ssl: S) -> ::Result>> { - HttpsListener::new(addr, ssl) - .map(Server::new) - .map_err(From::from) - } -} -*/ - - -impl Server { - /// Binds to a socket and starts handling connections. - pub fn handle(self, factory: H, handle: &Handle) -> ::Result - where H: NewService + 'static { - let binder = HttpServer { - keep_alive: self.keep_alive, - }; - let inner_handle = handle.clone(); - handle.spawn(self.accepter.accept().for_each(move |(socket, remote_addr)| { - let service = HttpService { - inner: try!(factory.new_service()), - remote_addr: remote_addr, - }; - binder.bind_server(&inner_handle, socket, service); - Ok(()) - }).map_err(|e| { - error!("listener io error: {:?}", e); - () - })); - - Ok(self.addr) - } -} - -impl Server<()> { - /// Create a server that owns its event loop. - /// - /// The returned `ServerLoop` can be used to run the loop forever in the - /// thread. The returned `Listening` can be sent to another thread, and - /// used to shutdown the `ServerLoop`. - pub fn standalone(closure: F) -> ::Result<(Listening, ServerLoop)> - where F: FnOnce(&Handle) -> ::Result { + /// The returned `Server` contains one method, `run`, which is used to + /// actually run the server. + pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> + where S: NewService + + Send + Sync + 'static, + { let core = try!(Core::new()); let handle = core.handle(); - let addr = try!(closure(&handle)); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Ok(( - Listening { - addr: addr, - shutdown: shutdown_tx, - }, - ServerLoop { - inner: Some((core, shutdown_rx)), - } - )) - - } -} - -/// A configured `Server` ready to run. -pub struct ServerLoop { - inner: Option<(Core, oneshot::Receiver<()>)>, -} - -impl fmt::Debug for ServerLoop { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("ServerLoop") + let listener = try!(TcpListener::bind(addr, &handle)); + + Ok(Server { + new_service: new_service, + core: core, + listener: listener, + protocol: self.clone(), + shutdown_timeout: Duration::new(1, 0), + }) } -} -impl ServerLoop { - /// Runs the server forever in this loop. + /// Use this `Http` instance to create a new server task which handles the + /// connection `io` provided. /// - /// This will block the current thread. - pub fn run(self) { - // drop will take care of it. - trace!("ServerLoop::run()"); - } -} - -impl Drop for ServerLoop { - fn drop(&mut self) { - self.inner.take().map(|(mut loop_, shutdown)| { - debug!("ServerLoop::drop running"); - let _ = loop_.run(shutdown.or_else(|_dropped| ::futures::future::empty::<(), oneshot::Canceled>())); - debug!("Server closed"); - }); - } -} - -/// A handle of the running server. -pub struct Listening { - addr: SocketAddr, - shutdown: ::futures::sync::oneshot::Sender<()>, -} - -impl fmt::Debug for Listening { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Listening") - .field("addr", &self.addr) - .finish() - } -} - -impl fmt::Display for Listening { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&self.addr, f) - } -} - -impl Listening { - /// The addresses this server is listening on. - pub fn addr(&self) -> &SocketAddr { - &self.addr - } - - /// Stop the server from listening to its socket address. - pub fn close(self) { - debug!("closing server {}", self); - self.shutdown.complete(()); + /// This is the low-level method used to actually spawn handling a TCP + /// connection, typically. The `handle` provided is the event loop on which + /// the server task will be spawned, `io` is the I/O object associated with + /// this connection (data that's read/written), `remote_addr` is the remote + /// peer address of the HTTP client, and `service` defines how HTTP requests + /// will be handled (and mapped to responses). + /// + /// This method is typically not invoked directly but is rather transitively + /// used through the `serve` helper method above. This can be useful, + /// however, when writing mocks or accepting sockets from a non-TCP + /// location. + pub fn bind_connection(&self, + handle: &Handle, + io: I, + remote_addr: SocketAddr, + service: S) + where S: Service + 'static, + I: Io + 'static, + { + self.bind_server(handle, io, HttpService { + inner: service, + remote_addr: remote_addr, + }) } } -struct HttpServer { - keep_alive: bool, -} - -impl ServerProto for HttpServer { +impl ServerProto for Http { type Request = http::RequestHead; type RequestBody = http::Chunk; type Response = ResponseHead; @@ -277,54 +179,169 @@ impl Service for HttpService } } -//private so the `Acceptor` type can stay internal -mod accept { - use std::io; - use std::net::SocketAddr; - use futures::{Stream, Poll}; - use tokio::io::Io; +impl Server + where S: NewService + + Send + Sync + 'static, +{ + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> ::Result { + Ok(try!(self.listener.local_addr())) + } + + /// Returns a handle to the underlying event loop that this server will be + /// running on. + pub fn handle(&self) -> Handle { + self.core.handle() + } - /// An Acceptor is an incoming Stream of Io. + /// Configure the amount of time this server will wait for a "graceful + /// shutdown". /// - /// This trait is not implemented directly, and only exists to make the - /// intent clearer. A `Stream` - /// should be implemented instead. - pub trait Accept: Stream { - #[doc(hidden)] - type Output: Io + 'static; - #[doc(hidden)] - type Stream: Stream + 'static; - - #[doc(hidden)] - fn accept(self) -> Accepter - where Self: Sized; + /// This is the amount of time after the shutdown signal is received the + /// server will wait for all pending connections to finish. If the timeout + /// elapses then the server will be forcibly shut down. + /// + /// This defaults to 1s. + pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self { + self.shutdown_timeout = timeout; + self } - #[allow(missing_debug_implementations)] - pub struct Accepter + 'static, I: Io + 'static>(T, ::std::marker::PhantomData); + /// Execute this server infinitely. + /// + /// This method does not currently return, but it will return an error if + /// one occurs. + pub fn run(self) -> ::Result<()> { + self.run_until(future::empty()) + } - impl Stream for Accepter - where T: Stream, - I: Io + 'static, + /// Execute this server until the given future, `shutdown_signal`, resolves. + /// + /// This method, like `run` above, is used to execute this HTTP server. The + /// difference with `run`, however, is that this method allows for shutdown + /// in a graceful fashion. The future provided is interpreted as a signal to + /// shut down the server when it resolves. + /// + /// This method will block the current thread executing the HTTP server. + /// When the `shutdown_signal` has resolved then the TCP listener will be + /// unbound (dropped). The thread will continue to block for a maximum of + /// `shutdown_timeout` time waiting for active connections to shut down. + /// Once the `shutdown_timeout` elapses or all active connections are + /// cleaned out then this method will return. + pub fn run_until(self, shutdown_signal: F) -> ::Result<()> + where F: Future, { - type Item = T::Item; - type Error = io::Error; + let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self; + let handle = core.handle(); + + // Mini future to track the number of active services + let info = Rc::new(RefCell::new(Info { + active: 0, + blocker: None, + })); + + // Future for our server's execution + let srv = listener.incoming().for_each(|(socket, addr)| { + let s = NotifyService { + inner: try!(new_service.new_service()), + info: Rc::downgrade(&info), + }; + info.borrow_mut().active += 1; + protocol.bind_connection(&handle, socket, addr, s); + Ok(()) + }); - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll() + // Main execution of the server. Here we use `select` to wait for either + // `incoming` or `f` to resolve. We know that `incoming` will never + // resolve with a success (it's infinite) so we're actually just waiting + // for an error or for `f`, our shutdown signal. + // + // When we get a shutdown signal (`Ok`) then we drop the TCP listener to + // stop accepting incoming connections. + match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) { + Ok(((), _incoming)) => {} + Err((e, _other)) => return Err(e), + } + + // Ok we've stopped accepting new connections at this point, but we want + // to give existing connections a chance to clear themselves out. Wait + // at most `shutdown_timeout` time before we just return clearing + // everything out. + // + // Our custom `WaitUntilZero` will resolve once all services constructed + // here have been destroyed. + let timeout = try!(Timeout::new(shutdown_timeout, &handle)); + let wait = WaitUntilZero { info: info.clone() }; + match core.run(wait.select(timeout)) { + Ok(_) => Ok(()), + Err((e, _)) => return Err(e.into()) } } +} - impl Accept for T - where T: Stream + 'static, - I: Io + 'static, - { - type Output = I; - type Stream = T; +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Server") + .field("core", &"...") + .field("listener", &self.listener) + .field("new_service", &self.new_service) + .field("protocol", &self.protocol) + .finish() + } +} + +struct NotifyService { + inner: S, + info: Weak>, +} + +struct WaitUntilZero { + info: Rc>, +} + +struct Info { + active: usize, + blocker: Option, +} + +impl Service for NotifyService { + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; - fn accept(self) -> Accepter { - Accepter(self, ::std::marker::PhantomData) + fn call(&self, message: Self::Request) -> Self::Future { + self.inner.call(message) + } +} + +impl Drop for NotifyService { + fn drop(&mut self) { + let info = match self.info.upgrade() { + Some(info) => info, + None => return, + }; + let mut info = info.borrow_mut(); + info.active -= 1; + if info.active == 0 { + if let Some(task) = info.blocker.take() { + task.unpark(); + } + } + } +} + +impl Future for WaitUntilZero { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + let mut info = self.info.borrow_mut(); + if info.active == 0 { + Ok(().into()) + } else { + info.blocker = Some(task::park()); + Ok(Async::NotReady) } } } diff --git a/tests/server.rs b/tests/server.rs index d38b9b4b38..ad05f97519 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -4,27 +4,29 @@ extern crate futures; extern crate spmc; extern crate pretty_env_logger; -use futures::Future; -use futures::stream::Stream; +use futures::{Future, Stream}; +use futures::sync::oneshot; use std::net::{TcpStream, SocketAddr}; use std::io::{Read, Write}; use std::sync::mpsc; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use hyper::server::{Server, Request, Response, Service, NewService}; +use hyper::server::{Http, Request, Response, Service, NewService}; struct Serve { - listening: Option, + addr: SocketAddr, msg_rx: mpsc::Receiver, reply_tx: spmc::Sender, - spawn_rx: mpsc::Receiver<()>, + shutdown_signal: Option>, + thread: Option>, } impl Serve { fn addr(&self) -> &SocketAddr { - self.listening.as_ref().unwrap().addr() + &self.addr } fn body(&self) -> Vec { @@ -66,14 +68,14 @@ impl<'a> ReplyBuilder<'a> { impl Drop for Serve { fn drop(&mut self) { - self.listening.take().unwrap().close(); - self.spawn_rx.recv().expect("server thread should shutdown cleanly"); + drop(self.shutdown_signal.take()); + self.thread.take().unwrap().join().unwrap(); } } #[derive(Clone)] struct TestService { - tx: mpsc::Sender, + tx: Arc>>, reply: spmc::Receiver, _timeout: Option, } @@ -94,7 +96,7 @@ impl NewService for TestService { type Request = Request; type Response = Response; type Error = hyper::Error; - + type Instance = TestService; fn new_service(&self) -> std::io::Result { @@ -113,7 +115,7 @@ impl Service for TestService { let tx = self.tx.clone(); let replies = self.reply.clone(); req.body().for_each(move |chunk| { - tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); + tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).map(move |_| { let mut res = Response::new(); @@ -150,35 +152,32 @@ fn serve() -> Serve { fn serve_with_timeout(dur: Option) -> Serve { let _ = pretty_env_logger::init(); - let (thread_tx, thread_rx) = mpsc::channel(); - let (spawn_tx, spawn_rx) = mpsc::channel(); + let (addr_tx, addr_rx) = mpsc::channel(); let (msg_tx, msg_rx) = mpsc::channel(); let (reply_tx, reply_rx) = spmc::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let addr = "127.0.0.1:0".parse().unwrap(); let thread_name = format!("test-server-{:?}", dur); - thread::Builder::new().name(thread_name).spawn(move || { - let (listening, server) = Server::standalone(move |tokio| { - Server::http(&addr, tokio).unwrap() - .handle(TestService { - tx: msg_tx.clone(), - _timeout: dur, - reply: reply_rx, - }, tokio) + let thread = thread::Builder::new().name(thread_name).spawn(move || { + let srv = Http::new().bind(&addr, TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx, }).unwrap(); - thread_tx.send(listening).unwrap(); - server.run(); - spawn_tx.send(()).unwrap(); + addr_tx.send(srv.local_addr().unwrap()).unwrap(); + srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); }).unwrap(); - let listening = thread_rx.recv().unwrap(); + let addr = addr_rx.recv().unwrap(); Serve { - listening: Some(listening), msg_rx: msg_rx, reply_tx: reply_tx, - spawn_rx: spawn_rx, + addr: addr, + shutdown_signal: Some(shutdown_tx), + thread: Some(thread), } }