diff --git a/object_store/src/local.rs b/object_store/src/local.rs index d3bfab8ededd..4847389299ff 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -240,6 +240,8 @@ impl From for super::Error { #[derive(Debug)] pub struct LocalFileSystem { config: Arc, + // if you want to delete empty directories when deleting files + automatic_cleanup: bool, } #[derive(Debug)] @@ -266,6 +268,7 @@ impl LocalFileSystem { config: Arc::new(Config { root: Url::parse("file:///").unwrap(), }), + automatic_cleanup: false, } } @@ -282,6 +285,7 @@ impl LocalFileSystem { config: Arc::new(Config { root: absolute_path_to_url(path)?, }), + automatic_cleanup: false, }) } @@ -295,6 +299,12 @@ impl LocalFileSystem { ); self.config.prefix_to_filesystem(location) } + + /// Enable automatic cleanup of empty directories when deleting files + pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self { + self.automatic_cleanup = automatic_cleanup; + self + } } impl Config { @@ -465,13 +475,36 @@ impl ObjectStore for LocalFileSystem { } async fn delete(&self, location: &Path) -> Result<()> { + let config = Arc::clone(&self.config); let path = self.path_to_filesystem(location)?; - maybe_spawn_blocking(move || match std::fs::remove_file(&path) { - Ok(_) => Ok(()), - Err(e) => Err(match e.kind() { - ErrorKind::NotFound => Error::NotFound { path, source: e }.into(), - _ => Error::UnableToDeleteFile { path, source: e }.into(), - }), + let automactic_cleanup = self.automatic_cleanup; + maybe_spawn_blocking(move || { + if let Err(e) = std::fs::remove_file(&path) { + Err(match e.kind() { + ErrorKind::NotFound => Error::NotFound { path, source: e }.into(), + _ => Error::UnableToDeleteFile { path, source: e }.into(), + }) + } else if automactic_cleanup { + let root = &config.root; + let root = root + .to_file_path() + .map_err(|_| Error::InvalidUrl { url: root.clone() })?; + + // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error + let mut parent = path.parent(); + + while let Some(loc) = parent { + if loc != root && std::fs::remove_dir(loc).is_ok() { + parent = loc.parent(); + } else { + break; + } + } + + Ok(()) + } else { + Ok(()) + } }) .await } @@ -1010,6 +1043,8 @@ fn convert_walkdir_result( #[cfg(test)] mod tests { + use std::fs; + use futures::TryStreamExt; use tempfile::{NamedTempFile, TempDir}; @@ -1445,6 +1480,34 @@ mod tests { list.sort_unstable(); assert_eq!(list, vec![c, a]); } + + #[tokio::test] + async fn delete_dirs_automatically() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()) + .unwrap() + .with_automatic_cleanup(true); + let location = Path::from("nested/file/test_file"); + let data = Bytes::from("arbitrary data"); + + integration + .put(&location, data.clone().into()) + .await + .unwrap(); + + let read_data = integration + .get(&location) + .await + .unwrap() + .bytes() + .await + .unwrap(); + + assert_eq!(&*read_data, data); + assert!(fs::read_dir(root.path()).unwrap().count() > 0); + integration.delete(&location).await.unwrap(); + assert!(fs::read_dir(root.path()).unwrap().count() == 0); + } } #[cfg(not(target_arch = "wasm32"))]