Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nydusd: make DELETE /api/v2/blobs API support cull fscache blob #894

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion api/openapi/nydus-api-v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorMsg"
/blob_objects:
/blobs:
summary: Manage cached blob objects
####################################################################
get:
Expand Down Expand Up @@ -96,6 +96,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorMsg"
operationId: deleteBlobFile
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/BlobId"
responses:
"204":
description: "Successfully deleted the blob file!"
"500":
description: "Can't delete the blob file!"
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorMsg"
################################################################
components:
schemas:
Expand Down
4 changes: 4 additions & 0 deletions api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub enum ApiRequest {
GetBlobObject(BlobCacheObjectId),
/// Delete a blob cache entry
DeleteBlobObject(BlobCacheObjectId),
/// Delete a blob cache file
DeleteBlobFile(String),
kevinXYin marked this conversation as resolved.
Show resolved Hide resolved
}

/// Kinds for daemon related error messages.
Expand Down Expand Up @@ -291,6 +293,8 @@ pub enum HttpError {
CreateBlobObject(ApiError),
/// Failed to delete blob object
DeleteBlobObject(ApiError),
/// Failed to delete blob file
DeleteBlobFile(ApiError),
/// Failed to list existing blob objects
GetBlobObjects(ApiError),
}
Expand Down
4 changes: 4 additions & 0 deletions api/src/http_endpoint_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl EndpointHandler for BlobObjectListHandlerV2 {
let r = kicker(ApiRequest::DeleteBlobObject(param));
return Ok(convert_to_response(r, HttpError::DeleteBlobObject));
}
if let Some(blob_id) = extract_query_part(req, "blob_id") {
let r = kicker(ApiRequest::DeleteBlobFile(blob_id));
return Ok(convert_to_response(r, HttpError::DeleteBlobFile));
}
Err(HttpError::BadRequest)
}
_ => Err(HttpError::BadRequest),
Expand Down
8 changes: 8 additions & 0 deletions src/bin/nydusd/api_server_glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl ApiServer {
ApiRequest::GetBlobObject(_param) => todo!(),
ApiRequest::CreateBlobObject(entry) => self.create_blob_cache_entry(&entry),
ApiRequest::DeleteBlobObject(param) => self.remove_blob_cache_entry(&param),
ApiRequest::DeleteBlobFile(blob_id) => self.blob_cache_gc(blob_id),
};

self.respond(resp);
Expand Down Expand Up @@ -326,6 +327,13 @@ impl ApiServer {
}
}

fn blob_cache_gc(&self, blob_id: String) -> ApiResponse {
self.get_daemon_object()?
kevinXYin marked this conversation as resolved.
Show resolved Hide resolved
.delete_blob(blob_id)
.map_err(|e| ApiError::DaemonAbnormal(e.into()))
.map(|_| ApiResponsePayload::Empty)
}

