New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async wip #12

Open
wants to merge 1 commit into
base: master
from
Open

Async wip #12

Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+240 −9
Diff settings

Always

Just for now

Copy path View file
@@ -14,6 +14,10 @@ edition = '2018'


[dependencies]
futures = "0.1.25"
libc = "~0.2"
libnfs-sys = "~0.2"
log = "~0.4"
mio = "~0.6"
nix = "~0.11"
tokio = "~0.1"
Copy path View file
@@ -6,10 +6,19 @@ use nix::{fcntl::OFlag, sys::stat::Mode};

fn main() -> Result<()> {
let mut nfs = Nfs::new()?;
nfs.set_uid(1000)?;
nfs.set_gid(1000)?;
//nfs.set_uid(0)?;
//nfs.set_gid(0)?;
nfs.set_debug(9)?;
nfs.mount("0.0.0.0", "/srv/nfs")?;
println!("mounting");
nfs.mount("192.168.122.78", "/var/nfs")?;
nfs.stat64_async(&Path::new("foo"), |result|{
println!("async stat result: {:?}", result);

// Pass it on
result
})?;
nfs.run_async()?;
/*
let dir = nfs.opendir(&Path::new("/"))?;
for f in dir {
@@ -29,5 +38,6 @@ fn main() -> Result<()> {
let file = nfs.open(&Path::new("/rust"), OFlag::O_RDONLY)?;
let buff = file.read(1024)?;
println!("read file: {}", String::from_utf8_lossy(&buff));
*/
Ok(())
}
Copy path View file
@@ -3,19 +3,27 @@
//! version=4 or programatically calling nfs_set_version(nfs, NFS_V4) before
//! connecting to the server/share.
//!
use futures::future::Future;
use futures::unsync::oneshot;
use libnfs_sys::*;
use log::error;
use mio::event::Evented;
use mio::unix::EventedFd;
use nix::fcntl::OFlag;
use nix::poll::poll;
use nix::poll::{EventFlags, PollFd};
use nix::sys::stat::Mode;
use tokio::prelude::*;

use std::ffi::{CStr, CString};
use std::ffi::{c_void, CStr, CString};
use std::io::{Error, ErrorKind, Result};
use std::mem::zeroed;
use std::mem::{transmute, zeroed};
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::ptr;
use std::rc::Rc;

#[derive(Clone)]
#[derive(Clone, Debug)]
struct NfsPtr(*mut nfs_context);

impl Drop for NfsPtr {
@@ -48,7 +56,95 @@ fn check_retcode(ctx: *mut nfs_context, code: i32) -> Result<()> {
}
}

#[derive(Clone)]
#[no_mangle]
pub extern "C" fn nfs_mount_callback(
result: i32,
context: *mut nfs_context,
// Data is null for mount
_data: *mut c_void,
private_data: *mut c_void,
) {
println!("Callback called");
unsafe {
// box the callback function
let callback: Box<fn(Result<()>) -> Result<()>> =
Box::from_raw(private_data as *mut fn(Result<()>) -> Result<()>);
// Check the async result from libnfs
let result: Result<()> = if result < 0 {
let err_str = nfs_get_error(context);
let e = CStr::from_ptr(err_str).to_string_lossy().into_owned();
Err(Error::new(ErrorKind::Other, e))
} else {
Ok(())
};
if let Err(e) = callback(result) {
error!("nfs_mount callback error: {:?}", e);
};
}
}

#[no_mangle]
pub extern "C" fn nfs_open_callback(
result: i32,
context: *mut nfs_context,
// data is an *mut nfsfh
data: *mut c_void,
private_data: *mut c_void,
) {
println!("open callback called");
unsafe {
// box the callback function
let callback: Box<fn(Result<NfsFile>) -> Result<NfsFile>> =
Box::from_raw(private_data as *mut fn(Result<NfsFile>) -> Result<NfsFile>);
// Check the async result from libnfs
let result: Result<NfsFile> = if result < 0 {
let err_str = nfs_get_error(context);
let e = CStr::from_ptr(err_str).to_string_lossy().into_owned();
Err(Error::new(ErrorKind::Other, e))
} else {
let nfsfh = NfsFile {
nfs: Rc::new(NfsPtr(context)),
handle: data as *mut nfsfh,
};
Ok(nfsfh)
};
if let Err(e) = callback(result) {
error!("nfs_mount callback error: {:?}", e);
};
}
}

#[no_mangle]
pub extern "C" fn stat64_callback(
result: i32,
context: *mut nfs_context,
// data is an *mut nfs_stat_64
data: *mut c_void,
private_data: *mut c_void,
) {
unsafe {
// box the callback function
println!("callback called with private_data: {:p}", &private_data);
let callback: &mut Box<FnMut(Result<*mut nfs_stat_64>) -> Result<()>> = transmute(private_data);
println!("boxed callback: {:p}", &callback);
// Check the async result from libnfs
let result: Result<*mut nfs_stat_64> = if result < 0 {
let err_str = nfs_get_error(context);
let e = CStr::from_ptr(err_str).to_string_lossy().into_owned();
Err(Error::new(ErrorKind::Other, e))
} else {
let stat = data as *mut nfs_stat_64;
Ok(stat)
};
println!("Calling user callback with: {:?}", result);
if let Err(e) = callback(result) {
error!("nfs_mount callback error: {:?}", e);
};
println!("Done");
}
}

