Skip to content

Commit

Permalink
Merge branch 'ring-in-ring'
Browse files Browse the repository at this point in the history
* ring-in-ring:
  Conditionalize the test out for OS X
  Remove debug chunder
  Make a test that works in Linux (and fails on OS X, wow)
  Correctly initialize backing_buf
  This typechecks, but misbehaves
  First step to being able to store a ring in a ring
  • Loading branch information
antifuchs committed May 1, 2016
2 parents 89179cf + 0151bf7 commit 2cf5a69
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 143 deletions.
155 changes: 120 additions & 35 deletions file-descriptor-fun/src/ring.rs
Expand Up @@ -4,16 +4,21 @@ use nix::sys::uio::IoVec;
use nix::unistd;
use std::result;
use std::fmt;
use std::num;
use std::str;
use std::str::FromStr;

use std::os::unix::io::RawFd;

// OS X doesn't let us go beyond 256kB for the buffer size, so this is the max:
const SEND_BUF_SIZE: usize = 900 * 1024;
const SEND_BUF_SIZE: usize = 100 * 1024;

/// A ring buffer containing file descriptors.
///
/// You can stuff FDs in with the [`add`](#method.add) method, and
/// iterate over them one by one using the iterator structure returned
/// by [`iter`](#method.iter).
#[derive(Clone)]
pub struct Ring {
read: RawFd,
write: RawFd,
Expand All @@ -24,7 +29,7 @@ pub struct Ring {

impl fmt::Display for Ring {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "#<Ring containing {} fds>", self.count)
write!(f, "#<Ring containing {} entries>", self.count)
}
}

Expand All @@ -41,7 +46,10 @@ impl Drop for Ring {
#[derive(Copy, PartialEq, Eq, Clone, Debug)]
pub enum ProtocolError {
/// Expected to receive an FD, but did not get one
NoFDReceived,
NoFDReceived(u64),

/// Something approximating a Ring was sent over the socket, but the number format didn't parse
RingFormatError,

/// Expected one FD, got more
TooManyFDsReceived,
Expand Down Expand Up @@ -73,6 +81,18 @@ impl From<nix::Error> for Error {
}
}

impl From<num::ParseIntError> for Error {
fn from(_: num::ParseIntError) -> Error {
Error::Protocol(ProtocolError::RingFormatError)
}
}

impl From<str::Utf8Error> for Error {
fn from(_: str::Utf8Error) -> Error {
Error::Protocol(ProtocolError::RingFormatError)
}
}

/// A specialized Result type for fd Ring buffer operations.
pub type Result<T> = result::Result<T, Error>;

Expand All @@ -91,71 +111,134 @@ pub fn new() -> Result<Ring> {
});
}

impl Ring {
#[derive(Clone)]
pub enum StashableThing<'a> {
One(RawFd),
Pair(&'a Ring),
}

#[derive(Clone)]
pub enum StashedThing {
One(RawFd),
Pair(Ring)
}

impl<'a> From<RawFd> for StashableThing<'a> {
fn from(fd: RawFd) -> StashableThing<'a> {
StashableThing::One(fd)
}
}

impl<'a> From<&'a Ring> for StashableThing<'a> {
fn from(ring: &'a Ring) -> StashableThing<'a> {
StashableThing::Pair(ring)
}
}