fn do_start(&self) -> ApiResponse {
let d = self.get_daemon_object()?;
d.trigger_start()
Expand Down
4 changes: 4 additions & 0 deletions src/bin/nydusd/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync {

// For backward compatibility.
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>>;

fn delete_blob(&self, _blob_id: String) -> DaemonResult<()> {
kevinXYin marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}

// State machine for Nydus daemon workflow.
Expand Down
173 changes: 171 additions & 2 deletions src/bin/nydusd/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use std::cmp;
use std::collections::hash_map::Entry::Vacant;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
use std::env;
use std::fs::{self, DirEntry, File, OpenOptions};
use std::io::{Error, ErrorKind, Result, Write};
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::{Path, PathBuf};
use std::ptr::read_unaligned;
use std::string::String;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -248,6 +250,7 @@ pub struct FsCacheHandler {
state: Arc<Mutex<FsCacheState>>,
poller: Mutex<Poll>,
waker: Arc<Waker>,
cache_dir: PathBuf,
}

impl FsCacheHandler {
Expand Down Expand Up @@ -303,7 +306,7 @@ impl FsCacheHandler {
id_to_config_map: Default::default(),
blob_cache_mgr,
};

let cache_dir = PathBuf::new().join(dir).join("cache");
Ok(FsCacheHandler {
active: AtomicBool::new(true),
barrier: Barrier::new(threads + 1),
Expand All @@ -312,6 +315,7 @@ impl FsCacheHandler {
state: Arc::new(Mutex::new(state)),
poller: Mutex::new(poller),
waker: Arc::new(waker),
cache_dir,
})
}

Expand Down Expand Up @@ -742,6 +746,171 @@ impl FsCacheHandler {
unsafe { fscache_cread(fd as i32, hdr.msg_id as u64).unwrap() };
}

pub fn cull_cache(&self, blob_id: String) -> Result<()> {
let children = fs::read_dir(self.cache_dir.clone())?;
let children = children.collect::<Result<Vec<DirEntry>>>()?;
let mut res = true;
// This is safe, because only api server which is a single thread server will call this func,
// and no other func will change cwd.
let cwd_old = env::current_dir()?;

info!("try to cull blob {}", blob_id);

// calc blob path in all volumes then try to cull them
for child in children {
let path = child.path();
let file_name = match child.file_name().to_str() {
Some(n) => n.to_string(),
None => {
env::set_current_dir(&cwd_old)?;
return Err(eother!("get file name failed"));
}
};
if !path.is_dir() || !file_name.starts_with("Ierofs,") {
continue;
}

// get volume_key form volume dir name e.g. Ierofs,SharedDomain
let volume_key = &file_name[1..];
let (cookie_dir, cookie_name) = self.generate_cookie_path(&path, volume_key, &blob_id);
let cookie_path = cookie_dir.join(&cookie_name);
if cookie_path.is_file() {
match self.inuse(&cookie_dir, &cookie_name) {
Err(e) => {
warn!(
"blob {} call inuse err {}, cull failed!",
cookie_path.display(),
e
);
res = false;
}
Ok(true) => {
warn!("blob {} in use, cull failed!", cookie_path.display());
res = false;
}
Ok(false) => {
if let Err(e) = self.cull(&cookie_dir, &cookie_name) {
warn!(
"blob {} call cull err {}, cull failed!",
cookie_path.display(),
e
);
res = false;
}
}
}
}
}
env::set_current_dir(&cwd_old)?;
if res {
Ok(())
} else {
Err(eother!("cull blob failed"))
}
}

#[inline]
fn hash_32(&self, val: u32) -> u32 {
val * 0x61C88647
}

#[inline]
fn rol32(&self, word: u32, shift: i32) -> u32 {
word << (shift & 31) | (word >> ((-shift) & 31))
}

#[inline]
fn round_up_u32(&self, size: usize) -> usize {
(size + 3) / 4 * 4
}

//address from kernel fscache_hash()
fn fscache_hash(&self, salt: u32, data: &[u8]) -> u32 {
assert_eq!(data.len() % 4, 0);

let mut x = 0;
let mut y = salt;
let mut buf_le32: [u8; 4] = [0; 4];
let n = data.len() / 4;

for i in 0..n {
buf_le32.clone_from_slice(&data[i * 4..i * 4 + 4]);
let a = unsafe { std::mem::transmute::<[u8; 4], u32>(buf_le32) }.to_le();
x ^= a;
y ^= x;
x = self.rol32(x, 7);
x += y;
y = self.rol32(y, 20);
y *= 9;
}
self.hash_32(y ^ self.hash_32(x))
}

fn generate_cookie_path(
&self,
volume_path: &Path,
volume_key: &str,
cookie_key: &str,
) -> (PathBuf, String) {
//calc volume hash
let mut volume_hash_key: Vec<u8> =
Vec::with_capacity(self.round_up_u32(volume_key.len() + 2));
volume_hash_key.push(volume_key.len() as u8);
volume_hash_key.append(&mut volume_key.as_bytes().to_vec());
volume_hash_key.resize(volume_hash_key.capacity(), 0);
let volume_hash = self.fscache_hash(0, volume_hash_key.as_slice());

//calc cookie hash
let mut cookie_hash_key: Vec<u8> = Vec::with_capacity(self.round_up_u32(cookie_key.len()));
cookie_hash_key.append(&mut cookie_key.as_bytes().to_vec());
cookie_hash_key.resize(cookie_hash_key.capacity(), 0);
let dir_hash = self.fscache_hash(volume_hash, cookie_hash_key.as_slice());

let dir = format!("@{:02x}", dir_hash as u8);
let cookie = format!("D{}", cookie_key);
(volume_path.join(dir), cookie)
}

fn inuse(&self, cookie_dir: &Path, cookie_name: &str) -> Result<bool> {
env::set_current_dir(&cookie_dir)?;
let msg = format!("inuse {}", cookie_name);
let ret = unsafe {
libc::write(
self.file.as_raw_fd(),
msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
msg.len(),
)
};
if ret < 0 {
let err = Error::last_os_error();
if let Some(e) = err.raw_os_error() {
if e == libc::EBUSY {
return Ok(true);
}
}
Err(err)
} else {
Ok(false)
}
}

fn cull(&self, cookie_dir: &Path, cookie_name: &str) -> Result<()> {
env::set_current_dir(&cookie_dir)?;
kevinXYin marked this conversation as resolved.
Show resolved Hide resolved
let msg = format!("cull {}", cookie_name);
let ret = unsafe {
libc::write(
self.file.as_raw_fd(),
msg.as_bytes().as_ptr() as *const u8 as *const libc::c_void,
msg.len(),
)
};
if ret as usize != msg.len() {
Err(Error::last_os_error())
} else {
Ok(())
}
}

#[inline]
fn reply(&self, result: &str) {
// Safe because the fd and data buffer are valid. And we trust the fscache driver which
Expand Down
12 changes: 12 additions & 0 deletions src/bin/nydusd/service_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ impl NydusDaemon for ServiceController {
fn get_default_fs_service(&self) -> Option<Arc<dyn FsService>> {
None
}

fn delete_blob(&self, _blob_id: String) -> DaemonResult<()> {
#[cfg(target_os = "linux")]
if self.fscache_enabled.load(Ordering::Acquire) {
if let Some(fscache) = self.fscache.lock().unwrap().clone() {
return fscache
.cull_cache(_blob_id)
.map_err(|e| DaemonError::StartService(format!("{}", e)));
}
}
Err(DaemonError::Unsupported)
}
}

impl DaemonStateMachineSubscriber for ServiceController {
Expand Down