Skip to content

Commit

Permalink
Changed methods on UDP sockets and TCP/UDP watchers to &mut self to r…
Browse files Browse the repository at this point in the history
…eflect that libuv may change the underlying handle.
  • Loading branch information
Eric Reed committed Jul 19, 2013
1 parent 0d04aa7 commit 968f7f5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 64 deletions.
17 changes: 9 additions & 8 deletions src/libstd/rt/io/net/udp.rs
Expand Up @@ -30,7 +30,7 @@ impl UdpSocket {
}
}

pub fn recvfrom(&self, buf: &mut [u8]) -> Option<(uint, IpAddr)> {
pub fn recvfrom(&mut self, buf: &mut [u8]) -> Option<(uint, IpAddr)> {
match (**self).recvfrom(buf) {
Ok((nread, src)) => Some((nread, src)),
Err(ioerr) => {
Expand All @@ -43,7 +43,7 @@ impl UdpSocket {
}
}

pub fn sendto(&self, buf: &[u8], dst: IpAddr) {
pub fn sendto(&mut self, buf: &[u8], dst: IpAddr) {
match (**self).sendto(buf, dst) {
Ok(_) => (),
Err(ioerr) => io_error::cond.raise(ioerr),
Expand All @@ -61,16 +61,17 @@ pub struct UdpStream {
}

impl UdpStream {
pub fn as_socket<T>(&self, f: &fn(&UdpSocket) -> T) -> T { f(&self.socket) }
pub fn as_socket<T>(&mut self, f: &fn(&mut UdpSocket) -> T) -> T { f(&mut self.socket) }

pub fn disconnect(self) -> UdpSocket { self.socket }
}

impl Reader for UdpStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let peer = self.connectedTo;
do self.as_socket |sock| {
match sock.recvfrom(buf) {
Some((_nread, src)) if src != self.connectedTo => Some(0),
Some((_nread, src)) if src != peer => Some(0),
Some((nread, _src)) => Some(nread),
None => None,
}
Expand Down Expand Up @@ -122,7 +123,7 @@ mod test {

do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
Some(ref mut server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
Expand All @@ -139,7 +140,7 @@ mod test {

do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
Some(ref mut client) => client.sendto([99], server_ip),
None => fail!()
}
}
Expand All @@ -154,7 +155,7 @@ mod test {

do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
Some(ref mut server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
Expand All @@ -171,7 +172,7 @@ mod test {

do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
Some(ref mut client) => client.sendto([99], server_ip),
None => fail!()
}
}
Expand Down
40 changes: 20 additions & 20 deletions src/libstd/rt/rtio.rs
Expand Up @@ -50,37 +50,37 @@ pub trait IoFactory {

pub trait RtioTcpListener : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept_simultaneously(&self);
fn dont_accept_simultaneously(&self);
fn accept_simultaneously(&mut self);
fn dont_accept_simultaneously(&mut self);
}

pub trait RtioTcpStream : RtioSocket {
fn read(&self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&self, buf: &[u8]) -> Result<(), IoError>;
fn peer_name(&self) -> IpAddr;
fn control_congestion(&self);
fn nodelay(&self);
fn keepalive(&self, delay_in_seconds: uint);
fn letdie(&self);
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
fn peer_name(&mut self) -> IpAddr;
fn control_congestion(&mut self);
fn nodelay(&mut self);
fn keepalive(&mut self, delay_in_seconds: uint);
fn letdie(&mut self);
}

pub trait RtioSocket {
fn socket_name(&self) -> IpAddr;
fn socket_name(&mut self) -> IpAddr;
}

pub trait RtioUdpSocket : RtioSocket {
fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>;
fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>;
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>;
fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>;

fn join_multicast(&self, multi: IpAddr);
fn leave_multicast(&self, multi: IpAddr);
fn join_multicast(&mut self, multi: IpAddr);
fn leave_multicast(&mut self, multi: IpAddr);

fn loop_multicast_locally(&self);
fn dont_loop_multicast_locally(&self);
fn loop_multicast_locally(&mut self);
fn dont_loop_multicast_locally(&mut self);

fn multicast_time_to_live(&self, ttl: int);
fn time_to_live(&self, ttl: int);
fn multicast_time_to_live(&mut self, ttl: int);
fn time_to_live(&mut self, ttl: int);

fn hear_broadcasts(&self);
fn ignore_broadcasts(&self);
fn hear_broadcasts(&mut self);
fn ignore_broadcasts(&mut self);
}
72 changes: 36 additions & 36 deletions src/libstd/rt/uv/uvio.rs
Expand Up @@ -317,7 +317,7 @@ impl Drop for UvTcpListener {

impl RtioSocket for UvTcpListener {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> IpAddr { fail!(); }
}

impl RtioTcpListener for UvTcpListener {
Expand Down Expand Up @@ -357,8 +357,8 @@ impl RtioTcpListener for UvTcpListener {
}

// XXX implement
fn accept_simultaneously(&self) { fail!(); }
fn dont_accept_simultaneously(&self) { fail!(); }
fn accept_simultaneously(&mut self) { fail!(); }
fn dont_accept_simultaneously(&mut self) { fail!(); }
}

// FIXME #6090: Prefer newtype structs but Drop doesn't work
Expand All @@ -380,11 +380,11 @@ impl Drop for UvTcpStream {

impl RtioSocket for UvTcpStream {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> IpAddr { fail!(); }
}

impl RtioTcpStream for UvTcpStream {
fn read(&self, buf: &mut [u8]) -> Result<uint, IoError> {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;

Expand Down Expand Up @@ -427,7 +427,7 @@ impl RtioTcpStream for UvTcpStream {
return result_cell.take();
}

fn write(&self, buf: &[u8]) -> Result<(), IoError> {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
Expand Down Expand Up @@ -456,11 +456,11 @@ impl RtioTcpStream for UvTcpStream {
}

// XXX implement
fn peer_name(&self) -> IpAddr { fail!(); }
fn control_congestion(&self) { fail!(); }
fn nodelay(&self) { fail!(); }
fn keepalive(&self, _delay_in_seconds: uint) { fail!(); }
fn letdie(&self) { fail!(); }
fn peer_name(&mut self) -> IpAddr { fail!(); }
fn control_congestion(&mut self) { fail!(); }
fn nodelay(&mut self) { fail!(); }
fn keepalive(&mut self, _delay_in_seconds: uint) { fail!(); }
fn letdie(&mut self) { fail!(); }
}

pub struct UvUdpSocket(UdpWatcher);
Expand All @@ -481,11 +481,11 @@ impl Drop for UvUdpSocket {

impl RtioSocket for UvUdpSocket {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
fn socket_name(&mut self) -> IpAddr { fail!(); }
}

impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;

Expand Down Expand Up @@ -521,7 +521,7 @@ impl RtioUdpSocket for UvUdpSocket {
return result_cell.take();
}

fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
fn sendto(&mut self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
Expand Down Expand Up @@ -549,17 +549,17 @@ impl RtioUdpSocket for UvUdpSocket {
}

// XXX implement
fn join_multicast(&self, _multi: IpAddr) { fail!(); }
fn leave_multicast(&self, _multi: IpAddr) { fail!(); }
fn join_multicast(&mut self, _multi: IpAddr) { fail!(); }
fn leave_multicast(&mut self, _multi: IpAddr) { fail!(); }

fn loop_multicast_locally(&self) { fail!(); }
fn dont_loop_multicast_locally(&self) { fail!(); }
fn loop_multicast_locally(&mut self) { fail!(); }
fn dont_loop_multicast_locally(&mut self) { fail!(); }

fn multicast_time_to_live(&self, _ttl: int) { fail!(); }
fn time_to_live(&self, _ttl: int) { fail!(); }
fn multicast_time_to_live(&mut self, _ttl: int) { fail!(); }
fn time_to_live(&mut self, _ttl: int) { fail!(); }

fn hear_broadcasts(&self) { fail!(); }
fn ignore_broadcasts(&self) { fail!(); }
fn hear_broadcasts(&mut self) { fail!(); }
fn ignore_broadcasts(&mut self) { fail!(); }
}

#[test]
Expand Down Expand Up @@ -596,7 +596,7 @@ fn test_simple_tcp_server_and_client() {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let stream = listener.accept().unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
Expand All @@ -610,7 +610,7 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let stream = (*io).tcp_connect(addr).unwrap();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
Expand All @@ -626,7 +626,7 @@ fn test_simple_udp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server_socket = (*io).udp_bind(server_addr).unwrap();
let mut server_socket = (*io).udp_bind(server_addr).unwrap();
let mut buf = [0, .. 2048];
let (nread,src) = server_socket.recvfrom(buf).unwrap();
assert_eq!(nread, 8);
Expand All @@ -641,7 +641,7 @@ fn test_simple_udp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client_socket = (*io).udp_bind(client_addr).unwrap();
let mut client_socket = (*io).udp_bind(client_addr).unwrap();
client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
}
}
Expand All @@ -656,7 +656,7 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let stream = listener.accept().unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];

let expected = 32;
Expand Down Expand Up @@ -689,7 +689,7 @@ fn test_read_and_block() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let stream = (*io).tcp_connect(addr).unwrap();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
Expand All @@ -710,7 +710,7 @@ fn test_read_read_read() {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let stream = listener.accept().unwrap();
let mut stream = listener.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
Expand All @@ -723,7 +723,7 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let stream = (*io).tcp_connect(addr).unwrap();
let mut stream = (*io).tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
Expand All @@ -749,7 +749,7 @@ fn test_udp_twice() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client = (*io).udp_bind(client_addr).unwrap();
let mut client = (*io).udp_bind(client_addr).unwrap();
assert!(client.sendto([1], server_addr).is_ok());
assert!(client.sendto([2], server_addr).is_ok());
}
Expand All @@ -758,7 +758,7 @@ fn test_udp_twice() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server = (*io).udp_bind(server_addr).unwrap();
let mut server = (*io).udp_bind(server_addr).unwrap();
let mut buf1 = [0];
let mut buf2 = [0];
let (nread1, src1) = server.recvfrom(buf1).unwrap();
Expand Down Expand Up @@ -786,8 +786,8 @@ fn test_udp_many_read() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server_out = (*io).udp_bind(server_out_addr).unwrap();
let server_in = (*io).udp_bind(server_in_addr).unwrap();
let mut server_out = (*io).udp_bind(server_out_addr).unwrap();
let mut server_in = (*io).udp_bind(server_in_addr).unwrap();
let msg = [1, .. 2048];
let mut total_bytes_sent = 0;
let mut buf = [1];
Expand All @@ -809,8 +809,8 @@ fn test_udp_many_read() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client_out = (*io).udp_bind(client_out_addr).unwrap();
let client_in = (*io).udp_bind(client_in_addr).unwrap();
let mut client_out = (*io).udp_bind(client_out_addr).unwrap();
let mut client_in = (*io).udp_bind(client_in_addr).unwrap();
let mut total_bytes_recv = 0;
let mut buf = [0, .. 2048];
while total_bytes_recv < MAX {
Expand Down

0 comments on commit 968f7f5

Please sign in to comment.