Skip to content

Commit

Permalink
implement splice read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
Mic92 committed Jan 2, 2018
1 parent 722be55 commit bb1c89f
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 39 deletions.
58 changes: 49 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
//! Raw communication channel to the FUSE kernel driver.

use std::io;
use sys;
use std::ffi::{CString, CStr, OsStr};
use std::os::unix::ffi::OsStrExt;
use std::path::{PathBuf, Path};
use libc::{self, c_int, c_void, size_t};
use libc::{self, c_int, c_void, size_t, F_SETPIPE_SZ, SPLICE_F_MOVE, SPLICE_F_NONBLOCK};
use std::os::unix::io::RawFd;
#[cfg(feature = "libfuse")]
use libfuse::{fuse_args, fuse_mount_compat25};
Expand All @@ -26,7 +27,17 @@ fn with_fuse_args<T, F: FnOnce(&fuse_args) -> T>(options: &[&OsStr], f: F) -> T
#[derive(Debug)]
pub struct Channel {
mountpoint: PathBuf,
fd: RawFd,
pub fd: RawFd,
pub read_pipe_fd: RawFd,
pub write_pipe_fd: RawFd,
}

fn create_pipe(buffer_size: usize) -> io::Result<[RawFd; 2]> {
let pipe = try!(sys::pipe());
if sys::DEFAULT_PIPE_SIZE != buffer_size {
try!(sys::fcntl(pipe[0], F_SETPIPE_SZ, buffer_size + 4096));
}
Ok(pipe)
}

