From 0285fc2acc6c6f3bbb63ce72691e0d1ee0cdc9ff Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sun, 7 Sep 2014 14:18:51 -0700 Subject: [PATCH 1/5] Abstract out NetworkStream This introduces a new Trait, NetworkStream, which abstracts over the functionality provided by TcpStream so that it can be easily mocked and extended in testing and hyper can be used for other connection sources. --- examples/concurrent-server.rs | 11 ++-- examples/server.rs | 56 ++++++++-------- src/client/request.rs | 19 +++--- src/client/response.rs | 29 ++++---- src/lib.rs | 5 +- src/net.rs | 120 ++++++++++++++++++++++++++++++++++ src/server/mod.rs | 57 +++++++++------- src/server/request.rs | 26 ++++---- src/server/response.rs | 28 ++++---- 9 files changed, 240 insertions(+), 111 deletions(-) create mode 100644 src/net.rs diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs index 0fb4c38f59..3bd371adee 100644 --- a/examples/concurrent-server.rs +++ b/examples/concurrent-server.rs @@ -1,4 +1,4 @@ -#![feature(macro_rules)] +#![feature(macro_rules, default_type_params)] extern crate hyper; extern crate debug; @@ -10,15 +10,16 @@ use std::sync::Arc; use hyper::{Get, Post}; use hyper::server::{Server, Handler, Incoming, Request, Response, Fresh}; use hyper::header::common::ContentLength; +use hyper::net::{HttpStream, HttpAcceptor}; trait ConcurrentHandler: Send + Sync { - fn handle(&self, req: Request, res: Response); + fn handle(&self, req: Request, res: Response); } struct Concurrent { handler: Arc } -impl Handler for Concurrent { - fn handle(self, mut incoming: Incoming) { +impl Handler for Concurrent { + fn handle(self, mut incoming: Incoming) { for (mut req, mut res) in incoming { let clone = self.handler.clone(); spawn(proc() { clone.handle(req, res) }) @@ -38,7 +39,7 @@ macro_rules! try_abort( struct Echo; impl ConcurrentHandler for Echo { - fn handle(&self, mut req: Request, mut res: Response) { + fn handle(&self, mut req: Request, mut res: Response) { match req.uri { hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { (&Get, "/") | (&Get, "/echo") => { diff --git a/examples/server.rs b/examples/server.rs index e70db4fc72..0906951592 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,10 +7,8 @@ use std::io::util::copy; use std::io::net::ip::Ipv4Addr; use hyper::{Get, Post}; -use hyper::server::{Server, Handler, Incoming}; use hyper::header::common::ContentLength; - -struct Echo; +use hyper::server::{Server, Incoming}; macro_rules! try_continue( ($e:expr) => {{ @@ -21,41 +19,39 @@ macro_rules! try_continue( }} ) -impl Handler for Echo { - fn handle(self, mut incoming: Incoming) { - for (mut req, mut res) in incoming { - match req.uri { - hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { - (&Get, "/") | (&Get, "/echo") => { - let out = b"Try POST /echo"; - - res.headers_mut().set(ContentLength(out.len())); - let mut res = try_continue!(res.start()); - try_continue!(res.write(out)); - try_continue!(res.end()); - continue; - }, - (&Post, "/echo") => (), // fall through, fighting mutable borrows - _ => { - *res.status_mut() = hyper::status::NotFound; - try_continue!(res.start().and_then(|res| res.end())); - continue; - } +fn echo(mut incoming: Incoming) { + for (mut req, mut res) in incoming { + match req.uri { + hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { + (&Get, "/") | (&Get, "/echo") => { + let out = b"Try POST /echo"; + + res.headers_mut().set(ContentLength(out.len())); + let mut res = try_continue!(res.start()); + try_continue!(res.write(out)); + try_continue!(res.end()); + continue; }, + (&Post, "/echo") => (), // fall through, fighting mutable borrows _ => { + *res.status_mut() = hyper::status::NotFound; try_continue!(res.start().and_then(|res| res.end())); continue; } - }; - - let mut res = try_continue!(res.start()); - try_continue!(copy(&mut req, &mut res)); - try_continue!(res.end()); - } + }, + _ => { + try_continue!(res.start().and_then(|res| res.end())); + continue; + } + }; + + let mut res = try_continue!(res.start()); + try_continue!(copy(&mut req, &mut res)); + try_continue!(res.end()); } } fn main() { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337); - server.listen(Echo).unwrap(); + server.listen(echo).unwrap(); } diff --git a/src/client/request.rs b/src/client/request.rs index c422a9422c..cc5e53f209 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -1,5 +1,4 @@ //! Client Requests -use std::io::net::tcp::TcpStream; use std::io::{BufferedWriter, IoResult}; use url::Url; @@ -7,6 +6,7 @@ use url::Url; use method; use header::Headers; use header::common::Host; +use net::{NetworkStream, HttpStream}; use rfc7230::LINE_ENDING; use version; use {HttpResult, HttpUriError}; @@ -14,7 +14,7 @@ use super::{Response}; /// A client request to a remote server. -pub struct Request { +pub struct Request { /// The method of this request. pub method: method::Method, /// The headers that will be sent with this request. @@ -24,13 +24,13 @@ pub struct Request { /// The HTTP version of this request. pub version: version::HttpVersion, headers_written: bool, - body: BufferedWriter, + body: BufferedWriter, } -impl Request { +impl Request { /// Create a new client request. - pub fn new(method: method::Method, url: Url) -> HttpResult { + pub fn new(method: method::Method, url: Url) -> HttpResult> { debug!("{} {}", method, url); let host = match url.serialize_host() { Some(host) => host, @@ -43,7 +43,7 @@ impl Request { }; debug!("port={}", port); - let stream = try_io!(TcpStream::connect(host.as_slice(), port)); + let stream = try_io!(NetworkStream::connect(host.as_slice(), port)); let stream = BufferedWriter::new(stream); let mut headers = Headers::new(); headers.set(Host(host)); @@ -82,16 +82,15 @@ impl Request { /// Completes writing the request, and returns a response to read from. /// /// Consumes the Request. - pub fn send(mut self) -> HttpResult { + pub fn send(mut self) -> HttpResult> { try_io!(self.flush()); - let mut raw = self.body.unwrap(); - try_io!(raw.close_write()); + let raw = self.body.unwrap(); Response::new(raw) } } -impl Writer for Request { +impl Writer for Request { fn write(&mut self, msg: &[u8]) -> IoResult<()> { if !self.headers_written { try!(self.write_head()); diff --git a/src/client/response.rs b/src/client/response.rs index bf39407b20..2e6f5eed76 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,33 +1,33 @@ //! Client Responses use std::io::{BufferedReader, IoResult}; -use std::io::net::tcp::TcpStream; use header; use header::common::{ContentLength, TransferEncoding}; use header::common::transfer_encoding::Chunked; +use net::{NetworkStream, HttpStream}; use rfc7230::{read_status_line, HttpReader, SizedReader, ChunkedReader, EofReader}; use status; use version; use {HttpResult}; /// A response for a client request to a remote server. -pub struct Response { +pub struct Response { /// The status from the server. pub status: status::StatusCode, /// The headers from the server. pub headers: header::Headers, /// The HTTP version of this response from the server. pub version: version::HttpVersion, - body: HttpReader>, + body: HttpReader>, } -impl Response { +impl Response { /// Creates a new response from a server. - pub fn new(tcp: TcpStream) -> HttpResult { - let mut tcp = BufferedReader::new(tcp); - let (version, status) = try!(read_status_line(&mut tcp)); - let mut headers = try!(header::Headers::from_raw(&mut tcp)); + pub fn new(stream: S) -> HttpResult> { + let mut stream = BufferedReader::new(stream); + let (version, status) = try!(read_status_line(&mut stream)); + let mut headers = try!(header::Headers::from_raw(&mut stream)); debug!("{} {}", version, status); debug!("{}", headers); @@ -40,22 +40,22 @@ impl Response { }; if codings.contains(&Chunked) { - ChunkedReader(tcp, None) + ChunkedReader(stream, None) } else { - debug!("not chucked. read till eof"); - EofReader(tcp) + debug!("not chuncked. read till eof"); + EofReader(stream) } } None => unreachable!() } } else if headers.has::() { match headers.get_ref::() { - Some(&ContentLength(len)) => SizedReader(tcp, len), + Some(&ContentLength(len)) => SizedReader(stream, len), None => unreachable!() } } else { debug!("neither Transfer-Encoding nor Content-Length"); - EofReader(tcp) + EofReader(stream) }; Ok(Response { @@ -67,7 +67,8 @@ impl Response { } } -impl Reader for Response { +impl Reader for Response { + #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/lib.rs b/src/lib.rs index 5beb9b8cbd..44a052c86d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! # hyper -#![feature(macro_rules, phase)] +#![feature(macro_rules, phase, default_type_params)] #![warn(missing_doc)] -#![deny(warnings)] +//#![deny(warnings)] #![experimental] extern crate time; @@ -53,6 +53,7 @@ macro_rules! trace( pub mod client; pub mod method; pub mod header; +pub mod net; pub mod server; pub mod status; pub mod uri; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000000..44d6e6b71f --- /dev/null +++ b/src/net.rs @@ -0,0 +1,120 @@ +//! A collection of traits abstracting over Listeners and Streams. +use std::io::{IoResult, Stream, Listener, Acceptor}; +use std::io::net::ip::{SocketAddr, Port}; +use std::io::net::tcp::{TcpStream, TcpListener, TcpAcceptor}; + +/// An abstraction to listen for connections on a certain port. +pub trait NetworkListener>: Listener { + /// Bind to a socket. + /// + /// Note: This does not start listening for connections. You must call + /// `listen()` to do that. + fn bind(host: &str, port: Port) -> IoResult; + + /// Get the address this Listener ended up listening on. + fn socket_name(&mut self) -> IoResult; +} + +/// An abstraction to receive `HttpStream`s. +pub trait NetworkAcceptor: Acceptor + Clone + Send { + /// Closes the Acceptor, so no more incoming connections will be handled. + fn close(&mut self) -> IoResult<()>; +} + +/// An abstraction over streams that a Server can utilize. +pub trait NetworkStream: Stream + Clone { + /// Get the remote address of the underlying connection. + fn peer_name(&mut self) -> IoResult; + + /// Connect to a remote address. + fn connect(host: &str, port: Port) -> IoResult; +} + +/// A `NetworkListener` for `HttpStream`s. +pub struct HttpListener { + inner: TcpListener +} + +impl Listener for HttpListener { + #[inline] + fn listen(self) -> IoResult { + Ok(HttpAcceptor { + inner: try!(self.inner.listen()) + }) + } +} + +impl NetworkListener for HttpListener { + #[inline] + fn bind(host: &str, port: Port) -> IoResult { + Ok(HttpListener { + inner: try!(TcpListener::bind(host, port)) + }) + } + + #[inline] + fn socket_name(&mut self) -> IoResult { + self.inner.socket_name() + } +} + +/// A `NetworkAcceptor` for `HttpStream`s. +#[deriving(Clone)] +pub struct HttpAcceptor { + inner: TcpAcceptor +} + +impl Acceptor for HttpAcceptor { + #[inline] + fn accept(&mut self) -> IoResult { + Ok(HttpStream { + inner: try!(self.inner.accept()) + }) + } +} + +impl NetworkAcceptor for HttpAcceptor { + #[inline] + fn close(&mut self) -> IoResult<()> { + self.inner.close_accept() + } +} + +/// A wrapper around a TcpStream. +#[deriving(Clone)] +pub struct HttpStream { + inner: TcpStream +} + +impl Reader for HttpStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> IoResult { + self.inner.read(buf) + } +} + +impl Writer for HttpStream { + #[inline] + fn write(&mut self, msg: &[u8]) -> IoResult<()> { + self.inner.write(msg) + } + #[inline] + fn flush(&mut self) -> IoResult<()> { + self.inner.flush() + } +} + + +impl NetworkStream for HttpStream { + #[inline] + fn peer_name(&mut self) -> IoResult { + self.inner.peer_name() + } + + #[inline] + fn connect(host: &str, port: Port) -> IoResult { + Ok(HttpStream { + inner: try!(TcpStream::connect(host, port)) + }) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index e33b390283..62ad25540e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,11 +1,13 @@ //! HTTP Server -use std::io::net::tcp::{TcpListener, TcpAcceptor}; use std::io::{Acceptor, Listener, IoResult, EndOfFile, IncomingConnections}; use std::io::net::ip::{IpAddr, Port, SocketAddr}; pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; +use net::{NetworkListener, NetworkAcceptor, NetworkStream}; +use net::{HttpListener, HttpAcceptor}; + pub mod request; pub mod response; @@ -13,32 +15,41 @@ pub mod response; /// /// Once listening, it will create a `Request`/`Response` pair for each /// incoming connection, and hand them to the provided handler. -pub struct Server { +pub struct Server { ip: IpAddr, port: Port } +impl Server { + /// Creates a new server that will handle `HttpStream`s. + pub fn http(ip: IpAddr, port: Port) -> Server { + Server { + ip: ip, + port: port + } + } +} -impl Server { +impl, S: NetworkStream, A: NetworkAcceptor> Server { - /// Creates a server to be used for `http` conenctions. - pub fn http(ip: IpAddr, port: Port) -> Server { + /// Creates a server that can listen for and handle `NetworkStreams`. + pub fn new(ip: IpAddr, port: Port) -> Server { Server { ip: ip, port: port } } + /// Binds to a socket, and starts handling connections. - pub fn listen(self, handler: H) -> IoResult { - let mut listener = try!(TcpListener::bind(self.ip.to_string().as_slice(), self.port)); + pub fn listen + 'static>(self, handler: H) -> IoResult> { + let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port)); let socket = try!(listener.socket_name()); let acceptor = try!(listener.listen()); - let worker = acceptor.clone(); + let mut worker = acceptor.clone(); spawn(proc() { - let mut acceptor = worker; - handler.handle(Incoming { from: acceptor.incoming() }); + handler.handle(Incoming { from: worker.incoming() }); }); Ok(Listening { @@ -51,12 +62,12 @@ impl Server { /// An iterator over incoming connections, represented as pairs of /// hyper Requests and Responses. -pub struct Incoming<'a> { - from: IncomingConnections<'a, TcpAcceptor> +pub struct Incoming<'a, A: 'a = HttpAcceptor> { + from: IncomingConnections<'a, A> } -impl<'a> Iterator<(Request, Response)> for Incoming<'a> { - fn next(&mut self) -> Option<(Request, Response)> { +impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response)> for Incoming<'a, A> { + fn next(&mut self) -> Option<(Request, Response)> { for conn in self.from { match conn { Ok(stream) => { @@ -85,30 +96,30 @@ impl<'a> Iterator<(Request, Response)> for Incoming<'a> { } /// A listening server, which can later be closed. -pub struct Listening { - acceptor: TcpAcceptor, +pub struct Listening { + acceptor: A, /// The socket address that the server is bound to. pub socket_addr: SocketAddr, } -impl Listening { - /// Stop the server from listening to its socket address. +impl, S: NetworkStream> Listening { + /// Stop the server from listening to it's socket address. pub fn close(mut self) -> IoResult<()> { debug!("closing server"); - self.acceptor.close_accept() + self.acceptor.close() } } /// A handler that can handle incoming requests for a server. -pub trait Handler: Send { +pub trait Handler, S: NetworkStream>: Send { /// Receives a `Request`/`Response` pair, and should perform some action on them. /// /// This could reading from the request, and writing to the response. - fn handle(self, Incoming); + fn handle(self, Incoming); } -impl Handler for fn(Incoming) { - fn handle(self, incoming: Incoming) { +impl, S: NetworkStream> Handler for fn(Incoming) { + fn handle(self, incoming: Incoming) { (self)(incoming) } } diff --git a/src/server/request.rs b/src/server/request.rs index 718052803e..2480f6b6d1 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -4,7 +4,6 @@ //! target URI, headers, and message body. use std::io::{Reader, BufferedReader, IoResult}; use std::io::net::ip::SocketAddr; -use std::io::net::tcp::TcpStream; use {HttpResult}; use version::{HttpVersion}; @@ -13,10 +12,11 @@ use header::Headers; use header::common::ContentLength; use rfc7230::{read_request_line}; use rfc7230::{HttpReader, SizedReader, ChunkedReader}; +use net::{NetworkStream, HttpStream}; use uri::RequestUri; -/// A request bundles several parts of an incoming TCP stream, given to a `Handler`. -pub struct Request { +/// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. +pub struct Request { /// The IP address of the remote connection. pub remote_addr: SocketAddr, /// The `Method`, such as `Get`, `Post`, etc. @@ -27,19 +27,19 @@ pub struct Request { pub uri: RequestUri, /// The version of HTTP for this request. pub version: HttpVersion, - body: HttpReader> + body: HttpReader> } -impl Request { +impl Request { /// Create a new Request, reading the StartLine and Headers so they are /// immediately useful. - pub fn new(mut tcp: TcpStream) -> HttpResult { - let remote_addr = try_io!(tcp.peer_name()); - let mut tcp = BufferedReader::new(tcp); - let (method, uri, version) = try!(read_request_line(&mut tcp)); - let mut headers = try!(Headers::from_raw(&mut tcp)); + pub fn new(mut stream: S) -> HttpResult> { + let remote_addr = try_io!(stream.peer_name()); + let mut stream = BufferedReader::new(stream); + let (method, uri, version) = try!(read_request_line(&mut stream)); + let mut headers = try!(Headers::from_raw(&mut stream)); debug!("{} {} {}", method, uri, version); debug!("{}", headers); @@ -47,12 +47,12 @@ impl Request { let body = if headers.has::() { match headers.get_ref::() { - Some(&ContentLength(len)) => SizedReader(tcp, len), + Some(&ContentLength(len)) => SizedReader(stream, len), None => unreachable!() } } else { todo!("check for Transfer-Encoding: chunked"); - ChunkedReader(tcp, None) + ChunkedReader(stream, None) }; Ok(Request { @@ -66,7 +66,7 @@ impl Request { } } -impl Reader for Request { +impl Reader for Request { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/server/response.rs b/src/server/response.rs index ab3dc81e00..3ae34d671b 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -3,15 +3,15 @@ //! These are responses sent by a `hyper::Server` to clients, after //! receiving a request. use std::io::{BufferedWriter, IoResult}; -use std::io::net::tcp::TcpStream; use time::now_utc; use header; use header::common; +use rfc7230::{CR, LF, LINE_ENDING}; use status; +use net::NetworkStream; use version; -use rfc7230::{CR, LF, LINE_ENDING}; /// Phantom type indicating Headers and StatusCode have not been written. pub struct Fresh; @@ -26,18 +26,18 @@ impl WriteStatus for Streaming {} impl WriteStatus for Fresh {} /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. -pub struct Response { +pub struct Response { /// The HTTP version of this response. pub version: version::HttpVersion, // Stream the Response is writing to, not accessible through UnwrittenResponse - body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 + body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 // The status code for the request. status: status::StatusCode, // The outgoing headers on this response. headers: header::Headers } -impl Response { +impl Response { /// The status of this response. #[inline] pub fn status(&self) -> status::StatusCode { self.status } @@ -47,9 +47,9 @@ impl Response { /// Construct a Response from its constituent parts. pub fn construct(version: version::HttpVersion, - body: BufferedWriter, + body: BufferedWriter, status: status::StatusCode, - headers: header::Headers) -> Response { + headers: header::Headers) -> Response { Response { status: status, version: version, @@ -59,19 +59,19 @@ impl Response { } } -impl Response { +impl Response { /// Creates a new Response that can be used to write to a network stream. - pub fn new(tcp: TcpStream) -> Response { + pub fn new(stream: S) -> Response { Response { status: status::Ok, version: version::Http11, headers: header::Headers::new(), - body: BufferedWriter::new(tcp) + body: BufferedWriter::new(stream) } } /// Consume this Response, writing the Headers and Status and creating a Response - pub fn start(mut self) -> IoResult> { + pub fn start(mut self) -> IoResult> { debug!("writing head: {} {}", self.version, self.status); try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); @@ -104,12 +104,12 @@ impl Response { pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } /// Deconstruct this Response into its constituent parts. - pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { + pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { (self.version, self.body, self.status, self.headers) } } -impl Response { +impl Response { /// Flushes all writing of a response to the client. pub fn end(mut self) -> IoResult<()> { debug!("ending"); @@ -117,7 +117,7 @@ impl Response { } } -impl Writer for Response { +impl Writer for Response { fn write(&mut self, msg: &[u8]) -> IoResult<()> { debug!("write {:u} bytes", msg.len()); self.body.write(msg) From 76a58940d813c4c6d8e42700e19b53542ba2e29b Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Tue, 9 Sep 2014 14:28:06 -0700 Subject: [PATCH 2/5] Use trait objects and dynamic dispatch to abstract over NetworkStream Server and client benchmarks show that this makes very little difference in performance and using dynamic dispatch here is significantly more ergonomic. This also bounds NetworkStream with Send to prevent incorrect implementations. Allows the implementation of mock streams for testing and flexibility. Fixes #5 --- examples/concurrent-server.rs | 4 ++-- src/net.rs | 15 ++++++++++++++- src/server/mod.rs | 14 ++++++-------- src/server/request.rs | 16 ++++++++-------- src/server/response.rs | 25 +++++++++++++------------ 5 files changed, 43 insertions(+), 31 deletions(-) diff --git a/examples/concurrent-server.rs b/examples/concurrent-server.rs index 3bd371adee..740312fb1c 100644 --- a/examples/concurrent-server.rs +++ b/examples/concurrent-server.rs @@ -13,7 +13,7 @@ use hyper::header::common::ContentLength; use hyper::net::{HttpStream, HttpAcceptor}; trait ConcurrentHandler: Send + Sync { - fn handle(&self, req: Request, res: Response); + fn handle(&self, req: Request, res: Response); } struct Concurrent { handler: Arc } @@ -39,7 +39,7 @@ macro_rules! try_abort( struct Echo; impl ConcurrentHandler for Echo { - fn handle(&self, mut req: Request, mut res: Response) { + fn handle(&self, mut req: Request, mut res: Response) { match req.uri { hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) { (&Get, "/") | (&Get, "/echo") => { diff --git a/src/net.rs b/src/net.rs index 44d6e6b71f..943ba0a64f 100644 --- a/src/net.rs +++ b/src/net.rs @@ -22,7 +22,7 @@ pub trait NetworkAcceptor: Acceptor + Clone + Send { } /// An abstraction over streams that a Server can utilize. -pub trait NetworkStream: Stream + Clone { +pub trait NetworkStream: Stream + Clone + Send { /// Get the remote address of the underlying connection. fn peer_name(&mut self) -> IoResult; @@ -30,6 +30,19 @@ pub trait NetworkStream: Stream + Clone { fn connect(host: &str, port: Port) -> IoResult; } +impl Reader for Box { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> IoResult { self.read(buf) } +} + +impl Writer for Box { + #[inline] + fn write(&mut self, msg: &[u8]) -> IoResult<()> { self.write(msg) } + + #[inline] + fn flush(&mut self) -> IoResult<()> { self.flush() } +} + /// A `NetworkListener` for `HttpStream`s. pub struct HttpListener { inner: TcpListener diff --git a/src/server/mod.rs b/src/server/mod.rs index 62ad25540e..6362df0c1d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,7 +6,7 @@ pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; use net::{NetworkListener, NetworkAcceptor, NetworkStream}; -use net::{HttpListener, HttpAcceptor}; +use net::HttpListener; pub mod request; pub mod response; @@ -31,7 +31,6 @@ impl Server { } impl, S: NetworkStream, A: NetworkAcceptor> Server { - /// Creates a server that can listen for and handle `NetworkStreams`. pub fn new(ip: IpAddr, port: Port) -> Server { Server { @@ -40,9 +39,8 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server + 'static>(self, handler: H) -> IoResult> { + pub fn listen>(self, handler: H) -> IoResult> { let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port)); let socket = try!(listener.socket_name()); let acceptor = try!(listener.listen()); @@ -62,12 +60,12 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server { +pub struct Incoming<'a, A: 'a> { from: IncomingConnections<'a, A> } -impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response)> for Incoming<'a, A> { - fn next(&mut self) -> Option<(Request, Response)> { +impl<'a, S: NetworkStream, A: NetworkAcceptor> Iterator<(Request, Response)> for Incoming<'a, A> { + fn next(&mut self) -> Option<(Request, Response)> { for conn in self.from { match conn { Ok(stream) => { @@ -96,7 +94,7 @@ impl<'a, A: NetworkAcceptor, S: NetworkStream> Iterator<(Request, Response } /// A listening server, which can later be closed. -pub struct Listening { +pub struct Listening { acceptor: A, /// The socket address that the server is bound to. pub socket_addr: SocketAddr, diff --git a/src/server/request.rs b/src/server/request.rs index 2480f6b6d1..81570d6796 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -12,11 +12,11 @@ use header::Headers; use header::common::ContentLength; use rfc7230::{read_request_line}; use rfc7230::{HttpReader, SizedReader, ChunkedReader}; -use net::{NetworkStream, HttpStream}; +use net::NetworkStream; use uri::RequestUri; /// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. -pub struct Request { +pub struct Request { /// The IP address of the remote connection. pub remote_addr: SocketAddr, /// The `Method`, such as `Get`, `Post`, etc. @@ -27,17 +27,17 @@ pub struct Request { pub uri: RequestUri, /// The version of HTTP for this request. pub version: HttpVersion, - body: HttpReader> + body: HttpReader>> } -impl Request { +impl Request { /// Create a new Request, reading the StartLine and Headers so they are /// immediately useful. - pub fn new(mut stream: S) -> HttpResult> { + pub fn new(mut stream: S) -> HttpResult { let remote_addr = try_io!(stream.peer_name()); - let mut stream = BufferedReader::new(stream); + let mut stream = BufferedReader::new(box stream as Box); let (method, uri, version) = try!(read_request_line(&mut stream)); let mut headers = try!(Headers::from_raw(&mut stream)); @@ -61,12 +61,12 @@ impl Request { uri: uri, headers: headers, version: version, - body: body, + body: body }) } } -impl Reader for Request { +impl Reader for Request { fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) } diff --git a/src/server/response.rs b/src/server/response.rs index 3ae34d671b..165979fb6b 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -26,18 +26,18 @@ impl WriteStatus for Streaming {} impl WriteStatus for Fresh {} /// The outgoing half for a Tcp connection, created by a `Server` and given to a `Handler`. -pub struct Response { +pub struct Response { /// The HTTP version of this response. pub version: version::HttpVersion, // Stream the Response is writing to, not accessible through UnwrittenResponse - body: BufferedWriter, // TODO: use a HttpWriter from rfc7230 + body: BufferedWriter>, // TODO: use a HttpWriter from rfc7230 // The status code for the request. status: status::StatusCode, // The outgoing headers on this response. headers: header::Headers } -impl Response { +impl Response { /// The status of this response. #[inline] pub fn status(&self) -> status::StatusCode { self.status } @@ -47,9 +47,9 @@ impl Response { /// Construct a Response from its constituent parts. pub fn construct(version: version::HttpVersion, - body: BufferedWriter, + body: BufferedWriter>, status: status::StatusCode, - headers: header::Headers) -> Response { + headers: header::Headers) -> Response { Response { status: status, version: version, @@ -59,19 +59,19 @@ impl Response { } } -impl Response { +impl Response { /// Creates a new Response that can be used to write to a network stream. - pub fn new(stream: S) -> Response { + pub fn new(stream: S) -> Response { Response { status: status::Ok, version: version::Http11, headers: header::Headers::new(), - body: BufferedWriter::new(stream) + body: BufferedWriter::new(box stream as Box) } } /// Consume this Response, writing the Headers and Status and creating a Response - pub fn start(mut self) -> IoResult> { + pub fn start(mut self) -> IoResult> { debug!("writing head: {} {}", self.version, self.status); try!(write!(self.body, "{} {}{}{}", self.version, self.status, CR as char, LF as char)); @@ -104,12 +104,13 @@ impl Response { pub fn headers_mut(&mut self) -> &mut header::Headers { &mut self.headers } /// Deconstruct this Response into its constituent parts. - pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter, status::StatusCode, header::Headers) { + pub fn deconstruct(self) -> (version::HttpVersion, BufferedWriter>, + status::StatusCode, header::Headers) { (self.version, self.body, self.status, self.headers) } } -impl Response { +impl Response { /// Flushes all writing of a response to the client. pub fn end(mut self) -> IoResult<()> { debug!("ending"); @@ -117,7 +118,7 @@ impl Response { } } -impl Writer for Response { +impl Writer for Response { fn write(&mut self, msg: &[u8]) -> IoResult<()> { debug!("write {:u} bytes", msg.len()); self.body.write(msg) From 80268673345745500d55b23f0c7867c9f141e21a Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Tue, 9 Sep 2014 17:06:23 -0700 Subject: [PATCH 3/5] Deny missing documentation and all warnings for cleaner builds. --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 44a052c86d..c4463cfcdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! # hyper #![feature(macro_rules, phase, default_type_params)] -#![warn(missing_doc)] -//#![deny(warnings)] +#![deny(missing_doc)] +#![deny(warnings)] #![experimental] extern crate time; From ed491655dd0872016ee61788295df2c48525c34b Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Tue, 9 Sep 2014 16:51:32 -0700 Subject: [PATCH 4/5] Use dynamic dispatch for client Request and Response through Box Also adds a convenience `abstract` method to NetworkStream for creating Box from a NetworkStream. --- src/client/request.rs | 16 ++++++++-------- src/client/response.rs | 10 +++++----- src/net.rs | 16 ++++++++++++++++ src/server/request.rs | 2 +- src/server/response.rs | 2 +- 5 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/client/request.rs b/src/client/request.rs index cc5e53f209..c48e5196c7 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -14,7 +14,7 @@ use super::{Response}; /// A client request to a remote server. -pub struct Request { +pub struct Request { /// The method of this request. pub method: method::Method, /// The headers that will be sent with this request. @@ -24,13 +24,13 @@ pub struct Request { /// The HTTP version of this request. pub version: version::HttpVersion, headers_written: bool, - body: BufferedWriter, + body: BufferedWriter>, } -impl Request { +impl Request { /// Create a new client request. - pub fn new(method: method::Method, url: Url) -> HttpResult> { + pub fn new(method: method::Method, url: Url) -> HttpResult { debug!("{} {}", method, url); let host = match url.serialize_host() { Some(host) => host, @@ -43,8 +43,8 @@ impl Request { }; debug!("port={}", port); - let stream = try_io!(NetworkStream::connect(host.as_slice(), port)); - let stream = BufferedWriter::new(stream); + let stream: HttpStream = try_io!(NetworkStream::connect(host.as_slice(), port)); + let stream = BufferedWriter::new(stream.abstract()); let mut headers = Headers::new(); headers.set(Host(host)); Ok(Request { @@ -82,7 +82,7 @@ impl Request { /// Completes writing the request, and returns a response to read from. /// /// Consumes the Request. - pub fn send(mut self) -> HttpResult> { + pub fn send(mut self) -> HttpResult { try_io!(self.flush()); let raw = self.body.unwrap(); Response::new(raw) @@ -90,7 +90,7 @@ impl Request { } -impl Writer for Request { +impl Writer for Request { fn write(&mut self, msg: &[u8]) -> IoResult<()> { if !self.headers_written { try!(self.write_head()); diff --git a/src/client/response.rs b/src/client/response.rs index 2e6f5eed76..ef05cf40d5 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -18,14 +18,14 @@ pub struct Response { pub headers: header::Headers, /// The HTTP version of this response from the server. pub version: version::HttpVersion, - body: HttpReader>, + body: HttpReader>>, } -impl Response { +impl Response { /// Creates a new response from a server. - pub fn new(stream: S) -> HttpResult> { - let mut stream = BufferedReader::new(stream); + pub fn new(stream: Box) -> HttpResult { + let mut stream = BufferedReader::new(stream.abstract()); let (version, status) = try!(read_status_line(&mut stream)); let mut headers = try!(header::Headers::from_raw(&mut stream)); @@ -67,7 +67,7 @@ impl Response { } } -impl Reader for Response { +impl Reader for Response { #[inline] fn read(&mut self, buf: &mut [u8]) -> IoResult { self.body.read(buf) diff --git a/src/net.rs b/src/net.rs index 943ba0a64f..2a3f5796f8 100644 --- a/src/net.rs +++ b/src/net.rs @@ -28,6 +28,22 @@ pub trait NetworkStream: Stream + Clone + Send { /// Connect to a remote address. fn connect(host: &str, port: Port) -> IoResult; + + /// Turn this into an appropriately typed trait object. + #[inline] + fn abstract(self) -> Box { + box self as Box + } + + #[doc(hidden)] + #[inline] + // Hack to work around lack of Clone impl for Box + fn clone_box(&self) -> Box { self.clone().abstract() } +} + +impl Clone for Box { + #[inline] + fn clone(&self) -> Box { self.clone_box() } } impl Reader for Box { diff --git a/src/server/request.rs b/src/server/request.rs index 81570d6796..89071afb5d 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -37,7 +37,7 @@ impl Request { /// immediately useful. pub fn new(mut stream: S) -> HttpResult { let remote_addr = try_io!(stream.peer_name()); - let mut stream = BufferedReader::new(box stream as Box); + let mut stream = BufferedReader::new(stream.abstract()); let (method, uri, version) = try!(read_request_line(&mut stream)); let mut headers = try!(Headers::from_raw(&mut stream)); diff --git a/src/server/response.rs b/src/server/response.rs index 165979fb6b..155138eb64 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -66,7 +66,7 @@ impl Response { status: status::Ok, version: version::Http11, headers: header::Headers::new(), - body: BufferedWriter::new(box stream as Box) + body: BufferedWriter::new(stream.abstract()) } } From 632250b431958b6cf9a236058091da53e9a514c2 Mon Sep 17 00:00:00 2001 From: Jonathan Reem Date: Tue, 9 Sep 2014 17:02:12 -0700 Subject: [PATCH 5/5] Update benchmarks and introduce more default type params for remaining generics The client benchmarks did not have to be changed at all for this whole refactor, and the server benchmark only had to specify a single type parameter, and only because it writes out the type of Listener, which is not normal usage. --- benches/client.rs | 3 ++- src/server/mod.rs | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/benches/client.rs b/benches/client.rs index 680084f1f1..ad92b98501 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -8,8 +8,9 @@ extern crate test; use std::fmt::{mod, Show}; use std::io::net::ip::Ipv4Addr; use hyper::server::{Incoming, Server}; +use hyper::net::HttpAcceptor; -fn listen() -> hyper::server::Listening { +fn listen() -> hyper::server::Listening { let server = Server::http(Ipv4Addr(127, 0, 0, 1), 0); server.listen(handle).unwrap() } diff --git a/src/server/mod.rs b/src/server/mod.rs index 6362df0c1d..5afbbf95db 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,8 +5,7 @@ use std::io::net::ip::{IpAddr, Port, SocketAddr}; pub use self::request::Request; pub use self::response::{Response, Fresh, Streaming}; -use net::{NetworkListener, NetworkAcceptor, NetworkStream}; -use net::HttpListener; +use net::{NetworkListener, NetworkAcceptor, NetworkStream, HttpAcceptor, HttpListener}; pub mod request; pub mod response; @@ -60,7 +59,7 @@ impl, S: NetworkStream, A: NetworkAcceptor> Server { +pub struct Incoming<'a, A: 'a = HttpAcceptor> { from: IncomingConnections<'a, A> }