#[derive(Clone, Debug)]
pub struct Nfs {
context: Rc<NfsPtr>,
}
@@ -105,7 +201,7 @@ pub struct DirEntry {
pub ctime_nsec: u32,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct NfsDirectory {
nfs: Rc<NfsPtr>,
handle: *mut nfsdir,
@@ -121,7 +217,7 @@ impl Drop for NfsDirectory {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct NfsFile {
nfs: Rc<NfsPtr>,
handle: *mut nfsfh,
@@ -332,6 +428,32 @@ impl Nfs {
}
}

/// Accepts an optional callback function pointer
pub fn mount_async<F>(&self, server: &str, export_name: &str, callback: F) -> Result<()>
where
F: FnOnce(Result<()>) -> Result<()> + 'static,
// This should accept a Nfs and return a Future
// how does the async chain get created?
// mount -> stat -> open -> etc etc etc all needs to be chained
{
let server = CString::new(server.as_bytes()).unwrap();
let export = CString::new(export_name.as_bytes()).unwrap();
let callback_ptr = Box::into_raw(Box::new(callback));
unsafe {
check_retcode(
self.context.0,
nfs_mount_async(
self.context.0,
server.as_ptr(),
export.as_ptr(),
Some(nfs_mount_callback),
callback_ptr as *mut c_void,
),
)?;
}
Ok(())
}

/// Supported flags are
/// O_APPEND
/// O_RDONLY
@@ -359,6 +481,31 @@ impl Nfs {
}
}

/// Accepts an optional callback function pointer
pub fn open_async<F>(&self, path: &Path, flags: OFlag, callback: F) -> Result<()>
where
F: FnOnce(Result<NfsFile>) -> Result<NfsFile> + 'static,
// This should accept a Nfs and return a Future
// how does the async chain get created?
// mount -> stat -> open -> etc etc etc all needs to be chained
{
let path = CString::new(path.as_os_str().as_bytes())?;
let callback_ptr = Box::into_raw(Box::new(callback));
unsafe {
check_retcode(
self.context.0,
nfs_open_async(
self.context.0,
path.as_ptr(),
flags.bits(),
Some(nfs_open_callback),
callback_ptr as *mut c_void,
),
)?;
}
Ok(())
}

pub fn opendir(&mut self, path: &Path) -> Result<NfsDirectory> {
let path = CString::new(path.as_os_str().as_bytes())?;
unsafe {
@@ -525,6 +672,55 @@ impl Nfs {
Ok(())
}

pub fn run_async(&mut self) -> Result<()> {
loop {
let mut file_desc: Vec<PollFd> = Vec::new();
let fd = unsafe { nfs_get_fd(self.context.0) };
let events_needed = unsafe { nfs_which_events(self.context.0) };
println!("events_needed: {:?}", events_needed);
let nfs_fd = PollFd::new(fd, EventFlags::from_bits_truncate(events_needed as i16));
file_desc.push(nfs_fd);
if poll(&mut file_desc, -1).expect("poll failed") < 0 {
println!("poll failed");
break;
} else {
println!("polled. Got revents: {:?}", file_desc[0].revents());
if unsafe {
nfs_service(
self.context.0,
file_desc[0].revents().expect("nfs_service failed").bits() as i32,
) < 0
} {
println!("nfs_service failed");
break;
}
}
}
Ok(())
}

pub fn setup_async(&mut self) -> Result<()> {
let register = tokio::reactor::Registration::new();
let fd = unsafe { nfs_get_fd(self.context.0) };
let events_needed = unsafe { nfs_which_events(self.context.0) };
println!("nsf_get_fd: {}", fd);
println!("nfs_which_events: {}", events_needed);
let registered = register.register(&EventedFd(&fd))?;
if registered {
println!("registered");
register.poll_read_ready()?;
println!("read ready");
if unsafe { nfs_service(self.context.0, events_needed) } < 0 {
println!("nfs_service failed");
}
} else {
// I/O resource has been previously registered

}

Ok(())
}

pub fn stat64(&self, path: &Path) -> Result<nfs_stat_64> {
let path = CString::new(path.as_os_str().as_bytes())?;
unsafe {
@@ -537,6 +733,27 @@ impl Nfs {
}
}

pub fn stat64_async<F>(&self, path: &Path, callback: F) -> Result<()>
where
F: FnMut(Result<NfsFile>) -> Result<NfsFile>, F: 'static,
{
let path = CString::new(path.as_os_str().as_bytes())?;
let cb: Box<Box<FnMut(Result<NfsFile>) -> Result<NfsFile>>> = Box::new(Box::new(callback));
println!("stat64_async callback_ptr: {:p}", &cb);
unsafe {
check_retcode(
self.context.0,
nfs_stat64_async(
self.context.0,
path.as_ptr(),
Some(stat64_callback),
Box::into_raw(cb) as _
),
)?;
Ok(())
}
}

pub fn statvfs(&self, path: &Path) -> Result<statvfs> {
let path = CString::new(path.as_os_str().as_bytes())?;
unsafe {
ProTip! Use n and p to navigate between commits in a pull request.