From 366f193b46a7dd072272743096d7320b0d0d8608 Mon Sep 17 00:00:00 2001 From: Xiaoyang Han Date: Tue, 23 Jul 2024 22:49:37 +0800 Subject: [PATCH] [scorpio]: Dictionary Readonly fuse.sys. promote work. unstable. Signed-off-by: Xiaoyang Han --- scorpio/Cargo.toml | 2 +- scorpio/src/dicfuse/fuse.rs | 57 ++++++ scorpio/src/dicfuse/mod.rs | 354 ++++++++++++++++++++++++++--------- scorpio/src/dicfuse/store.rs | 179 ++++++++++++++++++ 4 files changed, 506 insertions(+), 86 deletions(-) create mode 100644 scorpio/src/dicfuse/fuse.rs create mode 100644 scorpio/src/dicfuse/store.rs diff --git a/scorpio/Cargo.toml b/scorpio/Cargo.toml index 517074d5e..0b1981436 100644 --- a/scorpio/Cargo.toml +++ b/scorpio/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -fuse-backend-rs = "0.12.0" +fuse-backend-rs = { version = "0.12.0"} fuser = "0.14.0" libc = "0.2.155" log = "0.4.22" diff --git a/scorpio/src/dicfuse/fuse.rs b/scorpio/src/dicfuse/fuse.rs new file mode 100644 index 000000000..7549a6229 --- /dev/null +++ b/scorpio/src/dicfuse/fuse.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use libc::stat64; +use fuse_backend_rs::{abi::fuse_abi::Attr, api::filesystem::Entry}; +const BLOCK_SIZE: u32 = 512; +fn default_stat64(inode:u64) -> stat64 { + let t = Attr{ + ino: inode, // Default inode number + size: 0, // Default file size + blocks: 0, // Default number of blocks + atime: 0, // Default last access time + mtime: 0, // Default last modification time + ctime: 0, // Default last status change time + atimensec: 0, // Default nanoseconds of last access time + mtimensec: 0, // Default nanoseconds of last modification time + ctimensec: 0, // Default nanoseconds of last status change time + mode: 0o444, // Default file mode (r--r--r--) + nlink: 1, // Default number of hard links + uid: 1000, // Default user ID + gid: 1000, // Default group ID + rdev: 0, // Default device ID + blksize: BLOCK_SIZE, // Default block size + flags: 0, // Default flags + }; + t.into() +} +#[allow(unused)] +pub fn default_entry(inode:u64) -> Entry { + Entry{ + inode, + generation: 0, + attr: default_stat64(inode), + attr_flags: 0, + attr_timeout: Duration::from_secs(u64::MAX), + entry_timeout: Duration::from_secs(u64::MAX), + } // Return a default Entry instance +} +// pub struct stat64 { +// pub st_dev: ::dev_t, // Device ID of the device containing the file +// pub st_ino: ::ino64_t, // Inode number of the file +// pub st_nlink: ::nlink_t, // Number of hard links to the file +// pub st_mode: ::mode_t, // File type and mode (permissions) +// pub st_uid: ::uid_t, // User ID of the file's owner +// pub st_gid: ::gid_t, // Group ID of the file's owner +// __pad0: ::c_int, // Padding for alignment (not used) +// pub st_rdev: ::dev_t, // Device ID (if the file is a special file) +// pub st_size: ::off_t, // Total size of the file in bytes +// pub st_blksize: ::blksize_t, // Block size for filesystem I/O +// pub st_blocks: ::blkcnt64_t, // Number of blocks allocated for the file +// pub st_atime: ::time_t, // Time of last access +// pub st_atime_nsec: i64, // Nanoseconds of last access time +// pub st_mtime: ::time_t, // Time of last modification +// pub st_mtime_nsec: i64, // Nanoseconds of last modification time +// pub st_ctime: ::time_t, // Time of last status change +// pub st_ctime_nsec: i64, // Nanoseconds of last status change time +// __reserved: [i64; 3], // Reserved for future use (not used) +// } \ No newline at end of file diff --git a/scorpio/src/dicfuse/mod.rs b/scorpio/src/dicfuse/mod.rs index 88c793055..c9ce5b4a0 100644 --- a/scorpio/src/dicfuse/mod.rs +++ b/scorpio/src/dicfuse/mod.rs @@ -1,105 +1,289 @@ -use model::GPath; -/// Read only file system for obtaining and displaying monorepo directory information -use reqwest::Client; // Import Response explicitly -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, error::Error}; -use once_cell::sync::Lazy; -use radix_trie::{self, TrieCommon}; -mod model; -const MEGA_TREE_URL: &str = "localhost:8000";//TODO: make it configable +pub mod model; +mod store; +mod fuse; -#[derive(Serialize, Deserialize, Debug)] -struct Item { - name: String, - path: String, - content_type: String, -} -#[allow(unused)] -struct DicItem{ - inode:u64, - name:GPath, - content_type: ContentType, -} +use std::{sync::Arc, time::Duration}; +use std::io::Result; +use fuse::default_entry; +use fuse_backend_rs::{abi::fuse_abi::FsOptions, api::filesystem::{Context, Entry, FileSystem}}; +use tokio::task::JoinHandle; -#[allow(unused)] -enum ContentType { - File, - Dictionary, +use store::DictionaryStore; + +struct Dicfuse{ + store: Arc, + //runtime: Arc, } #[allow(unused)] -impl DicItem { - pub fn new(inode:u64, item:Item) -> Self { - DicItem { - inode, - name: item.name.into(), // Assuming GPath can be created from String - content_type: match item.content_type.as_str() { - "file" => ContentType::File, - "directory" => ContentType::Dictionary, - _ => panic!("Unknown content type"), - }, +impl Dicfuse{ + pub fn new() -> Self { + Self { + store: DictionaryStore::new().into(), // Assuming DictionaryStore has a new() method + //runtime: tokio::runtime::Runtime::new().unwrap().into(), // Create a new runtime } } -} -#[derive(Serialize, Deserialize, Debug,Default)] -struct ApiResponse { - req_result: bool, - data: Vec, - err_message: String, + fn spawn(&self, f: F) -> JoinHandle + where + F: FnOnce(Arc) -> Fut, + Fut: std::future::Future + Send + 'static, + O: Send + 'static, + { + let inner = self.store.clone(); + tokio::task::spawn(f(inner)) + } } -// Get Mega dictionary tree from server + #[allow(unused)] -async fn fetch_tree(path: &str) -> Result> { - static CLIENT: Lazy = Lazy::new(Client::new); - let client = CLIENT.clone(); - let url = format!("http://{}/api/v1/tree?path={}", MEGA_TREE_URL, path); - let resp:ApiResponse = client.get(&url).send().await?.json().await?; - if resp.req_result { - Ok(resp) - }else{ - todo!(); +impl FileSystem for Dicfuse{ + type Inode = u64; + + type Handle = u64; + + fn init(&self, capable:FsOptions) -> Result { + self.store.import(); + Ok(fuse_backend_rs::abi::fuse_abi::FsOptions::empty()) } -} + + fn destroy(&self) {} + + fn lookup(&self, ctx: &Context, parent: Self::Inode, name: &std::ffi::CStr) -> Result { + let store = self.store.clone(); + let pitem = store.find_path(parent).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?; + Ok(Entry::default()) + } + -#[allow(unused)] -pub struct Dicfuse{ - next_inode : u64, - radix_trie: radix_trie::Trie, - inodes:HashMap, -} -#[allow(unused)] -impl Dicfuse { - pub fn new() -> Self { - Dicfuse { - next_inode: 1, - radix_trie: radix_trie::Trie::new(), - inodes: HashMap::new(), + fn forget(&self, ctx: &Context, inode: Self::Inode, count: u64) {} + + fn batch_forget(&self, ctx: &Context, requests: Vec<(Self::Inode, u64)>) { + for (inode, count) in requests { + self.forget(ctx, inode, count) } } - pub fn import(&self){ - + + fn getattr( + &self, + ctx: &Context, + inode: Self::Inode, + handle: Option, + ) -> std::io::Result<(libc::stat64, std::time::Duration)> { + let store = self.store.clone(); + let i = store.find_path(inode).ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENODATA))?; + let entry = default_entry(inode); + Ok((entry.attr,Duration::from_secs(u64::MAX))) + } - pub fn get_root(&self)->radix_trie::iter::Children { - let it = self.radix_trie.subtrie("/").unwrap(); - it.children() + + fn setattr( + &self, + ctx: &Context, + inode: Self::Inode, + attr: libc::stat64, + handle: Option, + valid: fuse_backend_rs::abi::fuse_abi::SetattrValid, + ) -> std::io::Result<(libc::stat64, std::time::Duration)> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) } - fn lookup(&self,inode :u64)-> Option{ - self.inodes.get(&inode).map(|item| item.name.clone()) + + + fn mknod( + &self, + ctx: &Context, + inode: Self::Inode, + name: &std::ffi::CStr, + mode: u32, + rdev: u32, + umask: u32, + ) -> std::io::Result { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn mkdir( + &self, + ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + mode: u32, + umask: u32, + ) -> std::io::Result { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) } -} - -#[cfg(test)] -mod tests { - use super::*; - #[tokio::test] - #[ignore] // This will prevent the test from running by default - async fn test_fetch_tree_success() { - let path: &str = "/third-part/mega"; - - let result = fetch_tree(path).await.unwrap(); - println!("result: {:?}", result); + fn unlink(&self, ctx: &Context, parent: Self::Inode, name: &std::ffi::CStr) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) } -} + + fn rmdir(&self, ctx: &Context, parent: Self::Inode, name: &std::ffi::CStr) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn rename( + &self, + ctx: &Context, + olddir: Self::Inode, + oldname: &std::ffi::CStr, + newdir: Self::Inode, + newname: &std::ffi::CStr, + flags: u32, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn link( + &self, + ctx: &Context, + inode: Self::Inode, + newparent: Self::Inode, + newname: &std::ffi::CStr, + ) -> std::io::Result { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn open( + &self, + ctx: &Context, + inode: Self::Inode, + flags: u32, + fuse_flags: u32, + ) -> std::io::Result<(Option, fuse_backend_rs::abi::fuse_abi::OpenOptions, Option)> { + // Matches the behavior of libfuse. + Ok((None, fuse_backend_rs::abi::fuse_abi::OpenOptions::empty(), None)) + } + + fn create( + &self, + ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + args: fuse_backend_rs::abi::fuse_abi::CreateIn, + ) -> std::io::Result<(Entry, Option, fuse_backend_rs::abi::fuse_abi::OpenOptions, Option)> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn flush( + &self, + ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + lock_owner: u64, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn fsync( + &self, + ctx: &Context, + inode: Self::Inode, + datasync: bool, + handle: Self::Handle, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn fallocate( + &self, + ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + mode: u32, + offset: u64, + length: u64, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn release( + &self, + ctx: &Context, + inode: Self::Inode, + flags: u32, + handle: Self::Handle, + flush: bool, + flock_release: bool, + lock_owner: Option, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn statfs(&self, ctx: &Context, inode: Self::Inode) -> std::io::Result { + // Safe because we are zero-initializing a struct with only POD fields. + let mut st: libc::statvfs64 = unsafe { std::mem::zeroed() }; + // This matches the behavior of libfuse as it returns these values if the + // filesystem doesn't implement this method. + st.f_namemax = 255; + st.f_bsize = 512; + Ok(st) + } + + fn setxattr( + &self, + ctx: &Context, + inode: Self::Inode, + name: &std::ffi::CStr, + value: &[u8], + flags: u32, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn getxattr( + &self, + ctx: &Context, + inode: Self::Inode, + name: &std::ffi::CStr, + size: u32, + ) -> std::io::Result { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn listxattr( + &self, + ctx: &Context, + inode: Self::Inode, + size: u32, + ) -> std::io::Result { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn opendir( + &self, + ctx: &Context, + inode: Self::Inode, + flags: u32, + ) -> std::io::Result<(Option, fuse_backend_rs::abi::fuse_abi::OpenOptions)> { + // Matches the behavior of libfuse. + Ok((None, fuse_backend_rs::abi::fuse_abi::OpenOptions::empty())) + } + + fn readdir( + &self, + ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> std::io::Result, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn readdirplus( + &self, + ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry, Entry) -> std::io::Result, + ) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + + fn access(&self, ctx: &Context, inode: Self::Inode, mask: u32) -> std::io::Result<()> { + Err(std::io::Error::from_raw_os_error(libc::ENOSYS)) + } + +} \ No newline at end of file diff --git a/scorpio/src/dicfuse/store.rs b/scorpio/src/dicfuse/store.rs new file mode 100644 index 000000000..9fa0c77c4 --- /dev/null +++ b/scorpio/src/dicfuse/store.rs @@ -0,0 +1,179 @@ + +/// Read only file system for obtaining and displaying monorepo directory information +use reqwest::Client; +// Import Response explicitly +use serde::{Deserialize, Serialize}; +use std::io; +use std::sync::atomic::AtomicU64; +use std::{collections::HashMap, error::Error}; +use std::collections::VecDeque; +use once_cell::sync::Lazy; +use radix_trie::{self, TrieCommon}; +use std::sync::{Arc,Mutex}; + + +use super::model::GPath; +const MEGA_TREE_URL: &str = "localhost:8000";//TODO: make it configable + + +#[derive(Serialize, Deserialize, Debug)] +pub struct Item { + name: String, + path: String, + content_type: String, +} +#[allow(unused)] +struct DicItem{ + inode:u64, + name:GPath, + content_type: ContentType, +} + +#[allow(unused)] +#[derive(PartialEq)] +enum ContentType { + File, + Dictionary, +} +#[allow(unused)] +impl DicItem { + pub fn new(inode:u64, item:Item) -> Self { + DicItem { + inode, + name: item.name.into(), // Assuming GPath can be created from String + content_type: match item.content_type.as_str() { + "file" => ContentType::File, + "directory" => ContentType::Dictionary, + _ => panic!("Unknown content type"), + }, + } + } + pub fn get_path(&self) -> String { + self.name.name() + } +} +#[derive(Serialize, Deserialize, Debug,Default)] +struct ApiResponse { + req_result: bool, + data: Vec, + err_message: String, +} +impl Iterator for ApiResponse{ + type Item = Item; + fn next(&mut self) -> Option { + self.data.pop() + } +} +// Get Mega dictionary tree from server +async fn fetch_tree(path: &str) -> Result> { + static CLIENT: Lazy = Lazy::new(Client::new); + let client = CLIENT.clone(); + let url = format!("http://{}/api/v1/tree?path={}", MEGA_TREE_URL, path); + let resp:ApiResponse = client.get(&url).send().await?.json().await?; + if resp.req_result { + Ok(resp) + }else{ + todo!(); + } +} + +#[allow(unused)] +pub struct DictionaryStore { + inodes: Arc>>, + next_inode: AtomicU64, + queue: Arc>>, + radix_trie: Arc>>, +} + + +#[allow(unused)] +impl DictionaryStore { + pub fn new() -> Self { + DictionaryStore { + next_inode: AtomicU64::new(1), + inodes: Arc::new(Mutex::new(HashMap::new())), + radix_trie: Arc::new(Mutex::new(radix_trie::Trie::new())), + queue: Arc::new(Mutex::new(VecDeque::new())), + } + } + fn update_inode(&self,item:Item){ + self.next_inode.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let alloc_inode = self.next_inode.load(std::sync::atomic::Ordering::Relaxed); + self.radix_trie.lock().unwrap().insert(item.path.clone(), alloc_inode); + self.inodes.lock().unwrap().insert(alloc_inode, DicItem::new(alloc_inode, item)); + self.queue.lock().unwrap().push_back(alloc_inode); + } + pub fn import(&self){ + const ROOT_DIR: &str ="/"; + let mut queue = VecDeque::new(); + let items: Vec = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(ROOT_DIR)).unwrap().collect();//todo: can't tokio + for it in items{ + self.update_inode(it); + } + while !queue.is_empty() {//BFS to look up all dictionary + let one_inode = queue.pop_back().unwrap(); + let mut new_items = Vec::new(); + { + let inodes_lock = self.inodes.lock().unwrap(); + let it = inodes_lock.get(&one_inode).unwrap(); + if it.content_type == ContentType::Dictionary{ + let path = it.get_path(); + new_items = tokio::runtime::Runtime::new().unwrap().block_on(fetch_tree(&path)).unwrap().collect(); + } + } + for newit in new_items { + self.update_inode(newit); // Await the update_inode call + } + new_items = Vec::new(); + } + //queue.clear(); + } + + + pub fn find_path(&self,inode :u64)-> Option{ + self.inodes.lock().unwrap().get(&inode).map(|item| item.name.clone()) + } + + fn find_children(&self,parent: u64) -> Result{ + let path = self.inodes.lock().unwrap().get(&parent).map(|item| item.name.clone()); + if let Some(parent_path) = path{ + let l = self.radix_trie.lock().unwrap(); + let pathstr:String =parent_path.name(); + let u = l.subtrie(&pathstr).unwrap(); + let c = u.children(); + } + todo!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[tokio::test] + #[ignore] // This will prevent the test from running by default + async fn test_fetch_tree_success() { + let path: &str = "/third-part/mega"; + + let result = fetch_tree(path).await.unwrap(); + println!("result: {:?}", result); + + } + + #[test] + fn test_tree(){ + let mut t = radix_trie::Trie::::new(); + t.insert(String::from("/a"), 0); + t.insert(String::from("/a/b"), 0); + t.insert(String::from("/a/c"), 0); + t.insert(String::from("/a/d"), 0); + t.insert(String::from("/a/c/1"), 0); + t.insert(String::from("/a/c/2"), 0); + t.insert(String::from("/a/c/2"), 0); + t.insert(String::from("/a/b/1"), 0); + + let c = t.children(); + c.into_iter().for_each(|it|println!("{:?}\n",it) + ) + } +} +