Skip to content

Commit

Permalink
chore(cubestore): allow partitions with empty main tables
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-biryukov committed Nov 2, 2021
1 parent 092e2cc commit 108a4e3
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
3 changes: 2 additions & 1 deletion rust/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,8 @@ impl ClusterImpl {
if node_name != self.server_name {
continue;
}
if let Some(file) = partition_file_name(p.parent_partition_id, p.partition_id) {
if p.has_main_table {
let file = partition_file_name(p.partition_id);
if self.stop_token.is_cancelled() {
log::debug!("Startup warmup cancelled");
return;
Expand Down
5 changes: 2 additions & 3 deletions rust/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ pub trait MetaStore: DIService + Send + Sync {
/// Information required to produce partition name on remote fs.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct PartitionName {
pub parent_partition_id: Option<u64>,
pub has_main_table: bool,
pub partition_id: u64,
}

Expand Down Expand Up @@ -3085,10 +3085,9 @@ impl MetaStore for RocksMetaStore {
.cloned(),
);
}

partitions.push((
PartitionName {
parent_partition_id: p.row.parent_partition_id,
has_main_table: p.row.has_main_table_file(),
partition_id: p.id,
},
chunks,
Expand Down
13 changes: 10 additions & 3 deletions rust/cubestore/src/metastore/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ impl Partition {
}

pub fn get_full_name(&self, partition_id: u64) -> Option<String> {
partition_file_name(self.parent_partition_id, partition_id)
match self.has_main_table_file() {
false => None,
true => Some(partition_file_name(partition_id)),
}
}

pub fn has_main_table_file(&self) -> bool {
self.main_table_row_count != 0
}

pub fn to_active(&self, active: bool) -> Partition {
Expand Down Expand Up @@ -100,8 +107,8 @@ impl Partition {
}
}

pub fn partition_file_name(parent_partition_id: Option<u64>, partition_id: u64) -> Option<String> {
parent_partition_id.and(Some(format!("{}.parquet", partition_id)))
pub fn partition_file_name(partition_id: u64) -> String {
format!("{}.parquet", partition_id)
}

#[derive(Clone, Copy, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions rust/cubestore/src/store/compaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config::injection::DIService;
use crate::config::ConfigObj;
use crate::metastore::MetaStore;
use crate::metastore::partition::partition_file_name;
use crate::remotefs::RemoteFs;
use crate::store::{ChunkDataStore, ROW_GROUP_SIZE};
use crate::table::data::cmp_partition_key;
Expand Down Expand Up @@ -122,7 +123,7 @@ impl CompactionService for CompactionServiceImpl {

let mut new_partition_local_files = Vec::new();
for p in new_partitions.iter() {
let new_remote_path = p.get_row().get_full_name(p.get_id()).unwrap();
let new_remote_path = partition_file_name(p.get_id());
new_partition_local_files.push(self.remote_fs.temp_upload_path(&new_remote_path).await?)
}

Expand Down Expand Up @@ -200,7 +201,7 @@ impl CompactionService for CompactionServiceImpl {
{
match p {
EitherOrBoth::Both(p, _) => {
let new_remote_path = p.get_row().get_full_name(p.get_id()).unwrap();
let new_remote_path = partition_file_name(p.get_id());
self.remote_fs
.upload_file(&new_partition_local_files[i], new_remote_path.as_str())
.await?;
Expand Down

0 comments on commit 108a4e3

Please sign in to comment.