From 4f59e3e8d452a4b1325e0e93978dd44c827c709d Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Sun, 29 Jan 2023 00:48:42 +0300 Subject: [PATCH 1/3] feat(cubestore): Max disk space limit --- rust/cubestore/cubestore/src/config/mod.rs | 9 +++++++++ rust/cubestore/cubestore/src/metastore/mod.rs | 18 ++++++++++++++++++ rust/cubestore/cubestore/src/sql/mod.rs | 12 ++++++++++++ 3 files changed, 39 insertions(+) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 338e3c7c295a2..485045071ac69 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -399,6 +399,8 @@ pub trait ConfigObj: DIService { fn minimum_metastore_snapshots_count(&self) -> u64; fn metastore_snapshots_lifetime(&self) -> u64; + + fn max_disk_space_gb(&self) -> u64; } #[derive(Debug, Clone)] @@ -456,6 +458,7 @@ pub struct ConfigObjImpl { pub skip_kafka_parsing_errors: bool, pub minimum_metastore_snapshots_count: u64, pub metastore_snapshots_lifetime: u64, + pub max_disk_space_gb: u64, } crate::di_service!(ConfigObjImpl, [ConfigObj]); @@ -659,6 +662,10 @@ impl ConfigObj for ConfigObjImpl { fn metastore_snapshots_lifetime(&self) -> u64 { self.metastore_snapshots_lifetime } + + fn max_disk_space_gb(&self) -> u64 { + self.max_disk_space_gb + } } lazy_static! { @@ -873,6 +880,7 @@ impl Config { "CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME", 24 * 60 * 60, ), + max_disk_space_gb: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0), }), } } @@ -942,6 +950,7 @@ impl Config { skip_kafka_parsing_errors: false, minimum_metastore_snapshots_count: 3, metastore_snapshots_lifetime: 24 * 3600, + max_disk_space_gb: 0, }), } } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index a7601b1a0efb2..0b91018dc4a6d 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -908,6 +908,7 @@ pub trait MetaStore: DIService + Send + Sync { partition_id: u64, include_inactive: bool, ) -> Result>, CubeError>; + async fn get_used_disk_space_out_of_queue(&self) -> Result; async fn get_chunks_by_partition_out_of_queue( &self, partition_id: u64, @@ -2232,6 +2233,23 @@ impl MetaStore for RocksMetaStore { .await } + async fn get_used_disk_space_out_of_queue(&self) -> Result { + self.read_operation_out_of_queue(move |db| { + let partitions_size: u64 = PartitionRocksTable::new(db.clone()) + .all_rows()? + .iter() + .map(|p| p.get_row().file_size().unwrap_or(0)) + .sum(); + let chunks_size: u64 = ChunkRocksTable::new(db) + .all_rows()? + .iter() + .map(|c| c.get_row().file_size().unwrap_or(0)) + .sum(); + Ok(partitions_size + chunks_size) + }) + .await + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_partition_chunk_sizes(&self, partition_id: u64) -> Result { let chunks = self diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index fea0fd1d2115d..1d3b244c382fe 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -317,6 +317,18 @@ impl SqlServiceImpl { } else { None }; + + let max_disk_space = self.config_obj.max_disk_space_gb(); + if max_disk_space > 0 { + let used_space = self.db.get_used_disk_space_out_of_queue().await?; + if max_disk_space * 1024 * 1024 * 1024 < used_space { + return Err(CubeError::user(format!( + "Exceeded available storage space ({} Gb)", + max_disk_space + ))); + } + } + if !external { return self .db From 56f34681a975dec166e277a1c9366ea0f1047b3c Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Sun, 29 Jan 2023 01:01:34 +0300 Subject: [PATCH 2/3] update --- rust/cubestore/cubestore/src/sql/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 1d3b244c382fe..7b6bd0a2e1bbc 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -323,7 +323,8 @@ impl SqlServiceImpl { let used_space = self.db.get_used_disk_space_out_of_queue().await?; if max_disk_space * 1024 * 1024 * 1024 < used_space { return Err(CubeError::user(format!( - "Exceeded available storage space ({} Gb)", + "Exceeded available storage space: {:.3} GB out of {} GB allowed. Please consider changing pre-aggregations build range, reducing index count or pre-aggregations granularity.", + used_space as f64 / 1024. / 1024. / 1024., max_disk_space ))); } From 653f2e9dfbb2962f886f7f27281780c22e598996 Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Sun, 29 Jan 2023 02:34:26 +0300 Subject: [PATCH 3/3] tests --- rust/cubestore/cubestore/src/config/mod.rs | 12 +-- rust/cubestore/cubestore/src/metastore/mod.rs | 32 +++++--- rust/cubestore/cubestore/src/sql/mod.rs | 81 ++++++++++++++++++- 3 files changed, 105 insertions(+), 20 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 485045071ac69..274573558ec56 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -400,7 +400,7 @@ pub trait ConfigObj: DIService { fn metastore_snapshots_lifetime(&self) -> u64; - fn max_disk_space_gb(&self) -> u64; + fn max_disk_space(&self) -> u64; } #[derive(Debug, Clone)] @@ -458,7 +458,7 @@ pub struct ConfigObjImpl { pub skip_kafka_parsing_errors: bool, pub minimum_metastore_snapshots_count: u64, pub metastore_snapshots_lifetime: u64, - pub max_disk_space_gb: u64, + pub max_disk_space: u64, } crate::di_service!(ConfigObjImpl, [ConfigObj]); @@ -663,8 +663,8 @@ impl ConfigObj for ConfigObjImpl { self.metastore_snapshots_lifetime } - fn max_disk_space_gb(&self) -> u64 { - self.max_disk_space_gb + fn max_disk_space(&self) -> u64 { + self.max_disk_space } } @@ -880,7 +880,7 @@ impl Config { "CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME", 24 * 60 * 60, ), - max_disk_space_gb: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0), + max_disk_space: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0) * 1024 * 1024 * 1024, }), } } @@ -950,7 +950,7 @@ impl Config { skip_kafka_parsing_errors: false, minimum_metastore_snapshots_count: 3, metastore_snapshots_lifetime: 24 * 3600, - max_disk_space_gb: 0, + max_disk_space: 0, }), } } diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 0b91018dc4a6d..dc9af33cdf57d 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -909,6 +909,9 @@ pub trait MetaStore: DIService + Send + Sync { include_inactive: bool, ) -> Result>, CubeError>; async fn get_used_disk_space_out_of_queue(&self) -> Result; + async fn get_all_partitions_and_chunks_out_of_queue( + &self, + ) -> Result<(Vec>, Vec>), CubeError>; async fn get_chunks_by_partition_out_of_queue( &self, partition_id: u64, @@ -2234,18 +2237,25 @@ impl MetaStore for RocksMetaStore { } async fn get_used_disk_space_out_of_queue(&self) -> Result { + let (partitions, chunks) = self.get_all_partitions_and_chunks_out_of_queue().await?; + let partitions_size: u64 = partitions + .into_iter() + .map(|p| p.get_row().file_size().unwrap_or(0)) + .sum(); + let chunks_size: u64 = chunks + .into_iter() + .map(|c| c.get_row().file_size().unwrap_or(0)) + .sum(); + Ok(partitions_size + chunks_size) + } + + async fn get_all_partitions_and_chunks_out_of_queue( + &self, + ) -> Result<(Vec>, Vec>), CubeError> { self.read_operation_out_of_queue(move |db| { - let partitions_size: u64 = PartitionRocksTable::new(db.clone()) - .all_rows()? - .iter() - .map(|p| p.get_row().file_size().unwrap_or(0)) - .sum(); - let chunks_size: u64 = ChunkRocksTable::new(db) - .all_rows()? - .iter() - .map(|c| c.get_row().file_size().unwrap_or(0)) - .sum(); - Ok(partitions_size + chunks_size) + let partitions = PartitionRocksTable::new(db.clone()).all_rows()?; + let chunks = ChunkRocksTable::new(db).all_rows()?; + Ok((partitions, chunks)) }) .await } diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 7b6bd0a2e1bbc..55c7feaf85ec2 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -318,14 +318,14 @@ impl SqlServiceImpl { None }; - let max_disk_space = self.config_obj.max_disk_space_gb(); + let max_disk_space = self.config_obj.max_disk_space(); if max_disk_space > 0 { let used_space = self.db.get_used_disk_space_out_of_queue().await?; - if max_disk_space * 1024 * 1024 * 1024 < used_space { + if max_disk_space < used_space { return Err(CubeError::user(format!( "Exceeded available storage space: {:.3} GB out of {} GB allowed. Please consider changing pre-aggregations build range, reducing index count or pre-aggregations granularity.", used_space as f64 / 1024. / 1024. / 1024., - max_disk_space + max_disk_space as f64 / 1024. / 1024. / 1024. ))); } } @@ -3595,6 +3595,81 @@ mod tests { .await; } + #[tokio::test] + async fn disk_space_limit() { + Config::test("disk_space_limit") + .update_config(|mut c| { + c.partition_split_threshold = 1000000; + c.compaction_chunks_count_threshold = 100; + c.max_disk_space = 300_000; + c.select_workers = vec!["127.0.0.1:24306".to_string()]; + c.metastore_bind_address = Some("127.0.0.1:25312".to_string()); + c + }) + .start_test(async move |services| { + let service = services.sql_service; + + Config::test("disk_space_limit_worker_1") + .update_config(|mut c| { + c.worker_bind_address = Some("127.0.0.1:24306".to_string()); + c.server_name = "127.0.0.1:24306".to_string(); + c.metastore_remote_address = Some("127.0.0.1:25312".to_string()); + c + }) + .start_test_worker(async move |_| { + let paths = { + let dir = env::temp_dir(); + + let path_1 = dir.clone().join("foo-cluster-1.csv"); + let path_2 = dir.clone().join("foo-cluster-2.csv.gz"); + let mut file = File::create(path_1.clone()).unwrap(); + + file.write_all("id,city,arr,t\n".as_bytes()).unwrap(); + for i in 0..100000 + { + file.write_all(format!("{},\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23.123 UTC\n", i).as_bytes()).unwrap(); + } + + + let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap())); + + file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap(); + for i in 0..100000 + { + file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap(); + } + + file.shutdown().await.unwrap(); + + vec![path_1, path_2] + }; + + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query( + &format!( + "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", + paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",") + ) + ).await.unwrap(); + + let res = service.exec_query( + &format!( + "CREATE TABLE Foo.Persons2 (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", + paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",") + ) + ).await; + if let Err(err) = res { + assert!(err.message.starts_with("Exceeded available storage ")); + } else { + assert!(false); + } + + }) + .await; + }) + .await; + } + #[tokio::test] async fn compaction() { Config::test("compaction").update_config(|mut config| {