impl Channel {
Expand All @@ -35,32 +46,45 @@ impl Channel {
/// the given path to the channel. If the channel is dropped, the path is
/// unmounted.
#[cfg(feature = "libfuse")]
pub fn new(mountpoint: &Path, options: &[&OsStr]) -> io::Result<Channel> {
pub fn new(mountpoint: &Path, options: &[&OsStr], buffer_size: usize) -> io::Result<Channel> {
let mountpoint = try!(mountpoint.canonicalize());
with_fuse_args(options, |args| {
let mnt = try!(CString::new(mountpoint.as_os_str().as_bytes()));
let pipe = try!(create_pipe(buffer_size));
let fd = unsafe { fuse_mount_compat25(mnt.as_ptr(), args) };
if fd < 0 {
unsafe {
libc::close(pipe[0]);
libc::close(pipe[1]);
};
Err(io::Error::last_os_error())
} else {
Ok(Channel { mountpoint: mountpoint, fd: fd })
Ok(Channel {
mountpoint: mountpoint,
fd: fd,
read_pipe_fd: pipe[0],
write_pipe_fd: pipe[1],
})
}
})
}

pub fn new_from_fd(fd: RawFd, mountpoint: &Path) -> Channel {
Channel {
pub fn new_from_fd(fd: RawFd, mountpoint: &Path, buffer_size: usize) -> io::Result<Channel> {
let pipe = try!(create_pipe(buffer_size));
Ok(Channel {
fd: fd,
mountpoint: mountpoint.to_path_buf(),
}
read_pipe_fd: pipe[0],
write_pipe_fd: pipe[1],
})
}

/// Return path of the mounted filesystem
pub fn mountpoint(&self) -> &Path {
&self.mountpoint
}

/// Receives data up to the capacity of the given buffer (can block).
/// receives data up to the capacity of the given buffer (can block).
pub fn receive(&self, buffer: &mut Vec<u8>) -> io::Result<()> {
let rc = unsafe { libc::read(self.fd, buffer.as_ptr() as *mut c_void, buffer.capacity() as size_t) };
if rc < 0 {
Expand All @@ -71,6 +95,11 @@ impl Channel {
}
}

/// receives data up to the capacity of the given buffer (can block).
pub fn receive_splice(&self, buffer_size: usize) -> io::Result<size_t> {
Ok(try!(sys::splice(self.fd, None, self.write_pipe_fd, None, buffer_size, 0)))
}

/// Returns a sender object for this channel. The sender object can be
/// used to send to the channel. Multiple sender objects can be used
/// and they can safely be sent to other threads.
Expand All @@ -88,7 +117,11 @@ impl Drop for Channel {
// TODO: send ioctl FUSEDEVIOCSETDAEMONDEAD on macOS before closing the fd
// Close the communication channel to the kernel driver
// (closing it before unmount prevents sync unmount deadlock)
unsafe { libc::close(self.fd); }
unsafe {
libc::close(self.fd);
libc::close(self.read_pipe_fd);
libc::close(self.write_pipe_fd);
}
// Unmount this channel's mount point
#[cfg(feature = "libfuse")]
let _ = unmount(&self.mountpoint);
Expand Down Expand Up @@ -121,6 +154,13 @@ impl ReplySender for ChannelSender {
error!("Failed to send FUSE reply: {}", err);
}
}

fn splice(&self, from: RawFd, size: usize) {
let res = sys::splice(from, None, self.fd, None, size, SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if let Err(err) = res {
error!("Failed to splice FUSE reply: {}", err);
};
}
}

/// Unmount an arbitrary mount point
Expand Down
2 changes: 1 addition & 1 deletion src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub mod consts {
}

#[repr(C)]
#[derive(Debug)]
#[derive(Debug,PartialEq)]
pub enum fuse_opcode {
FUSE_LOOKUP = 1,
FUSE_FORGET = 2, // no reply
Expand Down
12 changes: 7 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use std::convert::AsRef;
use std::io;
use std::ffi::OsStr;
use std::path::Path;
use std::os::unix::io::RawFd;
use libc::{c_int, ENOSYS};
use time::Timespec;

pub use kernel::FUSE_ROOT_ID;
pub use kernel::consts;
pub use reply::{Reply, ReplyEmpty, ReplyData, ReplyEntry, ReplyAttr, ReplyOpen};
pub use reply::{ReplyWrite, ReplyStatfs, ReplyCreate, ReplyLock, ReplyBmap, ReplyDirectory, ReplyDirectoryPlus};
pub use reply::{ReplyXattr, ReplyIoctl, ReplyLseek};
pub use reply::{ReplyXattr, ReplyIoctl, ReplyLseek, ReplyRead};
#[cfg(target_os = "macos")]
pub use reply::ReplyXTimes;
pub use request::{Request, UtimeSpec};
Expand All @@ -37,6 +38,7 @@ mod libfuse;
mod reply;
mod request;
mod session;
mod sys;

/// File types
#[derive(Clone, Copy, Debug, Hash, PartialEq)]
Expand Down Expand Up @@ -204,7 +206,7 @@ pub trait Filesystem {
/// return value of the read system call will reflect the return value of this
/// operation. fh will contain the value set by the open method, or will be undefined
/// if the open method didn't set any value.
fn read(&mut self, _req: &Request, _ino: u64, _fh: u64, _offset: i64, _size: u32, reply: ReplyData) {
fn read(&mut self, _req: &Request, _ino: u64, _fh: u64, _offset: i64, _size: u32, reply: ReplyRead) {
reply.error(ENOSYS);
}

Expand All @@ -214,7 +216,7 @@ pub trait Filesystem {
/// which case the return value of the write system call will reflect the return
/// value of this operation. fh will contain the value set by the open method, or
/// will be undefined if the open method didn't set any value.
fn write(&mut self, _req: &Request, _ino: u64, _fh: u64, _offset: i64, _data: &[u8], _flags: u32, reply: ReplyWrite) {
fn write(&mut self, _req: &Request, _ino: u64, _fh: u64, _offset: i64, _source_file: Option<RawFd>, _data: &[u8], _size: u32, _flags: u32, reply: ReplyWrite) {
reply.error(ENOSYS);
}

Expand Down Expand Up @@ -412,7 +414,7 @@ pub trait Filesystem {
/// not return until the filesystem is unmounted.
#[cfg(feature = "libfuse")]
pub fn mount<FS: Filesystem, P: AsRef<Path>>(filesystem: FS, mountpoint: &P, options: &[&OsStr]) -> io::Result<()>{
Session::new(filesystem, mountpoint.as_ref(), options).and_then(|mut se| se.run())
Session::new(filesystem, mountpoint.as_ref(), options, false).and_then(|mut se| se.run())
}

/// Mount the given filesystem to the given mountpoint. This function spawns
Expand All @@ -422,5 +424,5 @@ pub fn mount<FS: Filesystem, P: AsRef<Path>>(filesystem: FS, mountpoint: &P, opt
/// be unmounted.
#[cfg(feature = "libfuse")]
pub unsafe fn spawn_mount<'a, FS: Filesystem+Send+'a, P: AsRef<Path>>(filesystem: FS, mountpoint: &P, options: &[&OsStr]) -> io::Result<BackgroundSession<'a>> {
Session::new(filesystem, mountpoint.as_ref(), options).and_then(|se| se.spawn())
Session::new(filesystem, mountpoint.as_ref(), options, false).and_then(|se| se.spawn())
}
83 changes: 82 additions & 1 deletion src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ use std::ffi::OsStr;
use std::fmt;
use std::marker::PhantomData;
use std::os::unix::ffi::OsStrExt;
use libc::{c_int, S_IFIFO, S_IFCHR, S_IFBLK, S_IFDIR, S_IFREG, S_IFLNK, S_IFSOCK, EIO};
use std::os::unix::io::RawFd;
use libc::{self, c_int, size_t, c_void, S_IFIFO, S_IFCHR, S_IFBLK, S_IFDIR, S_IFREG, S_IFLNK, S_IFSOCK, EIO, SPLICE_F_MOVE, SPLICE_F_NONBLOCK};
use time::Timespec;
use kernel::{fuse_attr, fuse_kstatfs, fuse_file_lock, fuse_entry_out, fuse_attr_out, fuse_lseek_out};
use kernel::{fuse_open_out, fuse_write_out, fuse_statfs_out, fuse_lk_out, fuse_bmap_out, fuse_ioctl_out};
use kernel::fuse_getxattr_out;
#[cfg(target_os = "macos")]
use kernel::fuse_getxtimes_out;
use kernel::{fuse_out_header, fuse_dirent, fuse_direntplus};
use sys;
use {FileType, FileAttr};

/// Generic reply callback to send data
pub trait ReplySender: Send + 'static {
/// Send data.
fn send(&self, data: &[&[u8]]);

fn splice(&self, from: RawFd, size: usize);
}

impl fmt::Debug for Box<ReplySender> {
Expand Down Expand Up @@ -158,6 +162,11 @@ impl<T> ReplyRaw<T> {
});
}

fn splice(&mut self, from: RawFd, size: usize) {
let sender = self.sender.take().unwrap();
sender.splice(from, size);
}

/// Reply to a request with the given type
pub fn ok(mut self, data: &T) {
as_bytes(data, |bytes| {
Expand Down Expand Up @@ -206,6 +215,78 @@ impl ReplyEmpty {
}
}

///
/// Read reply
///
#[derive(Debug)]
pub struct ReplyRead {
reply: ReplyRaw<()>,
read_pipe_fd: RawFd,
write_pipe_fd: RawFd,
unique: u64
}

impl ReplyRead {
/// Create a new reply for a read request
pub fn new<S: ReplySender>(unique: u64, sender: S, read_pipe_fd: RawFd, write_pipe_fd: RawFd) -> ReplyRead {
ReplyRead { reply: Reply::new(unique, sender), read_pipe_fd: read_pipe_fd, write_pipe_fd: write_pipe_fd, unique: unique }
}

/// Reply to a request with the given data
pub fn data(mut self, data: &[u8]) {
self.reply.send(0, &[data]);
}

/// Reply to a request with a file descriptor
pub fn fd(mut self, fd: RawFd, mut offset: i64, size: u32) {
let mut header = fuse_out_header {
unique: self.unique,
len: (mem::size_of::<fuse_out_header>() as u32 + size) as u32,
error: 0,
};

let iovec = libc::iovec {
iov_base: &mut header as *mut fuse_out_header as *mut c_void,
iov_len: mem::size_of::<fuse_out_header>() as size_t,
};

if let Err(err) = sys::vmsplice(self.write_pipe_fd, &iovec, 1, SPLICE_F_NONBLOCK) {
return self.reply.error(err.raw_os_error().unwrap());
}

let res = sys::splice(fd,
Some(&mut offset),
self.write_pipe_fd,
None,
size as usize,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);

let actual_read = match res {
Err(err) => return self.reply.error(err.raw_os_error().unwrap()),
Ok(bytes) => bytes as usize,
};

// short read -> fallback to clear pipe in userspace
if (size as usize) != actual_read {
let mut out = vec![0; actual_read + mem::size_of::<fuse_out_header>()];
match sys::read(self.read_pipe_fd, &mut out[..(actual_read + mem::size_of::<fuse_out_header>())]) {
Ok(_) => {
self.data(&out[mem::size_of::<fuse_out_header>()..]);
return;
},
Err(err) => return self.reply.error(err.raw_os_error().unwrap()),
};
}

self.reply.splice(self.read_pipe_fd, size as usize + mem::size_of::<fuse_out_header>());
}

/// Reply to a request with the given error code
pub fn error(self, err: c_int) {
self.reply.error(err);
}
}

///
/// Data reply
///
Expand Down
Loading

0 comments on commit bb1c89f

Please sign in to comment.