Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ pub trait ConfigObj: DIService {
fn minimum_metastore_snapshots_count(&self) -> u64;

fn metastore_snapshots_lifetime(&self) -> u64;

fn max_disk_space(&self) -> u64;
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -456,6 +458,7 @@ pub struct ConfigObjImpl {
pub skip_kafka_parsing_errors: bool,
pub minimum_metastore_snapshots_count: u64,
pub metastore_snapshots_lifetime: u64,
pub max_disk_space: u64,
}

crate::di_service!(ConfigObjImpl, [ConfigObj]);
Expand Down Expand Up @@ -659,6 +662,10 @@ impl ConfigObj for ConfigObjImpl {
/// Retention window for metastore snapshots. The configured default is
/// `24 * 60 * 60`, i.e. one day expressed in seconds (sourced from the
/// `CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME` environment variable).
fn metastore_snapshots_lifetime(&self) -> u64 {
    self.metastore_snapshots_lifetime
}

/// Maximum allowed on-disk data size in bytes; `0` disables the limit.
/// Populated from `CUBESTORE_MAX_DISK_SPACE_GB` (GB, converted to bytes
/// at config-parse time) and enforced before table imports.
fn max_disk_space(&self) -> u64 {
    self.max_disk_space
}
}

lazy_static! {
Expand Down Expand Up @@ -873,6 +880,7 @@ impl Config {
"CUBESTORE_METASTORE_SNAPSHOTS_LIFETIME",
24 * 60 * 60,
),
max_disk_space: env_parse("CUBESTORE_MAX_DISK_SPACE_GB", 0) * 1024 * 1024 * 1024,
}),
}
}
Expand Down Expand Up @@ -942,6 +950,7 @@ impl Config {
skip_kafka_parsing_errors: false,
minimum_metastore_snapshots_count: 3,
metastore_snapshots_lifetime: 24 * 3600,
max_disk_space: 0,
}),
}
}
Expand Down
28 changes: 28 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,10 @@ pub trait MetaStore: DIService + Send + Sync {
partition_id: u64,
include_inactive: bool,
) -> Result<Vec<IdRow<Chunk>>, CubeError>;
async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError>;
async fn get_all_partitions_and_chunks_out_of_queue(
&self,
) -> Result<(Vec<IdRow<Partition>>, Vec<IdRow<Chunk>>), CubeError>;
async fn get_chunks_by_partition_out_of_queue(
&self,
partition_id: u64,
Expand Down Expand Up @@ -2232,6 +2236,30 @@ impl MetaStore for RocksMetaStore {
.await
}

/// Returns the total recorded on-disk footprint: the sum of `file_size()`
/// over every partition row plus every chunk row. Rows without a recorded
/// size contribute 0. Reads bypass the metastore write queue.
async fn get_used_disk_space_out_of_queue(&self) -> Result<u64, CubeError> {
    let (partitions, chunks) = self.get_all_partitions_and_chunks_out_of_queue().await?;
    let partition_bytes = partitions
        .iter()
        .map(|p| p.get_row().file_size().unwrap_or(0));
    let chunk_bytes = chunks.iter().map(|c| c.get_row().file_size().unwrap_or(0));
    Ok(partition_bytes.chain(chunk_bytes).sum())
}

/// Snapshots every partition and chunk row in a single out-of-queue read
/// operation. `db` is cloned for the partition table because the chunk
/// table constructor consumes the handle.
async fn get_all_partitions_and_chunks_out_of_queue(
    &self,
) -> Result<(Vec<IdRow<Partition>>, Vec<IdRow<Chunk>>), CubeError> {
    self.read_operation_out_of_queue(move |db| {
        let partition_table = PartitionRocksTable::new(db.clone());
        let chunk_table = ChunkRocksTable::new(db);
        Ok((partition_table.all_rows()?, chunk_table.all_rows()?))
    })
    .await
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_partition_chunk_sizes(&self, partition_id: u64) -> Result<u64, CubeError> {
let chunks = self
Expand Down
88 changes: 88 additions & 0 deletions rust/cubestore/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,19 @@ impl SqlServiceImpl {
} else {
None
};

let max_disk_space = self.config_obj.max_disk_space();
if max_disk_space > 0 {
let used_space = self.db.get_used_disk_space_out_of_queue().await?;
if max_disk_space < used_space {
return Err(CubeError::user(format!(
"Exceeded available storage space: {:.3} GB out of {} GB allowed. Please consider changing pre-aggregations build range, reducing index count or pre-aggregations granularity.",
used_space as f64 / 1024. / 1024. / 1024.,
max_disk_space as f64 / 1024. / 1024. / 1024.
)));
}
}

if !external {
return self
.db
Expand Down Expand Up @@ -3582,6 +3595,81 @@ mod tests {
.await;
}

#[tokio::test]
async fn disk_space_limit() {
    // Verifies that a CREATE TABLE import is rejected once the configured
    // max_disk_space budget (here an artificially small 300 KB) has already
    // been consumed by previously imported data.
    Config::test("disk_space_limit")
        .update_config(|mut c| {
            c.partition_split_threshold = 1000000;
            c.compaction_chunks_count_threshold = 100;
            // Tiny budget so the first imported table exhausts it.
            c.max_disk_space = 300_000;
            c.select_workers = vec!["127.0.0.1:24306".to_string()];
            c.metastore_bind_address = Some("127.0.0.1:25312".to_string());
            c
        })
        .start_test(async move |services| {
            let service = services.sql_service;

            Config::test("disk_space_limit_worker_1")
                .update_config(|mut c| {
                    c.worker_bind_address = Some("127.0.0.1:24306".to_string());
                    c.server_name = "127.0.0.1:24306".to_string();
                    c.metastore_remote_address = Some("127.0.0.1:25312".to_string());
                    c
                })
                .start_test_worker(async move |_| {
                    // Build two CSV sources (one plain, one gzipped) large
                    // enough that importing them exceeds the 300 KB budget.
                    let paths = {
                        let dir = env::temp_dir();

                        let path_1 = dir.clone().join("foo-cluster-1.csv");
                        let path_2 = dir.clone().join("foo-cluster-2.csv.gz");
                        let mut file = File::create(path_1.clone()).unwrap();

                        file.write_all("id,city,arr,t\n".as_bytes()).unwrap();
                        for i in 0..100000
                        {
                            file.write_all(format!("{},\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23.123 UTC\n", i).as_bytes()).unwrap();
                        }


                        let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap()));

                        file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap();
                        for i in 0..100000
                        {
                            file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap();
                        }

                        file.shutdown().await.unwrap();

                        vec![path_1, path_2]
                    };

                    let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap();
                    // First import succeeds and pushes usage past the limit.
                    let _ = service.exec_query(
                        &format!(
                            "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
                            paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
                        )
                    ).await.unwrap();

                    // Second import must fail with the storage-limit error.
                    let res = service.exec_query(
                        &format!(
                            "CREATE TABLE Foo.Persons2 (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}",
                            paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",")
                        )
                    ).await;
                    // Exhaustive match instead of `if let … else assert!(false)`:
                    // an unexpected success or a different error now produces a
                    // diagnostic message rather than a bare assertion failure.
                    match res {
                        Err(err) => assert!(
                            err.message.starts_with("Exceeded available storage "),
                            "unexpected error message: {}",
                            err.message
                        ),
                        Ok(_) => panic!(
                            "CREATE TABLE should fail once the disk space limit is exceeded"
                        ),
                    }
                })
                .await;
        })
        .await;
}

#[tokio::test]
async fn compaction() {
Config::test("compaction").update_config(|mut config| {
Expand Down