From db05a62e299d9c40dd7d9a82e323e1b1a0df9d3d Mon Sep 17 00:00:00 2001
From: Alexandr Romanenko
Date: Wed, 10 May 2023 20:36:56 +0300
Subject: [PATCH 1/3] chore(cubestore): Tuning local files cleanup

Run the local cleanup loop every 30 minutes instead of every 10 and
skip the remote listing while the files scanned in the local directory
add up to less than 1 GiB. The size is accumulated over all scanned
entries; entries whose metadata cannot be read count as 0 bytes.

Also warn when a GCS or S3 list call spans more than 50 pages.

---
 rust/cubestore/cubestore/src/remotefs/gcs.rs  |  3 ++
 .../cubestore/cubestore/src/remotefs/queue.rs | 29 ++++++++++++++-----
 rust/cubestore/cubestore/src/remotefs/s3.rs   |  3 ++
 3 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs
index f99e7e05dc362..e663bdfb57a4f 100644
--- a/rust/cubestore/cubestore/src/remotefs/gcs.rs
+++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs
@@ -278,6 +278,9 @@ impl RemoteFs for GCSRemoteFs {
         if result.len() % 1_000 > 0 {
             pages_count += 1;
         }
+        if pages_count > 50 {
+            log::warn!("GCS list returned more than 50 pages: {}", pages_count);
+        }
         app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
             pages_count as i64,
             Some(&vec![
diff --git a/rust/cubestore/cubestore/src/remotefs/queue.rs b/rust/cubestore/cubestore/src/remotefs/queue.rs
index b8327a68aefc0..8f0b56d8f6381 100644
--- a/rust/cubestore/cubestore/src/remotefs/queue.rs
+++ b/rust/cubestore/cubestore/src/remotefs/queue.rs
@@ -191,7 +191,8 @@ impl QueueRemoteFs {
         Ok(())
     }
 
-    const CLEANUP_INTERVAL: Duration = Duration::from_secs(600);
+    const CLEANUP_INTERVAL: Duration = Duration::from_secs(3 * 600);
+    const CLEANUP_FILE_SIZE_THRESHOLD: u64 = 1024 * 1024 * 1024;
     /// Periodically cleans up the local directory from the files removed on the remote side.
     /// This function currently removes only direct sibling files and does not touch subdirectories.
     /// So e.g. we remove the `.parquet` files, but not directories like `metastore` or heartbeat.
@@ -216,9 +217,10 @@ impl QueueRemoteFs {
             // We rely on RemoteFs implementations to upload the file to the server before they make
             // it available on the local filesystem.
             let local_dir_copy = local_dir.clone();
-            let res_local_files =
-                cube_ext::spawn_blocking(move || -> Result<HashSet<String>, std::io::Error> {
+            let res_local_files = cube_ext::spawn_blocking(
+                move || -> Result<(HashSet<String>, u64), std::io::Error> {
                     let mut local_files = HashSet::new();
+                    let mut local_files_size = 0;
                     for res_entry in Path::new(&local_dir_copy).read_dir()? {
                         let entry = match res_entry {
                             Err(_) => continue, // ignore errors, might come from concurrent fs ops.
@@ -233,6 +235,12 @@ impl QueueRemoteFs {
                             continue;
                         }
 
+                        local_files_size += if let Ok(metadata) = entry.metadata() {
+                            metadata.len()
+                        } else {
+                            0
+                        };
+
                         let file_name = match entry.file_name().into_string() {
                             Err(_) => {
                                 log::error!("could not convert file name {:?}", entry.file_name());
@@ -243,12 +251,13 @@ impl QueueRemoteFs {
                         local_files.insert(file_name);
                     }
 
-                    Ok(local_files)
-                })
-                .await
-                .unwrap();
+                    Ok((local_files, local_files_size))
+                },
+            )
+            .await
+            .unwrap();
 
-            let mut local_files = match res_local_files {
+            let (mut local_files, local_files_size) = match res_local_files {
                 Err(e) => {
                     log::error!("error while trying to list local files: {}", e);
                     continue;
@@ -256,6 +265,10 @@ impl QueueRemoteFs {
                 Ok(f) => f,
             };
 
+            if local_files_size < Self::CLEANUP_FILE_SIZE_THRESHOLD {
+                continue;
+            }
+
             let res_remote_files = self.list("").await;
             let remote_files = match res_remote_files {
                 Err(e) => {
diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs
index 9821ed7fad583..5bba1324b8345 100644
--- a/rust/cubestore/cubestore/src/remotefs/s3.rs
+++ b/rust/cubestore/cubestore/src/remotefs/s3.rs
@@ -282,6 +282,9 @@ impl RemoteFs for S3RemoteFs {
             pages_count as i64,
             Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
         );
+        if pages_count > 50 {
+            log::warn!("S3 list returned more than 50 pages: {}", pages_count);
+        }
         let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap();
         let result = list
             .iter()

From 17cf6237fc239b4724d808403116f4f4f96a7e5d Mon Sep 17 00:00:00 2001
From: Alexandr Romanenko
Date: Wed, 10 May 2023 21:58:13 +0300
Subject: [PATCH 2/3] config

Replace the hard-coded cleanup interval and size threshold in
QueueRemoteFs with two ConfigObj settings:

  CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS   (default 3 * 600 seconds)
  CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD  (default 1024 * 1024 * 1024 bytes)

The second Config constructor touched here uses 600 seconds and a
threshold of 0, so the size check never skips a cleanup pass there.

---
 rust/cubestore/cubestore/src/config/mod.rs    | 26 +++++++++++++++++++
 .../cubestore/cubestore/src/remotefs/queue.rs |  8 +++---
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs
index e5a58ab95c78a..28a662b8b4e30 100644
--- a/rust/cubestore/cubestore/src/config/mod.rs
+++ b/rust/cubestore/cubestore/src/config/mod.rs
@@ -458,6 +458,10 @@ pub trait ConfigObj: DIService {
     fn transport_max_message_size(&self) -> usize;
 
     fn transport_max_frame_size(&self) -> usize;
+
+    fn local_files_cleanup_interval_secs(&self) -> u64;
+
+    fn local_files_cleanup_size_threshold(&self) -> u64;
 }
 
 #[derive(Debug, Clone)]
@@ -526,6 +530,8 @@ pub struct ConfigObjImpl {
     pub disk_space_cache_duration_secs: u64,
     pub transport_max_message_size: usize,
     pub transport_max_frame_size: usize,
+    pub local_files_cleanup_interval_secs: u64,
+    pub local_files_cleanup_size_threshold: u64,
 }
 
 crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -772,6 +778,14 @@ impl ConfigObj for ConfigObjImpl {
     fn transport_max_frame_size(&self) -> usize {
         self.transport_max_frame_size
     }
+
+    fn local_files_cleanup_interval_secs(&self) -> u64 {
+        self.local_files_cleanup_interval_secs
+    }
+
+    fn local_files_cleanup_size_threshold(&self) -> u64 {
+        self.local_files_cleanup_size_threshold
+    }
 }
 
 lazy_static! {
@@ -1075,6 +1089,16 @@ impl Config {
                     Some(256 << 20),
                     Some(4 << 20),
                 ),
+                local_files_cleanup_interval_secs: env_parse(
+                    "CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS",
+                    3 * 600,
+                ),
+                local_files_cleanup_size_threshold: env_parse_size(
+                    "CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD",
+                    1024 * 1024 * 1024,
+                    None,
+                    None,
+                ) as u64,
             }),
         }
     }
@@ -1155,6 +1179,8 @@ impl Config {
                 disk_space_cache_duration_secs: 0,
                 transport_max_message_size: 64 << 20,
                 transport_max_frame_size: 16 << 20,
+                local_files_cleanup_interval_secs: 600,
+                local_files_cleanup_size_threshold: 0,
             }),
         }
     }
diff --git a/rust/cubestore/cubestore/src/remotefs/queue.rs b/rust/cubestore/cubestore/src/remotefs/queue.rs
index 8f0b56d8f6381..39a823c29c013 100644
--- a/rust/cubestore/cubestore/src/remotefs/queue.rs
+++ b/rust/cubestore/cubestore/src/remotefs/queue.rs
@@ -191,8 +191,6 @@ impl QueueRemoteFs {
         Ok(())
     }
 
-    const CLEANUP_INTERVAL: Duration = Duration::from_secs(3 * 600);
-    const CLEANUP_FILE_SIZE_THRESHOLD: u64 = 1024 * 1024 * 1024;
     /// Periodically cleans up the local directory from the files removed on the remote side.
     /// This function currently removes only direct sibling files and does not touch subdirectories.
     /// So e.g. we remove the `.parquet` files, but not directories like `metastore` or heartbeat.
@@ -202,10 +200,12 @@ impl QueueRemoteFs {
     async fn cleanup_loop(&self) -> () {
         let local_dir = self.local_path().await;
         let mut stopped_rx = self.stopped_rx.clone();
+        let cleanup_interval = Duration::from_secs(self.config.local_files_cleanup_interval_secs());
+        let cleanup_files_size_threshold = self.config.local_files_cleanup_size_threshold();
         loop {
             // Do the cleanup every now and then.
             tokio::select! {
-                () = tokio::time::sleep(Self::CLEANUP_INTERVAL) => {},
+                () = tokio::time::sleep(cleanup_interval) => {},
                 res = stopped_rx.changed() => {
                     if res.is_err() || *stopped_rx.borrow() {
                         return;
@@ -265,7 +265,7 @@ impl QueueRemoteFs {
                 Ok(f) => f,
             };
 
-            if local_files_size < Self::CLEANUP_FILE_SIZE_THRESHOLD {
+            if local_files_size < cleanup_files_size_threshold {
                 continue;
             }
 

From 45c86715cf088d148ebccd0378ae8ae3f360a68c Mon Sep 17 00:00:00 2001
From: Alexandr Romanenko
Date: Wed, 10 May 2023 22:02:21 +0300
Subject: [PATCH 3/3] update

Raise the page-count warning threshold for GCS and S3 list calls from
50 to 100 pages.

---
 rust/cubestore/cubestore/src/remotefs/gcs.rs | 4 ++--
 rust/cubestore/cubestore/src/remotefs/s3.rs  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs
index e663bdfb57a4f..cf497f8f30116 100644
--- a/rust/cubestore/cubestore/src/remotefs/gcs.rs
+++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs
@@ -278,8 +278,8 @@ impl RemoteFs for GCSRemoteFs {
         if result.len() % 1_000 > 0 {
             pages_count += 1;
         }
-        if pages_count > 50 {
-            log::warn!("GCS list returned more than 50 pages: {}", pages_count);
+        if pages_count > 100 {
+            log::warn!("GCS list returned more than 100 pages: {}", pages_count);
         }
         app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
             pages_count as i64,
diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs
index 5bba1324b8345..43cfff7f0df2e 100644
--- a/rust/cubestore/cubestore/src/remotefs/s3.rs
+++ b/rust/cubestore/cubestore/src/remotefs/s3.rs
@@ -282,8 +282,8 @@ impl RemoteFs for S3RemoteFs {
             pages_count as i64,
             Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
         );
-        if pages_count > 50 {
-            log::warn!("S3 list returned more than 50 pages: {}", pages_count);
+        if pages_count > 100 {
+            log::warn!("S3 list returned more than 100 pages: {}", pages_count);
         }
         let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap();
         let result = list