diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index e5a58ab95c78a..28a662b8b4e30 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -458,6 +458,10 @@ pub trait ConfigObj: DIService { fn transport_max_message_size(&self) -> usize; fn transport_max_frame_size(&self) -> usize; + + fn local_files_cleanup_interval_secs(&self) -> u64; + + fn local_files_cleanup_size_threshold(&self) -> u64; } #[derive(Debug, Clone)] @@ -526,6 +530,8 @@ pub struct ConfigObjImpl { pub disk_space_cache_duration_secs: u64, pub transport_max_message_size: usize, pub transport_max_frame_size: usize, + pub local_files_cleanup_interval_secs: u64, + pub local_files_cleanup_size_threshold: u64, } crate::di_service!(ConfigObjImpl, [ConfigObj]); @@ -772,6 +778,14 @@ impl ConfigObj for ConfigObjImpl { fn transport_max_frame_size(&self) -> usize { self.transport_max_frame_size } + + fn local_files_cleanup_interval_secs(&self) -> u64 { + self.local_files_cleanup_interval_secs + } + + fn local_files_cleanup_size_threshold(&self) -> u64 { + self.local_files_cleanup_size_threshold + } } lazy_static! 
{ @@ -1075,6 +1089,16 @@ impl Config { Some(256 << 20), Some(4 << 20), ), + local_files_cleanup_interval_secs: env_parse( + "CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS", + 3 * 600, + ), + local_files_cleanup_size_threshold: env_parse_size( + "CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD", + 1024 * 1024 * 1024, + None, + None, + ) as u64, }), } } @@ -1155,6 +1179,8 @@ impl Config { disk_space_cache_duration_secs: 0, transport_max_message_size: 64 << 20, transport_max_frame_size: 16 << 20, + local_files_cleanup_interval_secs: 600, + local_files_cleanup_size_threshold: 0, }), } } diff --git a/rust/cubestore/cubestore/src/remotefs/gcs.rs b/rust/cubestore/cubestore/src/remotefs/gcs.rs index f99e7e05dc362..cf497f8f30116 100644 --- a/rust/cubestore/cubestore/src/remotefs/gcs.rs +++ b/rust/cubestore/cubestore/src/remotefs/gcs.rs @@ -278,6 +278,9 @@ impl RemoteFs for GCSRemoteFs { if result.len() % 1_000 > 0 { pages_count += 1; } + if pages_count > 100 { + log::warn!("GCS list returned more than 100 pages: {}", pages_count); + } app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags( pages_count as i64, Some(&vec![ 
@@ -201,10 +200,12 @@ impl QueueRemoteFs { async fn cleanup_loop(&self) -> () { let local_dir = self.local_path().await; let mut stopped_rx = self.stopped_rx.clone(); + let cleanup_interval = Duration::from_secs(self.config.local_files_cleanup_interval_secs()); + let cleanup_files_size_threshold = self.config.local_files_cleanup_size_threshold(); loop { // Do the cleanup every now and then. tokio::select! { - () = tokio::time::sleep(Self::CLEANUP_INTERVAL) => {}, + () = tokio::time::sleep(cleanup_interval) => {}, res = stopped_rx.changed() => { if res.is_err() || *stopped_rx.borrow() { return; @@ -216,9 +217,10 @@ impl QueueRemoteFs { // We rely on RemoteFs implementations to upload the file to the server before they make // it available on the local filesystem. let local_dir_copy = local_dir.clone(); - let res_local_files = - cube_ext::spawn_blocking(move || -> Result, std::io::Error> { + let res_local_files = cube_ext::spawn_blocking( + move || -> Result<(HashSet, u64), std::io::Error> { let mut local_files = HashSet::new(); + let mut local_files_size = 0; for res_entry in Path::new(&local_dir_copy).read_dir()? { let entry = match res_entry { Err(_) => continue, // ignore errors, might come from concurrent fs ops. 
@@ -233,6 +235,12 @@ impl QueueRemoteFs { continue; } + local_files_size += if let Ok(metadata) = entry.metadata() { + metadata.len() + } else { + 0 + }; + let file_name = match entry.file_name().into_string() { Err(_) => { log::error!("could not convert file name {:?}", entry.file_name()); @@ -243,12 +251,13 @@ impl QueueRemoteFs { local_files.insert(file_name); } - Ok(local_files) - }) - .await - .unwrap(); + Ok((local_files, local_files_size)) + }, + ) + .await + .unwrap(); - let mut local_files = match res_local_files { + let (mut local_files, local_files_size) = match res_local_files { Err(e) => { log::error!("error while trying to list local files: {}", e); continue; @@ -256,6 +265,10 @@ impl QueueRemoteFs { Ok(f) => f, }; + if local_files_size < cleanup_files_size_threshold { + continue; + } + let res_remote_files = self.list("").await; let remote_files = match res_remote_files { Err(e) => { diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 9821ed7fad583..43cfff7f0df2e 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -282,6 +282,9 @@ impl RemoteFs for S3RemoteFs { pages_count as i64, Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]), ); + if pages_count > 100 { + log::warn!("S3 list returned more than 100 pages: {}", pages_count); + } let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap(); let result = list .iter()