Skip to content

Commit ccc9f1f

Browse files
committed
abstract out NetworkStreams
1 parent 2ac1305 commit ccc9f1f

File tree

7 files changed

+199
-81
lines changed

7 files changed

+199
-81
lines changed

benches/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@ extern crate hyper;
55
extern crate test;
66

77
use std::fmt::{mod, Show};
8-
use std::str::{SendStr, Slice};
9-
use std::io::IoResult;
108
use std::io::net::ip::Ipv4Addr;
11-
use hyper::server::{Request, Response, Server};
9+
use hyper::server::{Server, Incoming};
1210

1311
fn listen() -> hyper::server::Listening {
1412
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 0);
1513
server.listen(handle).unwrap()
1614
}
1715

18-
fn handle(_req: Request, mut res: Response) -> IoResult<()> {
19-
try!(res.write(b"Benchmarking hyper vs others!"));
20-
res.end()
16+
fn handle(mut incoming: Incoming) {
17+
for (_, mut res) in incoming {
18+
res.write(b"Benchmarking hyper vs others!").unwrap();
19+
res.end().unwrap();
20+
}
2121
}
2222

2323

examples/server.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ use std::io::util::copy;
77
use std::io::net::ip::Ipv4Addr;
88

99
use hyper::{Get, Post};
10-
use hyper::server::{Server, Handler, Incoming};
10+
use hyper::server::{Server, Incoming};
1111
use hyper::header::ContentLength;
1212

13-
struct Echo;
14-
1513
macro_rules! try_continue(
1614
($e:expr) => {{
1715
match $e {
@@ -21,39 +19,37 @@ macro_rules! try_continue(
2119
}}
2220
)
2321

24-
impl Handler for Echo {
25-
fn handle(self, mut incoming: Incoming) {
26-
for (mut req, mut res) in incoming {
27-
match req.uri {
28-
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
29-
(&Get, "/") | (&Get, "/echo") => {
30-
let out = b"Try POST /echo";
31-
32-
res.headers.set(ContentLength(out.len()));
33-
try_continue!(res.write(out));
34-
try_continue!(res.end());
35-
continue;
36-
},
37-
(&Post, "/echo") => (), // fall through, fighting mutable borrows
38-
_ => {
39-
res.status = hyper::status::NotFound;
40-
try_continue!(res.end());
41-
continue;
42-
}
22+
fn echo(mut incoming: Incoming) {
23+
for (mut req, mut res) in incoming {
24+
match req.uri {
25+
hyper::uri::AbsolutePath(ref path) => match (&req.method, path.as_slice()) {
26+
(&Get, "/") | (&Get, "/echo") => {
27+
let out = b"Try POST /echo";
28+
29+
res.headers.set(ContentLength(out.len()));
30+
try_continue!(res.write(out));
31+
try_continue!(res.end());
32+
continue;
4333
},
34+
(&Post, "/echo") => (), // fall through, fighting mutable borrows
4435
_ => {
36+
res.status = hyper::status::NotFound;
4537
try_continue!(res.end());
46-
continue;
38+
continue;
4739
}
48-
};
49-
50-
try_continue!(copy(&mut req, &mut res));
51-
try_continue!(res.end());
52-
}
40+
},
41+
_ => {
42+
try_continue!(res.end());
43+
continue;
44+
}
45+
};
46+
47+
try_continue!(copy(&mut req, &mut res));
48+
try_continue!(res.end());
5349
}
5450
}
5551

5652
fn main() {
5753
let server = Server::http(Ipv4Addr(127, 0, 0, 1), 1337);
58-
server.listen(Echo).unwrap();
54+
server.listen(echo).unwrap();
5955
}

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! # hyper
2-
#![feature(macro_rules, phase)]
2+
#![feature(macro_rules, phase, default_type_params)]
33
#![warn(missing_doc)]
4-
#![deny(warnings)]
4+
//#![deny(warnings)]
55
#![experimental]
66

77
extern crate time;
@@ -47,6 +47,7 @@ macro_rules! trace(
4747
pub mod client;
4848
pub mod method;
4949
pub mod header;
50+
pub mod net;
5051
pub mod server;
5152
pub mod status;
5253
pub mod uri;

src/net.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
//! A collection of traits abstracting over Listeners and Streams.
2+
use std::io::{IoResult, Stream, Listener, Acceptor};
3+
use std::io::net::ip::{SocketAddr, Port};
4+
use std::io::net::tcp::{TcpStream, TcpListener, TcpAcceptor};
5+
6+
/// An abstraction to listen for connections on a certain port.
7+
pub trait NetworkListener<S: NetworkStream, A: NetworkAcceptor<S>>: Listener<S, A> {
8+
/// Bind to a socket.
9+
///
10+
/// Note: This does not start listening for connections. You must call
11+
/// `listen()` to do that.
12+
fn bind(host: &str, port: Port) -> IoResult<Self>;
13+
14+
/// Get the address this Listener ended up listening on.
15+
fn socket_name(&mut self) -> IoResult<SocketAddr>;
16+
}
17+
18+
/// An abstraction to receive `HttpStream`s.
19+
pub trait NetworkAcceptor<S: NetworkStream>: Acceptor<S> + Clone + Send {
20+
/// Closes the Acceptor, so no more incoming connections will be handled.
21+
fn close(&mut self) -> IoResult<()>;
22+
}
23+
24+
/// An abstraction over streams that a Server can utilize.
25+
pub trait NetworkStream: Stream + Clone {
26+
/// Get the remote address of the underlying connection.
27+
fn peer_name(&mut self) -> IoResult<SocketAddr>;
28+
}
29+
30+
/// A `NetworkListener` for `HttpStream`s.
31+
pub struct HttpListener {
32+
inner: TcpListener
33+
}
34+
35+
impl Listener<HttpStream, HttpAcceptor> for HttpListener {
36+
#[inline]
37+
fn listen(self) -> IoResult<HttpAcceptor> {
38+
Ok(HttpAcceptor {
39+
inner: try!(self.inner.listen())
40+
})
41+
}
42+
}
43+
44+
impl NetworkListener<HttpStream, HttpAcceptor> for HttpListener {
45+
#[inline]
46+
fn bind(host: &str, port: Port) -> IoResult<HttpListener> {
47+
Ok(HttpListener {
48+
inner: try!(TcpListener::bind(host, port))
49+
})
50+
}
51+
52+
#[inline]
53+
fn socket_name(&mut self) -> IoResult<SocketAddr> {
54+
self.inner.socket_name()
55+
}
56+
}
57+
58+
/// A `NetworkAcceptor` for `HttpStream`s.
59+
#[deriving(Clone)]
60+
pub struct HttpAcceptor {
61+
inner: TcpAcceptor
62+
}
63+
64+
impl Acceptor<HttpStream> for HttpAcceptor {
65+
#[inline]
66+
fn accept(&mut self) -> IoResult<HttpStream> {
67+
Ok(HttpStream {
68+
inner: try!(self.inner.accept())
69+
})
70+
}
71+
}
72+
73+
impl NetworkAcceptor<HttpStream> for HttpAcceptor {
74+
#[inline]
75+
fn close(&mut self) -> IoResult<()> {
76+
self.inner.close_accept()
77+
}
78+
}
79+
80+
/// A wrapper around a TcpStream.
81+
#[deriving(Clone)]
82+
pub struct HttpStream {
83+
inner: TcpStream
84+
}
85+
86+
impl Reader for HttpStream {
87+
#[inline]
88+
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
89+
self.inner.read(buf)
90+
}
91+
}
92+
93+
impl Writer for HttpStream {
94+
#[inline]
95+
fn write(&mut self, msg: &[u8]) -> IoResult<()> {
96+
self.inner.write(msg)
97+
}
98+
#[inline]
99+
fn flush(&mut self) -> IoResult<()> {
100+
self.inner.flush()
101+
}
102+
}
103+
104+
105+
impl NetworkStream for HttpStream {
106+
#[inline]
107+
fn peer_name(&mut self) -> IoResult<SocketAddr> {
108+
self.inner.peer_name()
109+
}
110+
}

src/server/mod.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,55 @@
11
//! HTTP Server
2-
use std::io::net::tcp::{TcpListener, TcpAcceptor};
32
use std::io::{Acceptor, Listener, IoResult, EndOfFile, IncomingConnections};
43
use std::io::net::ip::{IpAddr, Port, SocketAddr};
54

65
pub use self::request::Request;
76
pub use self::response::Response;
87

8+
use net::{NetworkListener, NetworkAcceptor, NetworkStream};
9+
use net::{HttpListener, HttpAcceptor};
10+
911
pub mod request;
1012
pub mod response;
1113

1214
/// A server can listen on a TCP socket.
1315
///
1416
/// Once listening, it will create a `Request`/`Response` pair for each
1517
/// incoming connection, and hand them to the provided handler.
16-
pub struct Server {
18+
pub struct Server<L = HttpListener> {
1719
ip: IpAddr,
1820
port: Port
1921
}
2022

23+
impl Server<HttpListener> {
24+
/// Creates a new server that will handle `HttpStream`s.
25+
pub fn http(ip: IpAddr, port: Port) -> Server {
26+
Server {
27+
ip: ip,
28+
port: port
29+
}
30+
}
31+
}
2132

22-
impl Server {
33+
impl<L: NetworkListener<S, A>, S: NetworkStream, A: NetworkAcceptor<S>> Server<L> {
2334

24-
/// Creates a server to be used for `http` conenctions.
25-
pub fn http(ip: IpAddr, port: Port) -> Server {
35+
/// Creates a server that can listen for and handle `NetworkStreams`.
36+
pub fn new(ip: IpAddr, port: Port) -> Server<L> {
2637
Server {
2738
ip: ip,
2839
port: port
2940
}
3041
}
3142

43+
3244
/// Binds to a socket, and starts handling connections.
33-
pub fn listen<H: Handler + 'static>(self, handler: H) -> IoResult<Listening> {
34-
let mut listener = try!(TcpListener::bind(self.ip.to_string().as_slice(), self.port));
45+
pub fn listen<H: Handler<A, S> + 'static>(self, handler: H) -> IoResult<Listening<A>> {
46+
let mut listener: L = try!(NetworkListener::bind(self.ip.to_string().as_slice(), self.port));
3547
let socket = try!(listener.socket_name());
3648
let acceptor = try!(listener.listen());
37-
let worker = acceptor.clone();
49+
let mut worker = acceptor.clone();
3850

3951
spawn(proc() {
40-
let mut acceptor = worker;
41-
handler.handle(Incoming { from: acceptor.incoming() });
52+
handler.handle(Incoming { from: worker.incoming() });
4253
});
4354

4455
Ok(Listening {
@@ -51,12 +62,12 @@ impl Server {
5162

5263
/// An iterator over incoming connections, represented as pairs of
5364
/// hyper Requests and Responses.
54-
pub struct Incoming<'a> {
55-
from: IncomingConnections<'a, TcpAcceptor>
65+
pub struct Incoming<'a, A: 'a = HttpAcceptor> {
66+
from: IncomingConnections<'a, A>
5667
}
5768

58-
impl<'a> Iterator<(Request, Response)> for Incoming<'a> {
59-
fn next(&mut self) -> Option<(Request, Response)> {
69+
impl<'a, A: NetworkAcceptor<S>, S: NetworkStream> Iterator<(Request<S>, Response<S>)> for Incoming<'a, A> {
70+
fn next(&mut self) -> Option<(Request<S>, Response<S>)> {
6071
for conn in self.from {
6172
match conn {
6273
Ok(stream) => {
@@ -85,30 +96,30 @@ impl<'a> Iterator<(Request, Response)> for Incoming<'a> {
8596
}
8697

8798
/// A listening server, which can later be closed.
88-
pub struct Listening {
89-
acceptor: TcpAcceptor,
99+
pub struct Listening<A = HttpAcceptor> {
100+
acceptor: A,
90101
/// The socket address that the server is bound to.
91102
pub socket_addr: SocketAddr,
92103
}
93104

94-
impl Listening {
105+
impl<A: NetworkAcceptor<S>, S: NetworkStream> Listening<A> {
95106
/// Stop the server from listening to it's socket address.
96107
pub fn close(mut self) -> IoResult<()> {
97108
debug!("closing server");
98-
self.acceptor.close_accept()
109+
self.acceptor.close()
99110
}
100111
}
101112

102113
/// A handler that can handle incoming requests for a server.
103-
pub trait Handler: Send {
114+
pub trait Handler<A: NetworkAcceptor<S>, S: NetworkStream>: Send {
104115
/// Receives a `Request`/`Response` pair, and should perform some action on them.
105116
///
106117
/// This could reading from the request, and writing to the response.
107-
fn handle(self, Incoming);
118+
fn handle(self, Incoming<A>);
108119
}
109120

110-
impl Handler for fn(Incoming) {
111-
fn handle(self, incoming: Incoming) {
121+
impl<A: NetworkAcceptor<S>, S: NetworkStream> Handler<A, S> for fn(Incoming<A>) {
122+
fn handle(self, incoming: Incoming<A>) {
112123
(self)(incoming)
113124
}
114125
}

0 commit comments

Comments
 (0)