nydusd: make DELETE /api/v2/blobs API support cull fscache blob
If only the blob_id parameter is set in DELETE /api/v2/blobs, nydusd will try to
cull the corresponding fscache blob. The snapshotter can then use this to delete
fscache blob files.

Signed-off-by: Xin Yin <yinxin.x@bytedance.com>
Xin Yin committed Nov 25, 2022
1 parent 535d029 commit 849bec3
Showing 6 changed files with 198 additions and 2 deletions.
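
For illustration only (not part of this commit), a minimal client-side sketch of how a snapshotter-like process could exercise the new behavior over nydusd's HTTP API; the socket path, blob id, and raw-HTTP framing below are assumptions:

use std::io::{Read, Write};
use std::os::unix::net::UnixStream;

// Sends DELETE /api/v2/blobs?blob_id=<id>; with only blob_id set, nydusd routes the
// request to ApiRequest::DeleteBlobFile and tries to cull the fscache blob.
fn cull_fscache_blob(api_sock: &str, blob_id: &str) -> std::io::Result<String> {
    let mut stream = UnixStream::connect(api_sock)?;
    let request = format!(
        "DELETE /api/v2/blobs?blob_id={} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
        blob_id
    );
    stream.write_all(request.as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    Ok(response)
}

fn main() -> std::io::Result<()> {
    // Hypothetical socket path; use whatever --apisock nydusd was started with.
    let response = cull_fscache_blob("/run/nydusd/api.sock", "<blob-id>")?;
    println!("{}", response);
    Ok(())
}
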
4 changes: 4 additions & 0 deletions api/src/http.rs
@@ -488,6 +488,8 @@ pub enum ApiRequest {
GetBlobObject(BlobCacheObjectId),
/// Delete a blob cache entry
DeleteBlobObject(BlobCacheObjectId),
/// Delete a blob cache file
DeleteBlobFile(String),
}

/// Kinds for daemon related error messages.
@@ -621,6 +623,8 @@ pub enum HttpError {
CreateBlobObject(ApiError),
/// Failed to delete blob object
DeleteBlobObject(ApiError),
/// Failed to delete blob file
DeleteBlobFile(ApiError),
/// Failed to list existing blob objects
GetBlobObjects(ApiError),
}
4 changes: 4 additions & 0 deletions api/src/http_endpoint_v2.rs
@@ -96,6 +96,10 @@ impl EndpointHandler for BlobObjectListHandlerV2 {
let r = kicker(ApiRequest::DeleteBlobObject(param));
return Ok(convert_to_response(r, HttpError::DeleteBlobObject));
}
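// Only blob_id is supplied: try to cull the corresponding fscache blob file instead.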
if let Some(blob_id) = extract_query_part(req, "blob_id") {
let r = kicker(ApiRequest::DeleteBlobFile(blob_id));
return Ok(convert_to_response(r, HttpError::DeleteBlobFile));
}
Err(HttpError::BadRequest)
}
_ => Err(HttpError::BadRequest),
8 changes: 8 additions & 0 deletions src/bin/nydusd/api_server_glue.rs
@@ -87,6 +87,7 @@ impl ApiServer {
ApiRequest::GetBlobObject(_param) => todo!(),
ApiRequest::CreateBlobObject(entry) => self.create_blob_cache_entry(&entry),
ApiRequest::DeleteBlobObject(param) => self.remove_blob_cache_entry(&param),
ApiRequest::DeleteBlobFile(blob_id) => self.blob_cache_gc(blob_id),
};

self.respond(resp);
@@ -326,6 +327,13 @@ impl ApiServer {
}
}

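// Handle ApiRequest::DeleteBlobFile: ask the daemon to delete (cull) the blob's cache file.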
fn blob_cache_gc(&self, blob_id: String) -> ApiResponse {
self.get_daemon_object()?
.delete_blob(blob_id)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
.map(|_| ApiResponsePayload::Empty)
}

fn do_start(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_start()
4 changes: 4 additions & 0 deletions src/bin/nydusd/daemon.rs
@@ -243,6 +243,10 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {

// For backward compatibility.
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>>;

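// Default is a no-op; daemons that actually manage blob cache files override this.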
fn delete_blob(&self, _blob_id: String) -> DaemonResult<()> {
Ok(())
}
}

// State machine for Nydus daemon workflow.
168 changes: 166 additions & 2 deletions src/bin/nydusd/fs_cache.rs
@@ -8,10 +8,12 @@ use std::cmp;
use std::collections::hash_map::Entry::Vacant;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
use std::env;
use std::fs::{self, DirEntry, File, OpenOptions};
use std::io::{Error, ErrorKind, Result, Write};
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::{Path, PathBuf};
use std::ptr::read_unaligned;
use std::string::String;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -248,6 +250,7 @@ pub struct FsCacheHandler {
state: Arc<Mutex<FsCacheState>>,
poller: Mutex<Poll>,
waker: Arc<Waker>,
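// Root of the on-disk cachefiles cache, set to <dir>/cache when the handler is created.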
cache_dir: Mutex<PathBuf>,
}

impl FsCacheHandler {
@@ -303,7 +306,7 @@ impl FsCacheHandler {
id_to_config_map: Default::default(),
blob_cache_mgr,
};

let cache_dir = PathBuf::new().join(dir).join("cache");
Ok(FsCacheHandler {
active: AtomicBool::new(true),
barrier: Barrier::new(threads + 1),
@@ -312,6 +315,7 @@
state: Arc::new(Mutex::new(state)),
poller: Mutex::new(poller),
waker: Arc::new(waker),
cache_dir: Mutex::new(cache_dir),
})
}

@@ -743,6 +747,166 @@ impl FsCacheHandler {
unsafe { fscache_cread(fd as i32, hdr.msg_id as u64).unwrap() };
}

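// Try to cull the fscache cookie backing `blob_id` in every volume under cache_dir;
// culling fails if any matching cookie is still in use by the kernel.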
pub fn cull_cache(&self, blob_id: String) -> Result<()> {
// Should hold the lock for the whole cull_cache call, because the working directory
// is changed when calling inuse() and cull().
let children = fs::read_dir(self.cache_dir.lock().unwrap().clone())?;
let children = children.collect::<Result<Vec<DirEntry>>>()?;
let mut res = true;
let cwd_old = env::current_dir()?;

info!("try to cull blob {}", blob_id);

// Compute the blob's cookie path in every volume, then try to cull each one.
for child in children {
let path = child.path();
if !path.is_dir() || !child.file_name().to_str().unwrap().starts_with("Ierofs,") {
continue;
}

// Get volume_key from the volume dir name, e.g. "Ierofs,SharedDomain" -> "erofs,SharedDomain".
let volume_key = &child.file_name().to_str().unwrap().to_string()[1..];
let (cookie_dir, cookie_name) = self.generate_cookie_path(&path, volume_key, &blob_id);
let cookie_path = cookie_dir.join(&cookie_name);
if cookie_path.is_file() {
match self.inuse(&cookie_dir, &cookie_name) {
Err(e) => {
warn!(
"blob {} call inuse err {}, cull failed!",
cookie_path.display(),
e
);
res = false;
}
Ok(true) => {
warn!("blob {} in use, cull failed!", cookie_path.display());
res = false;
}
Ok(false) => {
if let Err(e) = self.cull(&cookie_dir, &cookie_name) {
warn!(
"blob {} call cull err {}, cull failed!",
cookie_path.display(),
e
);
res = false;
}
}
}
}
}
env::set_current_dir(&cwd_old)?;
if res {
Ok(())
} else {
Err(eother!("cull blob failed"))
}
}

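// Multiplicative hash using the 32-bit golden-ratio constant, as in the kernel's hash_32().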
#[inline]
fn hash_32(&self, val: u32) -> u32 {
val * 0x61C88647
}

#[inline]
fn rol32(&self, word: u32, shift: i32) -> u32 {
word << (shift & 31) | (word >> ((-shift) & 31))
}

#[inline]
fn round_up_u32(&self, size: usize) -> usize {
(size + 3) / 4 * 4
}

// Adapted from the kernel's fscache_hash().
fn fscache_hash(&self, salt: u32, data: &[u8]) -> u32 {
assert_eq!(data.len() % 4, 0);

let mut x = 0;
let mut y = salt;
let mut buf_le32: [u8; 4] = [0; 4];
let n = data.len() / 4;

for i in 0..n {
buf_le32.clone_from_slice(&data[i * 4..i * 4 + 4]);
let a = unsafe { std::mem::transmute::<[u8; 4], u32>(buf_le32) }.to_le();
x ^= a;
y ^= x;
x = self.rol32(x, 7);
x += y;
y = self.rol32(y, 20);
y *= 9;
}
self.hash_32(y ^ self.hash_32(x))
}

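// Map (volume_key, cookie_key) to the on-disk cookie location used by cachefiles:
// <volume_path>/@<low byte of the directory hash, in hex>/D<cookie_key>.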
fn generate_cookie_path(
&self,
volume_path: &Path,
volume_key: &str,
cookie_key: &str,
) -> (PathBuf, String) {
//calc volume hash
let mut volume_hash_key: Vec<u8> =
Vec::with_capacity(self.round_up_u32(volume_key.len() + 2));
volume_hash_key.push(volume_key.len() as u8);
volume_hash_key.append(&mut volume_key.as_bytes().to_vec());
while volume_hash_key.len() != volume_hash_key.capacity() {
volume_hash_key.push(0);
}
let volume_hash = self.fscache_hash(0, volume_hash_key.as_slice());

//calc cookie hash
let mut cookie_hash_key: Vec<u8> = Vec::with_capacity(self.round_up_u32(cookie_key.len()));
cookie_hash_key.append(&mut cookie_key.as_bytes().to_vec());
while cookie_hash_key.len() != cookie_hash_key.capacity() {
cookie_hash_key.push(0);
}
let dir_hash = self.fscache_hash(volume_hash, cookie_hash_key.as_slice());
let dir = format!("@{:02x}", dir_hash as u8);
let cookie = format!("D{}", cookie_key);
(volume_path.join(dir), cookie)
}

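// Check whether the cookie is still in use: write "inuse <cookie>" to the cachefiles
// control file from inside the cookie's directory; EBUSY means it is in use.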
fn inuse(&self, cookie_dir: &Path, cookie_name: &str) -> Result<bool> {
env::set_current_dir(&cookie_dir)?;
let msg = format!("inuse {}", cookie_name);
let ret = unsafe {
libc::write(
self.file.as_raw_fd(),
msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
msg.len(),
)
};
if ret < 0 {
let err = Error::last_os_error();
if let Some(e) = err.raw_os_error() {
if e == libc::EBUSY {
return Ok(true);
}
}
Err(err)
} else {
Ok(false)
}
}

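// Remove the cookie's backing file: write "cull <cookie>" to the cachefiles control
// file from inside the cookie's directory.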
fn cull(&self, cookie_dir: &Path, cookie_name: &str) -> Result<()> {
env::set_current_dir(&cookie_dir)?;
let msg = format!("cull {}", cookie_name);
let ret = unsafe {
libc::write(
self.file.as_raw_fd(),
msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
msg.len(),
)
};
if ret as usize != msg.len() {
Err(Error::last_os_error())
} else {
Ok(())
}
}

#[inline]
fn reply(&self, result: &str) {
// Safe because the fd and data buffer are valid. And we trust the fscache driver which
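
As a supplement (not part of the commit), a standalone sketch of how the cookie path that cull_cache() operates on can be predicted. It mirrors the key layouts from generate_cookie_path(), but uses wrapping arithmetic (the kernel hash wraps by design), and the volume key and blob id are made up:

// Standalone illustration; these are not the FsCacheHandler methods above.
fn rol32(word: u32, shift: i32) -> u32 {
    (word << (shift & 31)) | (word >> ((-shift) & 31))
}

fn hash_32(val: u32) -> u32 {
    // Multiplicative hash with the 32-bit golden-ratio constant.
    val.wrapping_mul(0x61C88647)
}

fn fscache_hash(salt: u32, data: &[u8]) -> u32 {
    assert_eq!(data.len() % 4, 0);
    let (mut x, mut y) = (0u32, salt);
    for chunk in data.chunks_exact(4) {
        let mut le = [0u8; 4];
        le.copy_from_slice(chunk);
        x ^= u32::from_le_bytes(le);
        y ^= x;
        x = rol32(x, 7);
        x = x.wrapping_add(y);
        y = rol32(y, 20);
        y = y.wrapping_mul(9);
    }
    hash_32(y ^ hash_32(x))
}

fn round_up_u32(size: usize) -> usize {
    (size + 3) / 4 * 4
}

fn main() {
    let volume_key = "erofs,SharedDomain"; // volume dir "Ierofs,SharedDomain" minus the leading 'I'
    let blob_id = "example-blob-id"; // hypothetical blob id

    // Volume hash key: one length byte + key bytes, zero-padded to round_up(len + 2).
    let mut vkey = vec![volume_key.len() as u8];
    vkey.extend_from_slice(volume_key.as_bytes());
    vkey.resize(round_up_u32(volume_key.len() + 2), 0);
    let volume_hash = fscache_hash(0, &vkey);

    // Cookie hash key: blob id bytes, zero-padded to a 4-byte multiple.
    let mut ckey = blob_id.as_bytes().to_vec();
    ckey.resize(round_up_u32(blob_id.len()), 0);
    let dir_hash = fscache_hash(volume_hash, &ckey);

    // Cookie location relative to <dir>/cache, plus the two control messages that
    // cull_cache() writes to the cachefiles control file from inside that directory.
    println!("Ierofs,SharedDomain/@{:02x}/D{}", dir_hash as u8, blob_id);
    println!("inuse D{}", blob_id);
    println!("cull D{}", blob_id);
}
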
12 changes: 12 additions & 0 deletions src/bin/nydusd/service_controller.rs
@@ -198,6 +198,18 @@ impl NydusDaemon for ServiceController {
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
None
}

fn delete_blob(&self, _blob_id: String) -> DaemonResult<()> {
#[cfg(target_os = "linux")]
if self.fscache_enabled.load(Ordering::Acquire) {
if let Some(fscache) = self.fscache.lock().unwrap().clone() {
return fscache
.cull_cache(_blob_id)
.map_err(|e| DaemonError::StartService(format!("{}", e)));
}
}
Err(DaemonError::Unsupported)
}
}

impl DaemonStateMachineSubscriber for ServiceController {