impl<'a> Ring {
/// Adds an FD to a Ring, updating the count of contained FDs.
/// Closing the FD to free up resources is left to the caller.
///
/// # Errors
/// * [`Bad(nix::Error)`](enum.Error.html#variant.Bad) - if any unforeseen condition occurs
/// * [`Limit(nix::Error)`](enum.Error.html#variant.Limit) - if
/// the socket would block or any other limit runs over.
pub fn add(&mut self, fd: RawFd) -> Result<()> {
try!(self.insert(fd));
self.count += 1;
pub fn add(&mut self, thing: &StashableThing) -> Result<()> {
let n = try!(self.insert(thing));
self.count += n;
Ok(())
}

/// (internal) Add an FD to the ring, sending it down the `.write` end.
fn insert(&self, fd: RawFd) -> Result<()> {
let buf = vec![IoVec::from_slice("!".as_bytes())];

let fds = vec![fd];
/// (internal) Add an FD to the ring, sending it down the `.write`
/// end, and returns the number of entries made
fn insert(&self, thing: &StashableThing) -> Result<u64> {
let mut msg = String::from("");
let mut fds: Vec<RawFd> = vec![];
let mut buf: Vec<IoVec<&[u8]>> = vec![];
match thing {
&StashableThing::One(fd) => {
msg.push('!');
fds.push(fd);
}
&StashableThing::Pair(ring) => {
msg.push_str(format!("{}", ring.count).as_str());
fds.push(ring.read);
fds.push(ring.write);
}
}
buf.push(IoVec::from_slice(msg.as_bytes()));
let cmsgs = vec![socket::ControlMessage::ScmRights(fds.as_slice())];
try!(socket::sendmsg(self.write,
&buf.as_slice(),
cmsgs.as_slice(),
socket::MsgFlags::empty(),
&buf.as_slice(),
cmsgs.as_slice(),
socket::MsgFlags::empty(),
None));
Ok(())
Ok(1)
}

/// Removes and returns the head of the fd ring, updating count.
pub fn pop(&mut self) -> Result<RawFd> {
let fd = try!(self.remove());
pub fn pop(&mut self) -> Result<StashedThing> {
let thing = try!(self.remove());
self.count -= 1;
Ok(fd)
Ok(thing)
}

/// (internal) Removes and returns the head of the ring from `.read`.
fn remove(&self) -> Result<RawFd> {
let mut backing_buf = vec![0];
let mut buf = vec![IoVec::from_mut_slice(&mut backing_buf)];
fn remove(&self) -> Result<StashedThing> {
// I assume we have no more than a 10^1023 FDs in there, but haha.
let mut backing_buf: Vec<u8> = vec![0;1024];

// TODO: deal with the constant 15 here.
let mut cmsg: socket::CmsgSpace<([RawFd; 15])> = socket::CmsgSpace::new();
let iov = IoVec::from_mut_slice(backing_buf.as_mut_slice());
let mut iovs = vec![iov];
let msg = try!(socket::recvmsg(self.read,
&mut buf.as_mut_slice(),
&mut iovs.as_mut_slice(),
Some(&mut cmsg),
socket::MsgFlags::empty()));

let read_buffer: &[u8] = iovs[0].as_slice();
let read_bytes: &[u8] = &read_buffer[..msg.bytes];
match msg.cmsgs().next() {
Some(socket::ControlMessage::ScmRights(fd)) => {
Some(socket::ControlMessage::ScmRights(fds)) => {
// TODO: this could probably handle the case of multiple FDs via buffers
match fd.len() {
match fds.len() {
1 => {
Ok(fd[0])
let fd = fds[0];
let thing = StashedThing::One(fd);
Ok(thing)
}
0 => Err(Error::Protocol(ProtocolError::NoFDReceived)),
2 => {
let count_str = try!(str::from_utf8(read_bytes));
let count: u64 = try!(u64::from_str(count_str));
let ring = Ring{
read: fds[0],
write: fds[1],
count: count
};
Ok(StashedThing::Pair(ring))
}
0 => Err(Error::Protocol(ProtocolError::NoFDReceived(1))),
_ => Err(Error::Protocol(ProtocolError::TooManyFDsReceived)),
}
}
_ => Err(Error::Protocol(ProtocolError::NoFDReceived)),
Some(_) => { panic!("Received something other than ScmRights! Wat."); }
_ => Err(Error::Protocol(ProtocolError::NoFDReceived(2)))
}
}

fn next(&self) -> Result<RawFd> {
let fd = try!(self.remove());
try!(self.insert(fd));
Ok(fd)
fn next(&self) -> Result<StashedThing> {
let thing = try!(self.remove());
match thing {
StashedThing::One(fd) => {
try!(self.insert(&StashableThing::from(fd)));
Ok(StashedThing::One(fd))
}
StashedThing::Pair(ring) => {
try!(self.insert(&StashableThing::from(&ring)));
Ok(StashedThing::Pair(ring))
}
}
}

/// Returns an iterator on the FDs contained in the ring buffer
Expand All @@ -174,16 +257,18 @@ pub struct RingIter<'a> {
}

impl<'a> Iterator for RingIter<'a> {
type Item = RawFd;
type Item = StashedThing;

fn next(&mut self) -> Option<RawFd> {
fn next(&mut self) -> Option<StashedThing> {
self.offset += 1;
if self.offset > self.ring.count {
return None;
}
match self.ring.next() {
Ok(next_fd) => Some(next_fd),
Err(_) => None,
Err(e) => {
panic!("Oops, {:?} happened at offset {} of {}", e, self.offset, self.ring.count);
}
}
}
}
94 changes: 66 additions & 28 deletions file-descriptor-fun/tests/ring.rs
Expand Up @@ -14,43 +14,70 @@ fn it_can_create_a_ringbuffer() {
fn adding_to_ring_works() {
let mut ring = ring::new().unwrap();
let (one, two) = filedes::unix_socket_pair().unwrap();
ring.add(one).unwrap();
ring.add(&ring::StashableThing::from(one)).unwrap();
assert_eq!(1, ring.count);
ring.add(two).unwrap();
ring.add(&ring::StashableThing::from(two)).unwrap();
assert_eq!(2, ring.count);

let other_ring = ring::new().unwrap();
ring.add(&ring::StashableThing::from(&other_ring)).unwrap();
assert_eq!(3, ring.count);

let received = ring.pop().unwrap();
match received {
ring::StashedThing::One(_) => {
println!("Yay!");
}
_ => {
panic!("Huh!");
}
}
}

fn add_two_sockets_to_ring(ring: &mut ring::Ring) -> ring::Result<()> {
let (one, two) = try!(filedes::unix_socket_pair());
match ring.add(&ring::StashableThing::from(one)) {
Ok(()) => {
try!(nix::unistd::close(one));
}
Err(ring::Error::Limit(e)) => {
println!("I hit {}", e);
try!(nix::unistd::close(one));
try!(nix::unistd::close(two));
return Err(ring::Error::Limit(e));
}
Err(e) => {
return Err(e);
}
}
match ring.add(&ring::StashableThing::from(two)) {
Ok(()) => {
try!(nix::unistd::close(two));
Ok(())
},
Err(ring::Error::Limit(e)) => {
println!("I hit {}", e);
try!(nix::unistd::close(two));
return Err(ring::Error::Limit(e));
}
Err(e) => {
return Err(e);
}
}
}

#[test]
fn adding_many_to_a_ring_works() {
let mut ring = ring::new().unwrap();

loop {
let (one, two) = filedes::unix_socket_pair().unwrap();
match ring.add(one) {
Ok(()) => {
nix::unistd::close(one).unwrap();
}
match add_two_sockets_to_ring(&mut ring) {
Ok(()) => {}
Err(ring::Error::Limit(e)) => {
println!("I hit {}", e);
nix::unistd::close(one).unwrap();
break;
}
Err(e) => {
panic!(e);
}
}
match ring.add(two) {
Ok(()) => {
nix::unistd::close(two).unwrap();
},
Err(ring::Error::Limit(e)) => {
println!("I hit {}", e);
nix::unistd::close(two).unwrap();
break;
}
Err(e) => {
panic!(e);
}
Err(e) => { panic!(e); }
}
}
let mut additional_fds: Vec<RawFd> = vec!();
Expand Down Expand Up @@ -83,15 +110,26 @@ fn adding_many_to_a_ring_works() {
let mut closed = 0;

println!("Closing the stashed FDs now...");
for fd in ring.iter() {
for thing in ring.iter() {
closed += 1;
nix::unistd::close(fd).unwrap();
match thing {
ring::StashedThing::One(fd) => {
nix::unistd::close(fd).unwrap();
}
ring::StashedThing::Pair(_) => {}
}
}
assert_eq!(should_close, closed);

println!("Closing the stashed FDs a second time, properly...");
while ring.count > 0 {
let fd = ring.pop().unwrap();
nix::unistd::close(fd).unwrap();
let thing = ring.pop().unwrap();
match thing {
ring::StashedThing::One(fd) => {
nix::unistd::close(fd).unwrap();
}
ring::StashedThing::Pair(_) => {
}
}
}
}

0 comments on commit 2cf5a69

Please sign in to comment.