26 changes: 26 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
@@ -458,6 +458,10 @@ pub trait ConfigObj: DIService {
 
     fn transport_max_message_size(&self) -> usize;
     fn transport_max_frame_size(&self) -> usize;
+
+    fn local_files_cleanup_interval_secs(&self) -> u64;
+
+    fn local_files_cleanup_size_threshold(&self) -> u64;
 }
 
 #[derive(Debug, Clone)]
@@ -526,6 +530,8 @@ pub struct ConfigObjImpl {
     pub disk_space_cache_duration_secs: u64,
     pub transport_max_message_size: usize,
     pub transport_max_frame_size: usize,
+    pub local_files_cleanup_interval_secs: u64,
+    pub local_files_cleanup_size_threshold: u64,
 }
 
 crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -772,6 +778,14 @@ impl ConfigObj for ConfigObjImpl {
     fn transport_max_frame_size(&self) -> usize {
         self.transport_max_frame_size
     }
+
+    fn local_files_cleanup_interval_secs(&self) -> u64 {
+        self.local_files_cleanup_interval_secs
+    }
+
+    fn local_files_cleanup_size_threshold(&self) -> u64 {
+        self.local_files_cleanup_size_threshold
+    }
 }
 
 lazy_static! {
@@ -1075,6 +1089,16 @@ impl Config {
                     Some(256 << 20),
                     Some(4 << 20),
                 ),
+                local_files_cleanup_interval_secs: env_parse(
+                    "CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS",
+                    3 * 600,
+                ),
+                local_files_cleanup_size_threshold: env_parse_size(
+                    "CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD",
+                    1024 * 1024 * 1024,
+                    None,
+                    None,
+                ) as u64,
             }),
         }
     }
@@ -1155,6 +1179,8 @@ impl Config {
                 disk_space_cache_duration_secs: 0,
                 transport_max_message_size: 64 << 20,
                 transport_max_frame_size: 16 << 20,
+                local_files_cleanup_interval_secs: 600,
+                local_files_cleanup_size_threshold: 0,
             }),
         }
     }
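For reference, the defaults wired in above: the production config scans every 3 * 600 = 1800 seconds (30 minutes) and skips cleanup until local files exceed 1024 * 1024 * 1024 bytes (1 GiB), while the test config scans every 600 seconds with a threshold of 0, so its cleanup pass always proceeds. A minimal sketch of this kind of env-backed default, with `env_parse` a simplified stand-in for cubestore's helper of the same name (the real `env_parse_size` additionally takes what appear to be optional bounds):

```rust
use std::env;
use std::str::FromStr;

/// Simplified stand-in for cubestore's `env_parse`: read an env var and fall
/// back to a default when it is unset or fails to parse.
fn env_parse<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    // Defaults from this PR: a 30-minute interval and a 1 GiB threshold.
    let interval_secs: u64 = env_parse("CUBESTORE_LOCAL_FILES_CLEANUP_INTERVAL_SECS", 3 * 600);
    let size_threshold: u64 = env_parse("CUBESTORE_LOCAL_FILES_CLEANUP_SIZE_THRESHOLD", 1 << 30);
    println!("cleanup every {interval_secs}s once local files exceed {size_threshold} bytes");
}
```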
3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/remotefs/gcs.rs
@@ -278,6 +278,9 @@ impl RemoteFs for GCSRemoteFs {
         if result.len() % 1_000 > 0 {
             pages_count += 1;
         }
+        if pages_count > 100 {
+            log::warn!("GCS list returned more than 100 pages: {}", pages_count);
+        }
         app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
             pages_count as i64,
             Some(&vec![
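Both object-store drivers now emit this warning once a listing crosses 100 pages. A small sketch of the page arithmetic, assuming the 1,000-keys-per-page size visible in the GCS code above:

```rust
/// Page count for a listing API returning up to `page_size` items per page;
/// mirrors the `result.len() % 1_000` bookkeeping above.
fn pages_for(total_items: usize, page_size: usize) -> usize {
    let mut pages = total_items / page_size;
    if total_items % page_size > 0 {
        pages += 1;
    }
    pages
}

fn main() {
    assert_eq!(pages_for(999, 1_000), 1);
    assert_eq!(pages_for(1_000, 1_000), 1);
    assert_eq!(pages_for(1_001, 1_000), 2);
    // The warning threshold corresponds to listings of over 100,000 objects.
    assert_eq!(pages_for(100_500, 1_000), 101);
}
```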
31 changes: 22 additions & 9 deletions rust/cubestore/cubestore/src/remotefs/queue.rs
@@ -191,7 +191,6 @@ impl QueueRemoteFs {
         Ok(())
     }
 
-    const CLEANUP_INTERVAL: Duration = Duration::from_secs(600);
     /// Periodically cleans up the local directory from the files removed on the remote side.
     /// This function currently removes only direct sibling files and does not touch subdirectories.
     /// So e.g. we remove the `.parquet` files, but not directories like `metastore` or heartbeat.
@@ -201,10 +200,12 @@
     async fn cleanup_loop(&self) -> () {
         let local_dir = self.local_path().await;
         let mut stopped_rx = self.stopped_rx.clone();
+        let cleanup_interval = Duration::from_secs(self.config.local_files_cleanup_interval_secs());
+        let cleanup_files_size_threshold = self.config.local_files_cleanup_size_threshold();
         loop {
             // Do the cleanup every now and then.
             tokio::select! {
-                () = tokio::time::sleep(Self::CLEANUP_INTERVAL) => {},
+                () = tokio::time::sleep(cleanup_interval) => {},
                 res = stopped_rx.changed() => {
                     if res.is_err() || *stopped_rx.borrow() {
                         return;
@@ -216,9 +217,10 @@
             // We rely on RemoteFs implementations to upload the file to the server before they make
             // it available on the local filesystem.
             let local_dir_copy = local_dir.clone();
-            let res_local_files =
-                cube_ext::spawn_blocking(move || -> Result<HashSet<String>, std::io::Error> {
+            let res_local_files = cube_ext::spawn_blocking(
+                move || -> Result<(HashSet<String>, u64), std::io::Error> {
                     let mut local_files = HashSet::new();
+                    let mut local_files_size = 0;
                     for res_entry in Path::new(&local_dir_copy).read_dir()? {
                         let entry = match res_entry {
                             Err(_) => continue, // ignore errors, might come from concurrent fs ops.
@@ -233,6 +235,12 @@
                             continue;
                         }
 
+                        local_files_size += if let Ok(metadata) = entry.metadata() {
+                            metadata.len()
+                        } else {
+                            0
+                        };
+
                         let file_name = match entry.file_name().into_string() {
                             Err(_) => {
                                 log::error!("could not convert file name {:?}", entry.file_name());
@@ -243,19 +251,24 @@
 
                         local_files.insert(file_name);
                     }
-                    Ok(local_files)
-                })
-                .await
-                .unwrap();
+                    Ok((local_files, local_files_size))
+                },
+            )
+            .await
+            .unwrap();
 
-            let mut local_files = match res_local_files {
+            let (mut local_files, local_files_size) = match res_local_files {
                 Err(e) => {
                     log::error!("error while trying to list local files: {}", e);
                     continue;
                 }
                 Ok(f) => f,
             };
 
+            if local_files_size < cleanup_files_size_threshold {
+                continue;
+            }
+
             let res_remote_files = self.list("").await;
             let remote_files = match res_remote_files {
                 Err(e) => {
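Taken together, the queue.rs changes give the loop this shape: sleep for a configurable interval (or exit on the stop signal), measure the direct child files of the local directory, and bail out before the remote `list` call while their total size stays under the threshold. A condensed, self-contained sketch of that flow, using plain tokio in place of `cube_ext::spawn_blocking`; names and the shutdown wiring here are illustrative, not cubestore's exact code:

```rust
use std::path::Path;
use std::time::Duration;
use tokio::sync::watch;

/// Condensed sketch of the reworked cleanup loop.
async fn cleanup_loop(
    local_dir: String,
    interval: Duration,
    size_threshold: u64,
    mut stopped_rx: watch::Receiver<bool>,
) {
    loop {
        // Wake up periodically, or return as soon as a stop is signalled.
        tokio::select! {
            () = tokio::time::sleep(interval) => {},
            res = stopped_rx.changed() => {
                if res.is_err() || *stopped_rx.borrow() {
                    return;
                }
                continue;
            }
        }

        // Sum the sizes of direct child files only; subdirectories such as
        // `metastore` are left alone, as the doc comment above describes.
        let dir = local_dir.clone();
        let local_files_size = tokio::task::spawn_blocking(move || {
            let mut total = 0u64;
            if let Ok(entries) = Path::new(&dir).read_dir() {
                for entry in entries.flatten() {
                    if let Ok(meta) = entry.metadata() {
                        if meta.is_file() {
                            total += meta.len();
                        }
                    }
                }
            }
            total
        })
        .await
        .unwrap_or(0);

        // The new short-circuit: skip the expensive remote listing entirely
        // while there is little to clean.
        if local_files_size < size_threshold {
            continue;
        }

        // ... list remote files and delete local ones that are gone remotely ...
    }
}

#[tokio::main]
async fn main() {
    let (stop_tx, stop_rx) = watch::channel(false);
    let handle = tokio::spawn(cleanup_loop(
        "/tmp".to_string(),
        Duration::from_secs(600),
        1 << 30, // 1 GiB
        stop_rx,
    ));
    // Signal shutdown, analogous to how QueueRemoteFs presumably feeds stopped_rx.
    stop_tx.send(true).unwrap();
    handle.await.unwrap();
}
```

The threshold check is the point of the change: listing the remote store is the expensive step, so skipping it while the local directory is small keeps the idle cost of the loop near zero.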
3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/remotefs/s3.rs
@@ -282,6 +282,9 @@ impl RemoteFs for S3RemoteFs {
             pages_count as i64,
             Some(&vec!["operation:list".to_string(), "driver:s3".to_string()]),
         );
+        if pages_count > 100 {
+            log::warn!("S3 list returned more than 100 pages: {}", pages_count);
+        }
         let leading_slash = Regex::new(format!("^{}", self.s3_path("")).as_str()).unwrap();
         let result = list
             .iter()