Skip to content

Commit

Permalink
cleaned up uv/net
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Reed committed Jun 26, 2013
1 parent 87ecfb7 commit ce97bd4
Showing 1 changed file with 32 additions and 68 deletions.
100 changes: 32 additions & 68 deletions src/libstd/rt/uv/net.rs
Expand Up @@ -44,8 +44,8 @@ pub fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
pub fn uv_ip4_to_ip4(addr: *sockaddr_in) -> IpAddr {
let ip4_size = 16;
let buf = vec::from_elem(ip4_size + 1 /*null terminated*/, 0u8);
unsafe { ip4_name(addr, vec::raw::to_ptr(buf), ip4_size as u64) };
let port = unsafe { ip4_port(addr) };
unsafe { uvll::ip4_name(addr, vec::raw::to_ptr(buf), ip4_size as u64) };
let port = unsafe { uvll::ip4_port(addr) };
let ip_str = str::from_bytes_slice(buf).trim_right_chars(&'\x00');
let ip: ~[u8] = ip_str.split_iter('.')
.transform(|s: &str| -> u8 {
Expand All @@ -71,22 +71,19 @@ impl StreamWatcher {
data.read_cb = Some(cb);
}

let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }

extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}

extern fn read_cb(stream: *uvll::uv_stream_t, nread: ssize_t, buf: Buf) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
Expand All @@ -108,22 +105,15 @@ impl StreamWatcher {
}

let req = WriteRequest::new();
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
}

extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
let cb = stream_watcher.get_watcher_data().write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
Expand All @@ -132,9 +122,7 @@ impl StreamWatcher {
pub fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
assert_eq!(0, uvll::accept(self_handle, stream_handle));
}
assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
}

pub fn close(self, cb: NullCallback) {
Expand All @@ -149,19 +137,15 @@ impl StreamWatcher {

extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
let data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
stream_watcher.get_watcher_data().close_cb.swap_unwrap()();
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
}

impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
Expand All @@ -188,9 +172,7 @@ impl TcpWatcher {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
let result = unsafe { uvll::tcp_bind(self.native_handle(), addr) };
if result == 0 {
Ok(())
} else {
Expand All @@ -212,9 +194,9 @@ impl TcpWatcher {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
rtdebug!("connect_t: %x", connect_handle as uint);
assert!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
addr, connect_cb));
assert_eq!(0,
uvll::tcp_connect(connect_handle, self.native_handle(),
addr, connect_cb));
}
}
_ => fail!()
Expand All @@ -225,10 +207,7 @@ impl TcpWatcher {
let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let cb = stream_watcher.get_watcher_data().connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
Expand All @@ -245,15 +224,13 @@ impl TcpWatcher {
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
assert!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
}

extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let data = stream_watcher.get_watcher_data();
let cb = data.connect_cb.get_ref();
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(stream_watcher, status);
}
Expand Down Expand Up @@ -314,8 +291,7 @@ impl UdpWatcher {
data.udp_recv_cb = Some(cb);
}

let handle = self.native_handle();
unsafe { uvll::udp_recv_start(handle, alloc_cb, recv_cb); }
unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }

extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
Expand All @@ -331,17 +307,14 @@ impl UdpWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let data = udp_watcher.get_watcher_data();
let cb = data.udp_recv_cb.get_ref();
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(handle, nread as c_int);
let address = uv_ip4_to_ip4(addr);
(*cb)(udp_watcher, nread as int, buf, address, flags as uint, status);
(*cb)(udp_watcher, nread as int, buf, uv_ip4_to_ip4(addr), flags as uint, status);
}
}

pub fn recv_stop(&self) {
let handle = self.native_handle();
unsafe { uvll::udp_recv_stop(handle); }
unsafe { uvll::udp_recv_stop(self.native_handle()); }
}

pub fn send(&self, buf: Buf, address: IpAddr, cb: UdpSendCallback) {
Expand All @@ -357,7 +330,7 @@ impl UdpWatcher {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
unsafe {
assert!(0 == uvll::udp_send(req.native_handle(),
assert_eq!(0, uvll::udp_send(req.native_handle(),
self.native_handle(),
[buf], addr, send_cb));
}
Expand Down Expand Up @@ -411,12 +384,9 @@ impl Request for ConnectRequest { }
impl ConnectRequest {

fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc_req(UV_CONNECT)
};
let connect_handle = unsafe { malloc_req(UV_CONNECT) };
assert!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
ConnectRequest(connect_handle as *uvll::uv_connect_t)
}

fn stream(&self) -> StreamWatcher {
Expand All @@ -432,8 +402,7 @@ impl ConnectRequest {
}

impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
Expand All @@ -447,12 +416,9 @@ impl Request for WriteRequest { }

impl WriteRequest {
pub fn new() -> WriteRequest {
let write_handle = unsafe {
malloc_req(UV_WRITE)
};
let write_handle = unsafe { malloc_req(UV_WRITE) };
assert!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
WriteRequest(write_handle as *uvll::uv_write_t)
}

pub fn stream(&self) -> StreamWatcher {
Expand Down Expand Up @@ -483,16 +449,14 @@ impl UdpSendRequest {
pub fn new() -> UdpSendRequest {
let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
assert!(send_handle.is_not_null());
let send_handle = send_handle as *uvll::uv_udp_send_t;
UdpSendRequest(send_handle)
UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
}

pub fn handle(&self) -> UdpWatcher {
unsafe {
NativeHandle::from_native_handle(
uvll::get_udp_handle_from_send_req(
self.native_handle()))
}
let send_request_handle = unsafe {
uvll::get_udp_handle_from_send_req(self.native_handle())
};
NativeHandle::from_native_handle(send_request_handle)
}

pub fn delete(self) {
Expand Down

0 comments on commit ce97bd4

Please sign in to comment.