From a0bd36c0898f66da907f3c04c439fc31826777a0 Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Thu, 23 Jun 2022 16:45:22 +0300 Subject: [PATCH] feat(cubestore): Aggregating index (#4379) * feat(cubestore): Storing aggregating index (#4599) * feat(cubestore): Using aggregating indexes in queries (#4728) --- .../cubestore-sql-tests/src/tests.rs | 268 +++++++++++ .../cubestore/src/metastore/index.rs | 13 +- rust/cubestore/cubestore/src/metastore/mod.rs | 421 +++++++++++++++++- .../cubestore/src/metastore/table.rs | 109 ++++- .../info_schema/system_indexes.rs | 11 + .../queryplanner/info_schema/system_tables.rs | 12 + .../cubestore/src/queryplanner/mod.rs | 1 + .../cubestore/src/queryplanner/planning.rs | 273 ++++++++++-- rust/cubestore/cubestore/src/sql/mod.rs | 141 +++++- rust/cubestore/cubestore/src/sql/parser.rs | 99 +++- .../cubestore/src/store/compaction.rs | 235 +++++++++- rust/cubestore/cubestore/src/store/mod.rs | 191 +++++++- rust/cubestore/cubestore/src/table/parquet.rs | 4 + 13 files changed, 1710 insertions(+), 68 deletions(-) diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 71ad7e765950..426f009884b6 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -179,6 +179,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> { ), t("panic_worker", panic_worker), t("planning_filter_index_selection", planning_filter_index_selection), + t("planning_aggregate_index", planning_aggregate_index), + t("aggregate_index", aggregate_index), + t("aggregate_index_hll", aggregate_index_hll), + t("aggregate_index_errors", aggregate_index_errors), ]; fn t(name: &'static str, f: fn(Box) -> F) -> (&'static str, TestFn) @@ -4747,6 +4751,270 @@ async fn filter_multiple_in_for_decimal(service: Box) { assert_eq!(to_rows(&r), rows(&[(2)])); } +async fn planning_aggregate_index(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query("CREATE TABLE s.Orders(a int, b int, c int, a_sum int, a_max int, a_min int, a_merge HYPERLOGLOG) + AGGREGATIONS(sum(a_sum), max(a_max), min(a_min), merge(a_merge)) + INDEX reg_index (a, b) + AGGREGATE INDEX aggr_index (a, b) + ") + .await + .unwrap(); + + let p = service + .plan_query("SELECT a, b, sum(a_sum) FROM s.Orders GROUP BY 1, 2") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n MergeSort\ + \n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\ + \n Empty" + ); + + let p = service + .plan_query("SELECT a, b, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders GROUP BY 1, 2") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum), MAX(s.Orders.a_max)@3:MAX(a_max), MIN(s.Orders.a_min)@4:MIN(a_min), MERGE(s.Orders.a_merge)@5:MERGE(a_merge)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n MergeSort\ + \n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: *\ + \n Empty" + ); + + let p = service + .plan_query("SELECT a, b, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders WHERE c = 1 GROUP BY 1, 2") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum), MAX(s.Orders.a_max)@3:MAX(a_max), MIN(s.Orders.a_min)@4:MIN(a_min), 
MERGE(s.Orders.a_merge)@5:MERGE(a_merge)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n Filter\ + \n MergeSort\ + \n Scan, index: default:3:[3]:sort_on[a, b, c], fields: *\ + \n Empty" + ); + + let p = service + .plan_query("SELECT a, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders GROUP BY 1") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, SUM(s.Orders.a_sum)@1:SUM(a_sum), MAX(s.Orders.a_max)@2:MAX(a_max), MIN(s.Orders.a_min)@3:MIN(a_min), MERGE(s.Orders.a_merge)@4:MERGE(a_merge)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n MergeSort\ + \n Scan, index: aggr_index:2:[2]:sort_on[a], fields: [a, a_sum, a_max, a_min, a_merge]\ + \n Empty" + ); + + let p = service + .plan_query("SELECT a, avg(a_sum) FROM s.Orders GROUP BY 1") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, AVG(s.Orders.a_sum)@1:AVG(a_sum)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n MergeSort\ + \n Scan, index: reg_index:1:[1]:sort_on[a], fields: [a, a_sum]\ + \n Empty" + ); + + let p = service + .plan_query("SELECT a, sum(a_sum) FROM s.Orders WHERE b = 1 GROUP BY 1") + .await + .unwrap(); + assert_eq!( + pp_phys_plan(p.worker.as_ref()), + "Projection, [a, SUM(s.Orders.a_sum)@1:SUM(a_sum)]\ + \n FinalInplaceAggregate\ + \n Worker\ + \n PartialInplaceAggregate\ + \n Filter\ + \n MergeSort\ + \n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\ + \n Empty" + ); +} + +async fn aggregate_index(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query("CREATE TABLE s.Orders(a int, b int, c int, a_sum int, a_max int, a_min int) + AGGREGATIONS(sum(a_sum), max(a_max), min(a_min)) + INDEX reg_index (a, b) + AGGREGATE INDEX aggr_index (a, b) + ") + .await + .unwrap(); + service + .exec_query( + "INSERT INTO s.Orders (a, b, c, a_sum, a_max, a_min) VALUES (1, 10, 100, 10, 10, 10), \ + (2, 20, 200, 10, 10, 10), \ + (1, 10, 300, 20, 20, 20), \ + (2, 30, 200, 100, 100, 100), \ + (1, 10, 400, 20, 40, 40), \ + (2, 20, 410, 20, 30, 30) \ + ").await.unwrap(); + + let res = service + .exec_query("SELECT a, b, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2") + .await + .unwrap(); + + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(10), TableValue::Int(50), TableValue::Int(40), TableValue::Int(10)], + [TableValue::Int(2), TableValue::Int(20), TableValue::Int(30), TableValue::Int(30), TableValue::Int(10)], + [TableValue::Int(2), TableValue::Int(30), TableValue::Int(100), TableValue::Int(100), TableValue::Int(100)], + ] + ); + + let res = service + .exec_query("SELECT a, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(50), TableValue::Int(40), TableValue::Int(10)], + [TableValue::Int(2), TableValue::Int(130), TableValue::Int(100), TableValue::Int(10)], + ] + ); + + let res = service + .exec_query("SELECT a, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(2), TableValue::Int(30), TableValue::Int(30), TableValue::Int(10)], + ] + ); +} + +async fn aggregate_index_hll(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + 
.exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog) + AGGREGATIONS(merge(a_hll)) + AGGREGATE INDEX aggr_index (a, b) + ") + .await + .unwrap(); + service + .exec_query( + "INSERT INTO s.Orders (a, b, a_hll) VALUES \ + (1, 10, X'020C0200C02FF58941D5F0C6'), \ + (1, 20, X'020C0200C02FF58941D5F0C6'), \ + (1, 10, X'020C0200C02FF58941D5F0C6'), \ + (1, 20, X'020C0200C02FF58941D5F0C6') \ + ").await.unwrap(); + + let res = service + .exec_query("SELECT a, b, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2") + .await + .unwrap(); + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(10), TableValue::Int(2)], + [TableValue::Int(1), TableValue::Int(20), TableValue::Int(2)], + ] + ); + + let res = service + .exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(2)], + ] + ); + + let res = service + .exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1 ORDER BY 1") + .await + .unwrap(); + assert_eq!( + to_rows(&res), + [ + [TableValue::Int(1), TableValue::Int(2)], + ] + ); + +} + +async fn aggregate_index_errors(service: Box) { + service.exec_query("CREATE SCHEMA s").await.unwrap(); + service + .exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog) + AGGREGATE INDEX aggr_index (a, b, a_hll) + ") + .await + .expect_err("Can't create aggregate index for table 'Orders' because aggregate columns (`AGGREGATIONS`) not specified for the table"); + + service + .exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog) + AGGREGATIONS(merge(a_hll)) + AGGREGATE INDEX aggr_index (a, b, a_hll) + ") + .await + .expect_err("Column 'a_hll' in aggregate index 'aggr_index' is in aggregations list for table 'Orders'. 
Aggregate index columns must be outside of aggregations list."); + + service + .exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog) + AGGREGATIONS (sum(a), sum(b)) + ") + .await + .expect_err("Aggregate function SUM not allowed for column type text"); + + service + .exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog) + AGGREGATIONS (sum(a), max(b), sum(a_hll)) + ") + .await + .expect_err("Aggregate function SUM not allowed for column type hyperloglog"); + + service + .exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog) + AGGREGATIONS (max(a_hll)) + ") + .await + .expect_err("Aggregate function MAX not allowed for column type hyperloglog"); + + service + .exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog) + AGGREGATIONS (merge(a)) + ") + .await + .expect_err("Aggregate function MERGE not allowed for column type integer"); + +} async fn panic_worker(service: Box) { let r = service.exec_query("SYS PANIC WORKER").await; assert_eq!(r, Err(CubeError::panic("worker panic".to_string()))); diff --git a/rust/cubestore/cubestore/src/metastore/index.rs b/rust/cubestore/cubestore/src/metastore/index.rs index 01167146560f..701f19e24031 100644 --- a/rust/cubestore/cubestore/src/metastore/index.rs +++ b/rust/cubestore/cubestore/src/metastore/index.rs @@ -1,5 +1,6 @@ use super::{ - BaseRocksSecondaryIndex, Column, Index, IndexId, RocksSecondaryIndex, RocksTable, TableId, + BaseRocksSecondaryIndex, Column, Index, IndexId, IndexType, RocksSecondaryIndex, RocksTable, + TableId, }; use crate::metastore::{IdRow, MetaStoreEvent}; use crate::{rocks_table_impl, CubeError}; @@ -16,6 +17,7 @@ impl Index { sort_key_size: u64, partition_split_key_size: Option, multi_index_id: Option, + index_type: IndexType, ) -> Result { if sort_key_size == 0 { return Err(CubeError::user(format!( @@ -30,6 +32,7 @@ impl Index { sort_key_size, partition_split_key_size, multi_index_id, + index_type, }) } @@ -41,6 +44,10 @@ impl Index { &self.name } + pub fn get_type(&self) -> IndexType { + self.index_type.clone() + } + pub fn columns(&self) -> &Vec { &self.columns } @@ -61,6 +68,10 @@ impl Index { pub fn multi_index_id(&self) -> Option { self.multi_index_id } + + pub fn index_type_default() -> IndexType { + IndexType::Regular + } } #[derive(Clone, Copy, Debug)] diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index cde4fef78aba..b6cfa6ea9e52 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -35,7 +35,7 @@ use crate::metastore::partition::PartitionIndexKey; use crate::metastore::source::{ Source, SourceCredentials, SourceIndexKey, SourceRocksIndex, SourceRocksTable, }; -use crate::metastore::table::{TableIndexKey, TablePath}; +use crate::metastore::table::{AggregateColumnIndex, TableIndexKey, TablePath}; use crate::metastore::wal::{WALIndexKey, WALRocksIndex}; use crate::remotefs::{LocalDirRemoteFs, RemoteFs}; use crate::table::{Row, TableValue}; @@ -278,6 +278,12 @@ impl DataFrameValue for Option { } } +impl DataFrameValue for IndexType { + fn value(v: &Self) -> String { + format!("{:?}", v) + } +} + impl DataFrameValue for Option { fn value(v: &Self) -> String { v.as_ref() @@ -319,6 +325,14 @@ impl DataFrameValue for Option { } } +impl DataFrameValue for Option> { + fn value(v: &Self) -> String { + v.as_ref() + .map(|v| v.iter().map(|v| format!("{}", v)).join(", ")) + .unwrap_or("NULL".to_string()) + } +} + #[derive(Clone, Copy, Serialize, 
Deserialize, Debug, Eq, PartialEq, Hash)] pub enum HllFlavour { Airlift, // Compatible with Presto, Athena, etc. @@ -514,6 +528,12 @@ pub struct Schema { } } +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub enum IndexType { + Regular = 1, + Aggregate = 2, +} + data_frame_from! { #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct Index { @@ -524,8 +544,67 @@ pub struct Index { #[serde(default)] partition_split_key_size: Option, #[serde(default)] - multi_index_id: Option + multi_index_id: Option, + #[serde(default = "Index::index_type_default")] + index_type: IndexType +} } + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub enum AggregateFunction { + SUM = 1, + MAX = 2, + MIN = 3, + MERGE = 4, +} + +impl FromStr for AggregateFunction { + type Err = CubeError; + + fn from_str(s: &str) -> Result { + match s.to_uppercase().as_ref() { + "SUM" => Ok(AggregateFunction::SUM), + "MAX" => Ok(AggregateFunction::MAX), + "MIN" => Ok(AggregateFunction::MIN), + "MERGE" => Ok(AggregateFunction::MERGE), + _ => Err(CubeError::user(format!( + "Function {} can't be used in aggregate index", + s + ))), + } + } +} + +impl fmt::Display for AggregateFunction { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let res = match self { + Self::SUM => "SUM", + Self::MAX => "MAX", + Self::MIN => "MIN", + Self::MERGE => "MERGE", + }; + + f.write_fmt(format_args!("{}", res)) + } +} + +impl AggregateFunction { + pub fn allowed_for_type(&self, col_type: &ColumnType) -> bool { + match self { + Self::MAX | Self::MIN => match col_type { + ColumnType::HyperLogLog(_) => false, + _ => true, + }, + Self::SUM => match col_type { + ColumnType::Int | ColumnType::Decimal { .. } | ColumnType::Float => true, + _ => false, + }, + Self::MERGE => match col_type { + ColumnType::HyperLogLog(_) => true, + _ => false, + }, + } + } } #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] @@ -533,6 +612,7 @@ pub struct IndexDef { pub name: String, pub columns: Vec, pub multi_index: Option, + pub index_type: IndexType, } data_frame_from! 
{ @@ -762,6 +842,7 @@ pub trait MetaStore: DIService + Send + Sync { indexes: Vec, is_ready: bool, unique_key_column_names: Option>, + aggregates: Option>, partition_split_threshold: Option, ) -> Result, CubeError>; async fn table_ready(&self, id: u64, is_ready: bool) -> Result, CubeError>; @@ -2604,6 +2685,37 @@ impl RocksMetaStore { multi_index: Option<&IdRow>, multi_partitions: &[IdRow], index_def: IndexDef, + ) -> Result, CubeError> { + match index_def.index_type { + IndexType::Regular => Self::add_regular_index( + batch_pipe, + rocks_index, + rocks_partition, + table_cols, + table_id, + multi_index, + multi_partitions, + index_def, + ), + IndexType::Aggregate => Self::add_aggregate_index( + batch_pipe, + rocks_index, + rocks_partition, + table_cols, + table_id, + index_def, + ), + } + } + fn add_regular_index( + batch_pipe: &mut BatchPipe, + rocks_index: &IndexRocksTable, + rocks_partition: &PartitionRocksTable, + table_cols: &Vec, + table_id: &IdRow, + multi_index: Option<&IdRow>, + multi_partitions: &[IdRow], + index_def: IndexDef, ) -> Result, CubeError> { debug_assert_eq!(multi_index.is_some(), !multi_partitions.is_empty()); if let Some(not_found) = index_def @@ -2672,7 +2784,7 @@ impl RocksMetaStore { if !taken[i] { taken[i] = true; - index_columns.push(seq_column.clone().replace_index(index_columns.len())); + index_columns.push(seq_column.replace_index(index_columns.len())); } } @@ -2683,7 +2795,7 @@ impl RocksMetaStore { continue; } - index_columns.push(table_cols[i].clone().replace_index(index_columns.len())); + index_columns.push(table_cols[i].replace_index(index_columns.len())); } assert_eq!(index_columns.len(), table_cols.len()); @@ -2719,6 +2831,7 @@ impl RocksMetaStore { // Seq column shouldn't participate in partition split. Otherwise we can't do shared nothing calculations across partitions. table_id.get_row().seq_column().map(|_| sorted_key_size - 1), multi_index.map(|i| i.id), + IndexType::Regular, )?; let index_id = rocks_index.insert(index, batch_pipe)?; if multi_partitions.is_empty() { @@ -2739,6 +2852,88 @@ impl RocksMetaStore { Ok(index_id) } + fn add_aggregate_index( + batch_pipe: &mut BatchPipe, + rocks_index: &IndexRocksTable, + rocks_partition: &PartitionRocksTable, + table_cols: &Vec, + table_id: &IdRow
, + index_def: IndexDef, + ) -> Result, CubeError> { + if let Some(not_found) = index_def + .columns + .iter() + .chain(index_def.columns.iter()) + .find(|dc| table_cols.iter().all(|c| c.name.as_str() != dc.as_str())) + { + return Err(CubeError::user(format!( + "Column '{}' in aggregate index '{}' is not found in table '{}'", + not_found, + index_def.name, + table_id.get_row().get_table_name() + ))); + } + + let aggregate_columns = table_id.get_row().aggregate_columns(); + if aggregate_columns.is_empty() { + return Err(CubeError::user(format!( + "Can't create aggregate index for table '{}' because aggregate columns (`AGGREGATIONS`) not specified for the table", + table_id.get_row().get_table_name() + ))); + } + if let Some(col_in_aggreations) = index_def.columns.iter().find(|dc| { + aggregate_columns + .iter() + .any(|c| c.column().name == dc.as_str()) + }) { + return Err(CubeError::user(format!( + "Column '{}' in aggregate index '{}' is in aggregations list for table '{}'. Aggregate index columns must be outside of aggregations list.", + col_in_aggreations, + index_def.name, + table_id.get_row().get_table_name() + ))); + } + let unique_key_columns = table_id.get_row().unique_key_columns(); + if unique_key_columns.is_some() { + return Err(CubeError::user(format!( + "Can't create aggregate index for table '{}' because aggregate index for the table with unique key is not supported yet", + table_id.get_row().get_table_name()))); + } + + // First put the columns from the sort key. + let mut taken = vec![false; table_cols.len()]; + let mut index_columns = Vec::with_capacity(index_def.columns.len()); + for c in index_def.columns { + let i = table_cols.iter().position(|tc| tc.name == c).unwrap(); + if taken[i] { + continue; // ignore duplicate columns inside the index. + } + taken[i] = true; + index_columns.push(table_cols[i].clone().replace_index(index_columns.len())); + } + + let sorted_key_size = index_columns.len() as u64; + // Put the rest of the columns. + for col in aggregate_columns { + index_columns.push(col.column().replace_index(index_columns.len())); + } + + let index = Index::try_new( + index_def.name, + table_id.get_id(), + index_columns, + sorted_key_size, + // Seq column shouldn't participate in partition split. Otherwise we can't do shared nothing calculations across partitions. + None, + None, + IndexType::Aggregate, + )?; + + let index_id = rocks_index.insert(index, batch_pipe)?; + rocks_partition.insert(Partition::new(index_id.id, None, None, None), batch_pipe)?; + Ok(index_id) + } + fn get_table_by_name( schema_name: String, table_name: String, @@ -2980,6 +3175,7 @@ impl MetaStore for RocksMetaStore { indexes: Vec, is_ready: bool, unique_key_column_names: Option>, + aggregates: Option>, partition_split_threshold: Option, ) -> Result, CubeError> { self.write_operation(move |db_ref, batch_pipe| { @@ -3020,6 +3216,47 @@ impl MetaStore for RocksMetaStore { } else { None }; + let aggregate_column_indices = if let Some(aggrs) = aggregates { + let res = aggrs.iter() + .map(|aggr| { + let aggr_column = &aggr.1; + let column = columns + .iter() + .find(|c| &c.name == aggr_column) + .ok_or_else(|| { + CubeError::user(format!( + "Aggregate column {} not found among column definitions {:?}", + aggr_column, columns + )) + })?; + + let index = column.column_index as u64; + if let Some(unique_indices) = &unique_key_column_indices { + if unique_indices.iter().find(|i| i == &&index).is_some() { + return Err(CubeError::user(format!( + "Aggregate column {} is in unique key. 
A column can't be in an unique key and an aggregation at the same time", + aggr_column + ))); + } + } + let function = aggr.0.parse::()?; + + if !function.allowed_for_type(&column.column_type) { + return Err(CubeError::user( + format!( + "Aggregate function {} not allowed for column type {}", + function, &column.column_type + ) +)) + } + Ok(AggregateColumnIndex::new(index, function)) + }) + .collect::,_>>()?; + + res + } else { + vec![] + }; let table = Table::new( table_name, schema_id.get_id(), @@ -3028,6 +3265,7 @@ impl MetaStore for RocksMetaStore { import_format, is_ready, unique_key_column_indices, + aggregate_column_indices, seq_column_index, partition_split_threshold, ); @@ -3096,6 +3334,7 @@ impl MetaStore for RocksMetaStore { name: "default".to_string(), multi_index: None, columns: def_index_columns, + index_type: IndexType::Regular }, )?; @@ -4822,7 +5061,7 @@ fn swap_active_partitions_impl( let table = PartitionRocksTable::new(db_ref.clone()); let chunk_table = ChunkRocksTable::new(db_ref.clone()); - // Rows are compacted using unique key columns and totals don't match + // Rows are compacted using unique key columns or aggregating index and totals don't match let skip_row_count_sanity_check = if let Some(current) = current_active.first() { let current_partition = table @@ -4833,7 +5072,8 @@ fn swap_active_partitions_impl( )))?; let index = index_table.get_row_or_not_found(current_partition.get_row().get_index_id())?; let table = table_table.get_row_or_not_found(index.get_row().table_id())?; - table.get_row().unique_key_columns().is_some() + index.get_row().get_type() == IndexType::Aggregate + || table.get_row().unique_key_columns().is_some() } else { false }; @@ -4961,6 +5201,7 @@ fn swap_active_partitions_impl( #[cfg(test)] mod tests { + use super::table::AggregateColumn; use super::*; use crate::config::Config; use crate::remotefs::LocalDirRemoteFs; @@ -5203,6 +5444,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -5218,6 +5460,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -5323,6 +5566,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -5340,6 +5584,7 @@ mod tests { true, None, None, + None, ) .await .is_err()); @@ -5359,6 +5604,7 @@ mod tests { columns.len() as u64 - 1, None, None, + Index::index_type_default(), ) .unwrap(); let expected_res = vec![IdRow::new(1, expected_index)]; @@ -5368,6 +5614,167 @@ mod tests { let _ = fs::remove_dir_all(remote_store_path.clone()); } + #[tokio::test] + async fn table_with_aggregate_index_test() { + let config = Config::test("table_with_aggregate_index_test"); + let store_path = env::current_dir() + .unwrap() + .join("test-table-aggregate-local"); + let remote_store_path = env::current_dir() + .unwrap() + .join("test-table-aggregate-remote"); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); + { + let meta_store = RocksMetaStore::new( + store_path.clone().join("metastore").as_path(), + remote_fs, + config.config_obj(), + ); + + meta_store + .create_schema("foo".to_string(), false) + .await + .unwrap(); + let mut columns = Vec::new(); + columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); + columns.push(Column::new("col2".to_string(), ColumnType::String, 1)); + columns.push(Column::new("col3".to_string(), ColumnType::Int, 2)); + columns.push(Column::new("aggr_col1".to_string(), ColumnType::Int, 3)); + 
columns.push(Column::new("aggr_col2".to_string(), ColumnType::Int, 4)); + + let aggr_index_def = IndexDef { + name: "aggr_index".to_string(), + columns: vec!["col2".to_string(), "col1".to_string()], + multi_index: None, + index_type: IndexType::Aggregate, + }; + + let table1 = meta_store + .create_table( + "foo".to_string(), + "boo".to_string(), + columns.clone(), + None, + None, + vec![aggr_index_def.clone()], + true, + None, + Some(vec![ + ("sum".to_string(), "aggr_col2".to_string()), + ("max".to_string(), "aggr_col1".to_string()), + ]), + None, + ) + .await + .unwrap(); + + let table_id = table1.get_id(); + + assert_eq!( + meta_store + .get_table("foo".to_string(), "boo".to_string()) + .await + .unwrap(), + table1 + ); + + let aggr_columns = table1.get_row().aggregate_columns(); + assert_eq!( + aggr_columns[0], + AggregateColumn::new( + Column::new("aggr_col2".to_string(), ColumnType::Int, 4), + AggregateFunction::SUM + ) + ); + assert_eq!( + aggr_columns[1], + AggregateColumn::new( + Column::new("aggr_col1".to_string(), ColumnType::Int, 3), + AggregateFunction::MAX + ) + ); + + let indexes = meta_store.get_table_indexes(table_id).await.unwrap(); + assert_eq!(indexes.len(), 2); + let ind = indexes + .into_iter() + .find(|ind| ind.get_row().get_name() == &aggr_index_def.name) + .unwrap(); + + let index = ind.get_row(); + assert!(match index.get_type() { + IndexType::Aggregate => true, + _ => false, + }); + + let expected_columns = vec![ + Column::new("col2".to_string(), ColumnType::String, 0), + Column::new("col1".to_string(), ColumnType::Int, 1), + Column::new("aggr_col2".to_string(), ColumnType::Int, 2), + Column::new("aggr_col1".to_string(), ColumnType::Int, 3), + ]; + assert_eq!(index.get_columns(), &expected_columns); + + assert!(meta_store + .create_table( + "foo".to_string(), + "boo2".to_string(), + columns.clone(), + None, + None, + vec![aggr_index_def.clone()], + true, + Some(vec!["col2".to_string(), "col1".to_string()]), + Some(vec![ + ("sum".to_string(), "aggr_col2".to_string()), + ("max".to_string(), "col1".to_string()), + ]), + None, + ) + .await + .is_err()); + + assert!(meta_store + .create_table( + "foo".to_string(), + "boo3".to_string(), + columns.clone(), + None, + None, + vec![aggr_index_def.clone()], + true, + Some(vec!["col1".to_string()]), + None, + None, + ) + .await + .is_err()); + + assert!(meta_store + .create_table( + "foo".to_string(), + "boo4".to_string(), + columns.clone(), + None, + None, + vec![aggr_index_def.clone()], + true, + Some(vec!["col1".to_string()]), + Some(vec![ + ("sum".to_string(), "aggr_col2".to_string()), + ("max".to_string(), "aggr_col1".to_string()), + ]), + None, + ) + .await + .is_err()); + } + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + } + #[tokio::test] async fn cold_start_test() { { @@ -5602,6 +6009,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -5722,6 +6130,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); diff --git a/rust/cubestore/cubestore/src/metastore/table.rs b/rust/cubestore/cubestore/src/metastore/table.rs index 72d9588fcd11..499d8f5f9705 100644 --- a/rust/cubestore/cubestore/src/metastore/table.rs +++ b/rust/cubestore/cubestore/src/metastore/table.rs @@ -1,19 +1,109 @@ use super::{ - BaseRocksSecondaryIndex, Column, ColumnType, IndexId, RocksSecondaryIndex, RocksTable, TableId, + AggregateFunction, BaseRocksSecondaryIndex, Column, ColumnType, DataFrameValue, IndexId, + RocksSecondaryIndex, RocksTable, TableId, }; use 
crate::data_frame_from; use crate::metastore::{IdRow, ImportFormat, MetaStoreEvent, Schema}; +use crate::queryplanner::udfs::aggregate_udf_by_kind; +use crate::queryplanner::udfs::CubeAggregateUDFKind; use crate::rocks_table_impl; use crate::{base_rocks_secondary_index, CubeError}; +use arrow::datatypes::Schema as ArrowSchema; use byteorder::{BigEndian, WriteBytesExt}; use chrono::DateTime; use chrono::Utc; +use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum}; +use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr}; use itertools::Itertools; use rocksdb::DB; use serde::{Deserialize, Deserializer, Serialize}; use std::io::Write; use std::sync::Arc; +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub struct AggregateColumnIndex { + index: u64, + function: AggregateFunction, +} + +impl AggregateColumnIndex { + pub fn new(index: u64, function: AggregateFunction) -> Self { + Self { index, function } + } + + pub fn index(&self) -> u64 { + self.index + } + + pub fn function(&self) -> &AggregateFunction { + &self.function + } +} + +impl DataFrameValue for Vec { + fn value(v: &Self) -> String { + v.iter() + .map(|v| format!("{}({})", v.function, v.index)) + .join(", ") + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct AggregateColumn { + column: Column, + function: AggregateFunction, +} + +impl AggregateColumn { + pub fn new(column: Column, function: AggregateFunction) -> Self { + Self { column, function } + } + + pub fn column(&self) -> &Column { + &self.column + } + + pub fn function(&self) -> &AggregateFunction { + &self.function + } + + pub fn aggregate_expr( + &self, + schema: &ArrowSchema, + ) -> Result, CubeError> { + let col = Arc::new(FusionColumn::new_with_schema( + self.column.get_name().as_str(), + &schema, + )?); + let res: Arc = match self.function { + AggregateFunction::SUM => { + Arc::new(Sum::new(col.clone(), col.name(), col.data_type(schema)?)) + } + AggregateFunction::MAX => { + Arc::new(Max::new(col.clone(), col.name(), col.data_type(schema)?)) + } + AggregateFunction::MIN => { + Arc::new(Min::new(col.clone(), col.name(), col.data_type(schema)?)) + } + AggregateFunction::MERGE => { + let fun = aggregate_udf_by_kind(CubeAggregateUDFKind::MergeHll).descriptor(); + udaf::create_aggregate_expr(&fun, &[col.clone()], schema, col.name())? + } + }; + Ok(res) + } +} + +impl core::fmt::Display for AggregateColumn { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_fmt(format_args!( + "{}({})", + self.function, + self.column.get_name() + )) + } +} + data_frame_from! 
{ #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct Table { @@ -32,6 +122,8 @@ pub struct Table { #[serde(default)] unique_key_column_indices: Option>, #[serde(default)] + aggregate_column_indices: Vec, + #[serde(default)] seq_column_index: Option, #[serde(default)] location_download_sizes: Option>, @@ -63,6 +155,7 @@ impl Table { import_format: Option, is_ready: bool, unique_key_column_indices: Option>, + aggregate_column_indices: Vec, seq_column_index: Option, partition_split_threshold: Option, ) -> Table { @@ -77,6 +170,7 @@ impl Table { is_ready, created_at: Some(Utc::now()), unique_key_column_indices, + aggregate_column_indices, seq_column_index, location_download_sizes, partition_split_threshold, @@ -168,6 +262,19 @@ impl Table { .map(|indices| indices.iter().map(|i| &self.columns[*i as usize]).collect()) } + pub fn aggregate_columns(&self) -> Vec { + self.aggregate_column_indices + .iter() + .map(|v| { + AggregateColumn::new(self.columns[v.index as usize].clone(), v.function.clone()) + }) + .collect() + } + + pub fn aggregate_column_indices(&self) -> &Vec { + &self.aggregate_column_indices + } + pub fn seq_column(&self) -> Option<&Column> { self.seq_column_index .as_ref() diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs index 987c6383175f..6cce641c19ae 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs @@ -92,6 +92,17 @@ impl InfoSchemaTableDef for SystemIndexesTableDef { )) }), ), + ( + Field::new("index_type", DataType::Utf8, false), + Box::new(|indexes| { + Arc::new(StringArray::from( + indexes + .iter() + .map(|row| format!("{:?}", row.get_row().get_type())) + .collect::>(), + )) + }), + ), ] } } diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs index c8f7e59ea926..464262966ca4 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs @@ -139,6 +139,18 @@ impl InfoSchemaTableDef for SystemTablesTableDef { )) }), ), + ( + Field::new("aggregate_columns", DataType::Utf8, true), + Box::new(|tables| { + let aggregates = tables + .iter() + .map(|row| format!("{:?}", row.table.get_row().aggregate_columns())) + .collect::>(); + Arc::new(StringArray::from( + aggregates.iter().map(|v| v.as_str()).collect::>(), + )) + }), + ), ( Field::new("seq_column_index", DataType::Utf8, true), Box::new(|tables| { diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index f89331c056ab..b569a55c6a17 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -220,6 +220,7 @@ impl ContextProvider for MetaStoreSchemaProvider { None, false, None, + Vec::new(), None, None, ), diff --git a/rust/cubestore/cubestore/src/queryplanner/planning.rs b/rust/cubestore/cubestore/src/queryplanner/planning.rs index 95bbe98fbae1..f470c952f7c2 100644 --- a/rust/cubestore/cubestore/src/queryplanner/planning.rs +++ b/rust/cubestore/cubestore/src/queryplanner/planning.rs @@ -25,6 +25,7 @@ use async_trait::async_trait; use datafusion::error::DataFusionError; use datafusion::execution::context::ExecutionContextState; use 
datafusion::logical_plan::{DFSchemaRef, Expr, LogicalPlan, Operator, UserDefinedLogicalNode}; +use datafusion::physical_plan::aggregates::AggregateFunction as FusionAggregateFunction; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::planner::ExtensionPlanner; use datafusion::physical_plan::{ @@ -37,7 +38,9 @@ use itertools::Itertools; use crate::cluster::Cluster; use crate::metastore::multi_index::MultiPartition; use crate::metastore::table::{Table, TablePath}; -use crate::metastore::{Chunk, Column, IdRow, Index, MetaStore, Partition, Schema}; +use crate::metastore::{ + AggregateFunction, Chunk, Column, IdRow, Index, IndexType, MetaStore, Partition, Schema, +}; use crate::queryplanner::optimizations::rewrite_plan::{rewrite_plan, PlanRewriter}; use crate::queryplanner::panic::{plan_panic_worker, PanicWorkerNode}; use crate::queryplanner::partition_filter::PartitionFilter; @@ -90,7 +93,7 @@ pub async fn choose_index_ext( ) -> Result<(LogicalPlan, PlanningMeta), DataFusionError> { // Prepare information to choose the index. let mut collector = CollectConstraints::default(); - rewrite_plan(p, &None, &mut collector)?; + rewrite_plan(p, &ConstraintsContext::default(), &mut collector)?; // Consult metastore to choose the index. // TODO should be single snapshot read to ensure read consistency here @@ -243,6 +246,7 @@ struct IndexConstraints { table: TablePath, projection: Option>, filters: Vec, + aggregates: Vec, } #[derive(Default)] @@ -250,8 +254,23 @@ struct CollectConstraints { constraints: Vec, } +#[derive(Default, Clone)] +struct ConstraintsContext { + sort_on: Option, + aggregates: Vec, +} + +impl ConstraintsContext { + pub fn update_sort_on(&self, sort_on: Option) -> Self { + Self { + sort_on, + aggregates: self.aggregates.clone(), + } + } +} + impl PlanRewriter for CollectConstraints { - type Context = Option; + type Context = ConstraintsContext; fn rewrite( &mut self, @@ -267,10 +286,11 @@ impl PlanRewriter for CollectConstraints { } => { let table = source.as_any().downcast_ref::().unwrap(); self.constraints.push(IndexConstraints { - sort_on: c.clone(), + sort_on: c.sort_on.clone(), table: table.table.clone(), projection: projection.clone(), filters: filters.clone(), + aggregates: c.aggregates.clone(), }) } _ => {} @@ -281,8 +301,8 @@ impl PlanRewriter for CollectConstraints { fn enter_node( &mut self, n: &LogicalPlan, - current_sort_on: &Option, - ) -> Option> { + current_context: &Self::Context, + ) -> Option { fn column_name(expr: &Expr) -> Option { match expr { Expr::Alias(e, _) => column_name(e), @@ -291,27 +311,36 @@ impl PlanRewriter for CollectConstraints { } } match n { - LogicalPlan::Aggregate { group_expr, .. } => { + LogicalPlan::Aggregate { + group_expr, + aggr_expr, + .. + } => { let sort_on = group_expr.iter().map(column_name).collect::>(); - if !sort_on.is_empty() && sort_on.iter().all(|c| c.is_some()) { - Some(Some(SortColumns { + let sort_on = if !sort_on.is_empty() && sort_on.iter().all(|c| c.is_some()) { + Some(SortColumns { sort_on: sort_on.into_iter().map(|c| c.unwrap()).collect(), required: false, - })) + }) } else { - Some(None) - } + None + }; + Some(ConstraintsContext { + sort_on, + aggregates: aggr_expr.to_vec(), + }) } LogicalPlan::Filter { predicate, .. 
} => { let mut sort_on = Vec::new(); if single_value_filter_columns(predicate, &mut sort_on) { if !sort_on.is_empty() { - Some(Some(SortColumns { + let sort_on = Some(SortColumns { sort_on: sort_on .into_iter() .map(|c| c.name.to_string()) .chain( - current_sort_on + current_context + .sort_on .as_ref() .map(|c| c.sort_on.clone()) .unwrap_or_else(|| Vec::new()) @@ -320,9 +349,10 @@ impl PlanRewriter for CollectConstraints { .unique() .collect(), required: false, - })) + }); + Some(current_context.update_sort_on(sort_on)) } else { - Some(current_sort_on.clone()) + Some(current_context.clone()) } } else { None @@ -332,21 +362,20 @@ impl PlanRewriter for CollectConstraints { } } - fn enter_join_left( - &mut self, - join: &LogicalPlan, - _: &Option, - ) -> Option> { + fn enter_join_left(&mut self, join: &LogicalPlan, _: &Self::Context) -> Option { let join_on; if let LogicalPlan::Join { on, .. } = join { join_on = on; } else { panic!("expected join node"); } - Some(Some(SortColumns { - sort_on: join_on.iter().map(|(l, _)| l.name.clone()).collect(), - required: true, - })) + Some(ConstraintsContext { + sort_on: Some(SortColumns { + sort_on: join_on.iter().map(|(l, _)| l.name.clone()).collect(), + required: true, + }), + aggregates: Vec::new(), + }) } fn enter_join_right( @@ -360,10 +389,13 @@ impl PlanRewriter for CollectConstraints { } else { panic!("expected join node"); } - Some(Some(SortColumns { - sort_on: join_on.iter().map(|(_, r)| r.name.clone()).collect(), - required: true, - })) + Some(ConstraintsContext { + sort_on: Some(SortColumns { + sort_on: join_on.iter().map(|(_, r)| r.name.clone()).collect(), + required: true, + }), + aggregates: Vec::new(), + }) } } @@ -475,6 +507,75 @@ struct IndexCandidate { pub partitioned_index: Option, } +fn check_aggregates_expr(table: &IdRow
, aggregates: &Vec) -> bool { + let table_aggregates = table.get_row().aggregate_columns(); + + for aggr in aggregates.iter() { + match aggr { + Expr::AggregateFunction { fun, args, .. } => { + if args.len() != 1 { + return false; + } + + let aggr_fun = match fun { + FusionAggregateFunction::Sum => Some(AggregateFunction::SUM), + FusionAggregateFunction::Max => Some(AggregateFunction::MAX), + FusionAggregateFunction::Min => Some(AggregateFunction::MIN), + _ => None, + }; + + if aggr_fun.is_none() { + return false; + } + + let aggr_fun = aggr_fun.unwrap(); + + let col_match = match &args[0] { + Expr::Column(col) => table_aggregates.iter().any(|ta| { + ta.function() == &aggr_fun && ta.column().get_name() == &col.name + }), + _ => false, + }; + + if !col_match { + return false; + } + } + Expr::AggregateUDF { fun, args } => { + if args.len() != 1 { + return false; + } + + let aggr_fun = match fun.name.to_uppercase().as_str() { + "MERGE" => Some(AggregateFunction::MERGE), + _ => None, + }; + + if aggr_fun.is_none() { + return false; + } + + let aggr_fun = aggr_fun.unwrap(); + + let col_match = match &args[0] { + Expr::Column(col) => table_aggregates.iter().any(|ta| { + ta.function() == &aggr_fun && ta.column().get_name() == &col.name + }), + _ => false, + }; + + if !col_match { + return false; + } + } + _ => { + return false; + } + }; + } + true +} + // Picks the index, but not partitions snapshots. async fn pick_index( c: &IndexConstraints, @@ -484,6 +585,8 @@ async fn pick_index( ) -> Result { let sort_on = c.sort_on.as_ref().map(|sc| (&sc.sort_on, sc.required)); + let aggr_index_allowed = check_aggregates_expr(&table, &c.aggregates); + let default_index = indices.iter().next().expect("no default index"); let (index, mut partitioned_index, sort_on) = if let Some(projection_column_indices) = &c.projection @@ -501,6 +604,35 @@ async fn pick_index( if i.get_row().sort_key_size() < (join_on_columns.len() as u64) { return false; } + let all_columns_in_index = match i.get_row().get_type() { + IndexType::Aggregate => { + if aggr_index_allowed { + let projection_check = projection_columns.iter().all(|c| { + i.get_row() + .get_columns() + .iter() + .find(|ic| ic.get_name() == c.get_name()) + .is_some() + }); + let filter_check = filter_columns.iter().all(|c| { + i.get_row() + .get_columns() + .iter() + .find(|ic| ic.get_name() == &c.name) + .is_some() + }); + + projection_check && filter_check + } else { + false + } + } + _ => true, + }; + + if !all_columns_in_index { + return false; + } let join_columns_in_index = join_on_columns .iter() .map(|c| { @@ -633,29 +765,75 @@ fn optimal_index_by_score<'a, T: Iterator>>( projection_columns: &Vec, filter_columns: &HashSet, ) -> Option<&'a IdRow> { + #[derive(PartialEq, Eq, Clone)] + struct Score { + index_type: IndexType, + index_size: u64, + filter_score: usize, + projection_score: usize, + } + impl PartialOrd for Score { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for Score { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let res = match self.index_type { + IndexType::Regular => match other.index_type { + IndexType::Regular => core::cmp::Ordering::Equal, + IndexType::Aggregate => core::cmp::Ordering::Greater, + }, + IndexType::Aggregate => match other.index_type { + IndexType::Regular => core::cmp::Ordering::Less, + IndexType::Aggregate => self.index_size.cmp(&other.index_size), + }, + }; + match res { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + match 
self.filter_score.cmp(&other.filter_score) { + core::cmp::Ordering::Equal => {} + ord => return ord, + } + self.projection_score.cmp(&other.projection_score) + } + } + indexes .filter_map(|i| { - let filter_index_positions = CubeTable::project_to_index_positions( + let index_size = i.get_row().sort_key_size(); + + let filter_score = CubeTable::project_to_index_positions( &filter_columns.iter().map(|c| c.name.to_string()).collect(), &i, - ); - let projected_index_positions = CubeTable::project_to_index_positions( + ) + .into_iter() + .fold_options(0, |a, b| a + b); + + let projection_score = CubeTable::project_to_index_positions( &projection_columns .iter() .map(|c| c.get_name().to_string()) .collect(), &i, - ); - let res = Some(i).zip( - filter_index_positions - .into_iter() - .fold_options(0, |a, b| a + b) - .zip( - projected_index_positions - .into_iter() - .fold_options(0, |a, b| a + b), - ), - ); + ) + .into_iter() + .fold_options(0, |a, b| a + b); + + let index_score = if filter_score.is_some() && projection_score.is_some() { + Some(Score { + index_type: i.get_row().get_type(), + index_size, + filter_score: filter_score.unwrap(), + projection_score: projection_score.unwrap(), + }) + } else { + None + }; + + let res = Some(i).zip(index_score); res }) .min_by_key(|(_, score)| score.clone()) @@ -1485,6 +1663,7 @@ pub mod tests { None, true, None, + Vec::new(), None, None, )); @@ -1496,6 +1675,7 @@ pub mod tests { 1, None, None, + Index::index_type_default(), ) .unwrap(), ); @@ -1508,6 +1688,7 @@ pub mod tests { 1, None, Some(PARTITIONED_INDEX), + Index::index_type_default(), ) .unwrap(), ); @@ -1528,6 +1709,7 @@ pub mod tests { None, true, None, + Vec::new(), None, None, )); @@ -1540,6 +1722,7 @@ pub mod tests { 2, None, None, + Index::index_type_default(), ) .unwrap(), ); @@ -1551,6 +1734,7 @@ pub mod tests { 2, None, None, + Index::index_type_default(), ) .unwrap(), ); @@ -1563,6 +1747,7 @@ pub mod tests { 1, None, Some(PARTITIONED_INDEX), + Index::index_type_default(), ) .unwrap(), ); @@ -1576,6 +1761,7 @@ pub mod tests { None, true, None, + Vec::new(), None, None, )); @@ -1633,6 +1819,7 @@ pub mod tests { t.get_columns().len() as u64, None, None, + Index::index_type_default(), ) .unwrap(), ); diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 76c11b8c914e..32bc6bb9dd97 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -45,7 +45,7 @@ use crate::metastore::multi_index::MultiIndex; use crate::metastore::source::SourceCredentials; use crate::metastore::{ is_valid_plain_binary_hll, table::Table, HllFlavour, IdRow, ImportFormat, Index, IndexDef, - MetaStoreTable, RowKey, Schema, TableId, + IndexType, MetaStoreTable, RowKey, Schema, TableId, }; use crate::queryplanner::panic::PanicWorkerNode; use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan}; @@ -185,6 +185,7 @@ impl SqlServiceImpl { import_format: Option, indexes: Vec, unique_key: Option>, + aggregates: Option>, partitioned_index: Option, trace_obj: &Option, ) -> Result, CubeError> { @@ -216,10 +217,18 @@ impl SqlServiceImpl { name: "#mi0".to_string(), columns, multi_index: Some(part_index_name), + index_type: IndexType::Regular, }); } + for index in indexes.iter() { - if let Statement::CreateIndex { name, columns, .. } = index { + if let Statement::CreateIndex { + name, + columns, + unique, + .. 
+ } = index + { indexes_to_create.push(IndexDef { name: name.to_string(), multi_index: None, @@ -236,6 +245,11 @@ impl SqlServiceImpl { } }) .collect::, _>>()?, + index_type: if *unique { + IndexType::Aggregate + } else { + IndexType::Regular + }, }); } } @@ -252,6 +266,11 @@ impl SqlServiceImpl { indexes_to_create, true, unique_key.map(|keys| keys.iter().map(|c| c.value.to_string()).collect()), + aggregates.map(|keys| { + keys.iter() + .map(|c| (c.0.value.to_string(), c.1.value.to_string())) + .collect() + }), None, ) .await; @@ -302,6 +321,11 @@ impl SqlServiceImpl { indexes_to_create, false, unique_key.map(|keys| keys.iter().map(|c| c.value.to_string()).collect()), + aggregates.map(|keys| { + keys.iter() + .map(|c| (c.0.value.to_string(), c.1.value.to_string())) + .collect() + }), partition_split_threshold, ) .await?; @@ -413,6 +437,7 @@ impl SqlServiceImpl { name, multi_index: None, columns: columns.iter().map(|c| c.value.to_string()).collect(), + index_type: IndexType::Regular, //TODO realize aggregate index here too }, ) .await?) @@ -745,6 +770,7 @@ impl SqlService for SqlServiceImpl { .. }, indexes, + aggregates, locations, unique_key, partitioned_index, @@ -790,6 +816,7 @@ impl SqlService for SqlServiceImpl { Some(import_format), indexes, unique_key, + aggregates, partitioned_index, &context.trace_obj, ) @@ -1685,6 +1712,7 @@ mod tests { TableValue::String("true".to_string()), TableValue::String(meta_store.get_table("Foo".to_string(), "Persons".to_string()).await.unwrap().get_row().created_at().as_ref().unwrap().to_string()), TableValue::String("NULL".to_string()), + TableValue::String("".to_string()), TableValue::String("NULL".to_string()), TableValue::String("NULL".to_string()), TableValue::String("NULL".to_string()), @@ -2984,6 +3012,115 @@ mod tests { }).await; }).await; } + #[tokio::test] + async fn create_aggr_index() { + assert!(true); + Config::test("aggregate_index") + .update_config(|mut c| { + c.partition_split_threshold = 10; + c.compaction_chunks_count_threshold = 0; + c + }) + .start_test(async move |services| { + let service = services.sql_service; + service.exec_query("CREATE SCHEMA foo").await.unwrap(); + + let paths = { + let dir = env::temp_dir(); + + let path_2 = dir.clone().join("orders.csv.gz"); + + let mut file = GzipEncoder::new(BufWriter::new( + tokio::fs::File::create(path_2.clone()).await.unwrap(), + )); + + file.write_all("platform,age,gender,cnt,max_id\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"ios\",20,\"M\",10,100\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"android\",20,\"M\",2,10\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"web\",20,\"M\",20,111\n".as_bytes()) + .await + .unwrap(); + + file.write_all("\"ios\",20,\"F\",10,100\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"android\",20,\"F\",2,10\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"web\",22,\"F\",20,115\n".as_bytes()) + .await + .unwrap(); + file.write_all("\"web\",22,\"F\",20,222\n".as_bytes()) + .await + .unwrap(); + + file.shutdown().await.unwrap(); + + services + .injector + .get_service_typed::() + .await + .upload_file(path_2.to_str().unwrap(), "temp-uploads/orders.csv.gz") + .await + .unwrap(); + + vec!["temp://orders.csv.gz".to_string()] + }; + let query = format!( + "CREATE TABLE foo.Orders ( + platform varchar(255), + age int, + gender varchar(2), + cnt int, + max_id int + ) + AGGREGATIONS (sum(cnt), max(max_id)) + INDEX index1 (platform, age) + AGGREGATE INDEX aggr_index (platform, age) + LOCATION {}", + 
paths.into_iter().map(|p| format!("'{}'", p)).join(",") + ); + service.exec_query(&query).await.unwrap(); + + let indices = services.meta_store.get_table_indexes(1).await.unwrap(); + + let aggr_index = indices + .iter() + .find(|i| i.get_row().get_name() == "aggr_index") + .unwrap(); + + let partitions = services + .meta_store + .get_active_partitions_by_index_id(aggr_index.get_id()) + .await + .unwrap(); + let chunks = services + .meta_store + .get_chunks_by_partition(partitions[0].get_id(), false) + .await + .unwrap(); + + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].get_row().get_row_count(), 4); + + let p = service + .plan_query( + "SELECT platform, age, sum(cnt) FROM foo.Orders GROUP BY platform, age", + ) + .await + .unwrap(); + + let worker_plan = pp_phys_plan(p.worker.as_ref()); + assert!(worker_plan.find("aggr_index").is_some()); + }) + .await; + } } impl SqlServiceImpl { diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index f4528731cdaa..a8322cff50c4 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -42,6 +42,7 @@ pub enum Statement { indexes: Vec, locations: Option>, unique_key: Option>, + aggregates: Option>, }, CreateSchema { schema_name: ObjectName, @@ -192,10 +193,32 @@ impl<'a> CubeStoreParser<'a> { None }; + let aggregates = if self.parse_custom_token("aggregations") { + self.parser.expect_token(&Token::LParen)?; + let res = self.parser.parse_comma_separated(|p| { + let func = p.parse_identifier()?; + p.expect_token(&Token::LParen)?; + let column = p.parse_identifier()?; + p.expect_token(&Token::RParen)?; + Ok((func, column)) + })?; + self.parser.expect_token(&Token::RParen)?; + Some(res) + } else { + None + }; + let mut indexes = Vec::new(); - while self.parser.parse_keyword(Keyword::INDEX) { - indexes.push(self.parse_with_index(name.clone())?); + loop { + if self.parse_custom_token("aggregate") { + self.parser.expect_keyword(Keyword::INDEX)?; + indexes.push(self.parse_with_index(name.clone(), true)?); + } else if self.parser.parse_keyword(Keyword::INDEX) { + indexes.push(self.parse_with_index(name.clone(), false)?); + } else { + break; + } } let partitioned_index = if self.parser.parse_keywords(&[ @@ -244,6 +267,7 @@ impl<'a> CubeStoreParser<'a> { like, }, indexes, + aggregates, partitioned_index, locations, unique_key, @@ -256,6 +280,7 @@ impl<'a> CubeStoreParser<'a> { pub fn parse_with_index( &mut self, table_name: ObjectName, + is_aggregate: bool, ) -> Result { let index_name = self.parser.parse_object_name()?; self.parser.expect_token(&Token::LParen)?; @@ -263,11 +288,12 @@ impl<'a> CubeStoreParser<'a> { .parser .parse_comma_separated(Parser::parse_order_by_expr)?; self.parser.expect_token(&Token::RParen)?; + //TODO I use unique flag for aggregate index for reusing CreateIndex struct. 
When adding another type of index, we will need to parse it into a custom structure Ok(SQLStatement::CreateIndex { name: index_name, table_name, columns, - unique: false, + unique: is_aggregate, if_not_exists: false, }) } @@ -297,3 +323,70 @@ impl<'a> CubeStoreParser<'a> { }) } } + +#[cfg(test)] +mod tests { + + use super::*; + use sqlparser::ast::Statement as SQLStatement; + + #[test] + fn parse_aggregate_index() { + let query = "CREATE TABLE foo.Orders ( + id int, + platform varchar(255), + age int, + gender varchar(2), + count int, + max_id int + ) + UNIQUE KEY (id, platform, age, gender) + AGGREGATIONS(sum(count), max(max_id)) + INDEX index1 (platform, age) + AGGREGATE INDEX aggr_index (platform, age) + INDEX index2 (age, platform ) + ;"; + let mut parser = CubeStoreParser::new(&query).unwrap(); + let res = parser.parse_statement().unwrap(); + match res { + Statement::CreateTable { + indexes, + aggregates, + .. + } => { + assert_eq!(aggregates.as_ref().unwrap()[0].0.value, "sum".to_string()); + assert_eq!(aggregates.as_ref().unwrap()[0].1.value, "count".to_string()); + assert_eq!(aggregates.as_ref().unwrap()[1].0.value, "max".to_string()); + assert_eq!( + aggregates.as_ref().unwrap()[1].1.value, + "max_id".to_string() + ); + + assert_eq!(indexes.len(), 3); + + let ind = &indexes[0]; + if let SQLStatement::CreateIndex { + columns, unique, .. + } = ind + { + assert_eq!(columns.len(), 2); + assert_eq!(unique, &false); + } else { + assert!(false); + } + + let ind = &indexes[1]; + if let SQLStatement::CreateIndex { + columns, unique, .. + } = ind + { + assert_eq!(columns.len(), 2); + assert_eq!(unique, &true); + } else { + assert!(false); + } + } + _ => {} + } + } +} diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index c70d60850081..53cf8e2a60cb 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -2,8 +2,9 @@ use crate::config::injection::DIService; use crate::config::ConfigObj; use crate::metastore::multi_index::MultiPartition; use crate::metastore::partition::partition_file_name; +use crate::metastore::table::AggregateColumn; use crate::metastore::{ - deactivate_table_on_corrupt_data, Chunk, IdRow, MetaStore, Partition, PartitionData, + deactivate_table_on_corrupt_data, Chunk, IdRow, IndexType, MetaStore, Partition, PartitionData, }; use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use crate::store::{ChunkDataStore, ChunkStore, ROW_GROUP_SIZE}; @@ -295,7 +296,12 @@ impl CompactionService for CompactionServiceImpl { .get_table_by_id(index.get_row().table_id()) .await?; let unique_key = table.get_row().unique_key_columns(); - let records = merge_chunks(key_size, main_table, new, unique_key).await?; + let aggregate_columns = match index.get_row().get_type() { + IndexType::Regular => None, + IndexType::Aggregate => Some(table.get_row().aggregate_columns()), + }; + let records = + merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?; let count_and_min = write_to_files(records, total_rows as usize, store, new_local_files2).await?; @@ -485,9 +491,20 @@ impl CompactionService for CompactionServiceImpl { let in_memory_columns = prepare_in_memory_columns(&self.chunk_store, num_columns, key_size, &chunks).await?; + let aggregate_columns = match index.get_row().get_type() { + IndexType::Regular => None, + IndexType::Aggregate => Some(table.get_row().aggregate_columns()), + }; + // Get merged RecordBatch - let batches_stream = - 
merge_chunks(key_size, main_table, in_memory_columns, unique_key).await?; + let batches_stream = merge_chunks( + key_size, + main_table, + in_memory_columns, + unique_key, + aggregate_columns, + ) + .await?; let batches = collect(batches_stream).await?; let batch = RecordBatch::concat(&schema, &batches).unwrap(); @@ -1025,11 +1042,13 @@ async fn write_to_files_by_keys( Ok(row_counts) } +///Builds a `SendableRecordBatchStream` containing the result of merging a persistent chunk `l` with an in-memory chunk `r` async fn merge_chunks( key_size: usize, l: Arc, r: Vec, unique_key_columns: Option>, + aggregate_columns: Option>, ) -> Result { let schema = l.schema(); let r = RecordBatch::try_new(schema.clone(), r)?; @@ -1046,7 +1065,31 @@ async fn merge_chunks( ]); let mut res: Arc = Arc::new(MergeSortExec::try_new(Arc::new(inputs), key)?); - if let Some(key_columns) = unique_key_columns { + if let Some(aggregate_columns) = aggregate_columns { + let mut groups = Vec::with_capacity(key_size); + let schema = res.schema(); + for i in 0..key_size { + let f = schema.field(i); + let col: Arc = Arc::new(Column::new(f.name().as_str(), i)); + groups.push((col, f.name().clone())); + } + let aggregates = aggregate_columns + .iter() + .map(|aggr_col| aggr_col.aggregate_expr(&res.schema())) + .collect::, _>>()?; + + let output_sort_order = (0..key_size).map(|x| x as usize).collect(); + + res = Arc::new(HashAggregateExec::try_new( + AggregateStrategy::InplaceSorted, + Some(output_sort_order), + AggregateMode::Final, + groups, + aggregates, + res.clone(), + schema, + )?); + } else if let Some(key_columns) = unique_key_columns { res = Arc::new(LastRowByUniqueKeyExec::try_new( res.clone(), key_columns @@ -1058,7 +1101,7 @@ async fn merge_chunks( ) }) .collect::, _>>()?, - )?) + )?); } Ok(res.execute(0).await?) 
@@ -1068,14 +1111,20 @@ async fn merge_chunks( mod tests { use super::*; use crate::cluster::MockCluster; - use crate::config::{Config, MockConfigObj}; - use crate::metastore::{Column, ColumnType, RocksMetaStore}; + use crate::config::Config; + use crate::config::MockConfigObj; + use crate::metastore::{Column, ColumnType, IndexDef, IndexType, RocksMetaStore}; + use crate::remotefs::LocalDirRemoteFs; use crate::store::MockChunkDataStore; use crate::table::data::rows_to_columns; use crate::table::{cmp_same_types, Row, TableValue}; - use arrow::array::StringArray; + use arrow::array::{Int64Array, StringArray}; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; + use datafusion::physical_plan::collect; + use rocksdb::{Options, DB}; + use std::fs; + use std::path::PathBuf; #[tokio::test] async fn compaction() { @@ -1098,6 +1147,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -1301,6 +1351,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -1399,6 +1450,172 @@ mod tests { RocksMetaStore::cleanup_test_metastore("compact_in_memory_chunks"); } + + #[tokio::test] + async fn aggr_index_compaction() { + let config = Config::test("create_aggr_chunk_test").update_config(|mut c| { + c.compaction_chunks_total_size_threshold = 50; + c + }); + let path = "/tmp/test_create_aggr_chunk"; + let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); + let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); + + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + + let remote_fs = LocalDirRemoteFs::new( + Some(PathBuf::from(chunk_remote_store_path.clone())), + PathBuf::from(chunk_store_path.clone()), + ); + let metastore = RocksMetaStore::new(path, remote_fs.clone(), config.config_obj()); + let chunk_store = ChunkStore::new( + metastore.clone(), + remote_fs.clone(), + Arc::new(MockCluster::new()), + config.config_obj(), + 50, + ); + + metastore + .create_schema("foo".to_string(), false) + .await + .unwrap(); + + let ind = IndexDef { + name: "aggr".to_string(), + columns: vec!["foo".to_string(), "boo".to_string()], + multi_index: None, + index_type: IndexType::Aggregate, + }; + let cols = vec![ + Column::new("foo".to_string(), ColumnType::String, 0), + Column::new("boo".to_string(), ColumnType::Int, 1), + Column::new("sum_int".to_string(), ColumnType::Int, 2), + ]; + let table = metastore + .create_table( + "foo".to_string(), + "bar".to_string(), + cols.clone(), + None, + None, + vec![ind], + true, + None, + Some(vec![("sum".to_string(), "sum_int".to_string())]), + None, + ) + .await + .unwrap(); + + let indices = metastore.get_table_indexes(table.get_id()).await.unwrap(); + + let aggr_index = indices + .iter() + .find(|i| i.get_row().get_name() == "aggr") + .unwrap(); + + let partition = &metastore + .get_active_partitions_by_index_id(aggr_index.get_id()) + .await + .unwrap()[0]; + + let data1: Vec = vec![ + Arc::new(StringArray::from(vec![ + "a".to_string(), + "a".to_string(), + "b".to_string(), + "b".to_string(), + "c".to_string(), + ])), + Arc::new(Int64Array::from(vec![1, 10, 2, 20, 10])), + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])), + ]; + let data2: Vec = vec![ + Arc::new(StringArray::from(vec![ + "a".to_string(), + "a".to_string(), + "b".to_string(), + "b".to_string(), + "c".to_string(), + "c".to_string(), + ])), + Arc::new(Int64Array::from(vec![1, 10, 2, 20, 10, 30])), + 
Arc::new(Int64Array::from(vec![10, 20, 30, 40, 50, 60])), + ]; + + let (chunk, _) = chunk_store + .add_chunk_columns(aggr_index.clone(), partition.clone(), data1.clone(), false) + .await + .unwrap() + .await + .unwrap() + .unwrap(); + metastore.chunk_uploaded(chunk.get_id()).await.unwrap(); + + let (chunk, _) = chunk_store + .add_chunk_columns(aggr_index.clone(), partition.clone(), data2.clone(), false) + .await + .unwrap() + .await + .unwrap() + .unwrap(); + metastore.chunk_uploaded(chunk.get_id()).await.unwrap(); + + let compaction_service = CompactionServiceImpl::new( + metastore.clone(), + chunk_store.clone(), + remote_fs.clone(), + config.config_obj(), + ); + compaction_service + .compact(partition.get_id()) + .await + .unwrap(); + + let partitions = metastore + .get_active_partitions_by_index_id(aggr_index.get_id()) + .await + .unwrap(); + assert_eq!(partitions.len(), 1); + let partition = &partitions[0]; + assert_eq!(partition.get_row().main_table_row_count(), 6); + + let remote = partition + .get_row() + .get_full_name(partition.get_id()) + .unwrap(); + let local = remote_fs + .download_file(&remote, partition.get_row().file_size()) + .await + .unwrap(); + let reader = Arc::new( + ParquetExec::try_from_path(local.as_str(), None, None, ROW_GROUP_SIZE, 1, None) + .unwrap(), + ); + let res_data = &collect(reader).await.unwrap()[0]; + + let foos = Arc::new(StringArray::from(vec![ + "a".to_string(), + "a".to_string(), + "b".to_string(), + "b".to_string(), + "c".to_string(), + "c".to_string(), + ])); + let boos = Arc::new(Int64Array::from(vec![1, 10, 2, 20, 10, 30])); + + let sums = Arc::new(Int64Array::from(vec![11, 22, 33, 44, 55, 60])); + let expected: Vec = vec![foos, boos, sums]; + + assert_eq!(res_data.columns(), &expected); + + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + } } struct MultiSplit { diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index aaad27cb06f9..c65d87380937 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -1,8 +1,13 @@ pub mod compaction; use async_trait::async_trait; +use datafusion::physical_plan::collect; +use datafusion::physical_plan::expressions::Column as FusionColumn; +use datafusion::physical_plan::hash_aggregate::{ + AggregateMode, AggregateStrategy, HashAggregateExec, +}; use datafusion::physical_plan::memory::MemoryExec; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr}; use serde::{de, Deserialize, Serialize}; extern crate bincode; @@ -10,7 +15,7 @@ use bincode::{deserialize_from, serialize_into}; use crate::metastore::{ deactivate_table_on_corrupt_data, table::Table, Chunk, Column, ColumnType, IdRow, Index, - MetaStore, Partition, WAL, + IndexType, MetaStore, Partition, WAL, }; use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use crate::table::{Row, TableValue}; @@ -524,10 +529,11 @@ mod tests { use crate::assert_eq_columns; use crate::cluster::MockCluster; use crate::config::Config; - use crate::metastore::RocksMetaStore; + use crate::metastore::{IndexDef, IndexType, RocksMetaStore}; use crate::remotefs::LocalDirRemoteFs; use crate::table::data::{concat_record_batches, rows_to_columns}; use crate::{metastore::ColumnType, table::TableValue}; + use arrow::array::{Int64Array, StringArray}; use rocksdb::{Options, DB}; use std::fs; use std::path::PathBuf; 
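The expected parquet contents asserted in aggr_index_compaction above can be re-derived by hand: rows from both uploaded chunks are folded by the aggregate index key (foo, boo) and sum_int is summed, giving (a,1)=1+10=11, (a,10)=2+20=22, (b,2)=3+30=33, (b,20)=4+40=44, (c,10)=5+50=55 and (c,30)=60. The std-only sketch below merely restates the test fixture; the real code performs this fold through HashAggregateExec during compaction.

    use std::collections::BTreeMap;

    fn main() {
        // (foo, boo, sum_int) rows restated from the two chunks inserted by the test.
        let chunk1 = vec![("a", 1, 1), ("a", 10, 2), ("b", 2, 3), ("b", 20, 4), ("c", 10, 5)];
        let chunk2 = vec![
            ("a", 1, 10), ("a", 10, 20), ("b", 2, 30),
            ("b", 20, 40), ("c", 10, 50), ("c", 30, 60),
        ];

        // Fold both chunks by the aggregate index key (foo, boo), summing sum_int.
        let mut merged: BTreeMap<(&str, i64), i64> = BTreeMap::new();
        for (foo, boo, sum_int) in chunk1.into_iter().chain(chunk2) {
            *merged.entry((foo, boo)).or_insert(0) += sum_int;
        }

        // BTreeMap iterates in key order, which is also the index sort order here.
        let sums: Vec<i64> = merged.values().copied().collect();
        assert_eq!(sums, vec![11, 22, 33, 44, 55, 60]);
    }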
@@ -587,6 +593,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -670,6 +677,7 @@ mod tests { true, None, None, + None, ) .await .unwrap(); @@ -704,6 +712,118 @@ mod tests { let _ = fs::remove_dir_all(chunk_store_path.clone()); let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); } + #[tokio::test] + async fn create_aggr_chunk_test() { + let config = Config::test("create_aggr_chunk_test"); + let path = "/tmp/test_create_aggr_chunk"; + let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); + let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); + + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + { + let remote_fs = LocalDirRemoteFs::new( + Some(PathBuf::from(chunk_remote_store_path.clone())), + PathBuf::from(chunk_store_path.clone()), + ); + let meta_store = RocksMetaStore::new(path, remote_fs.clone(), config.config_obj()); + let chunk_store = ChunkStore::new( + meta_store.clone(), + remote_fs.clone(), + Arc::new(MockCluster::new()), + config.config_obj(), + 10, + ); + + let col = vec![ + Column::new("foo".to_string(), ColumnType::String, 0), + Column::new("boo".to_string(), ColumnType::Int, 1), + Column::new("sum_int".to_string(), ColumnType::Int, 2), + ]; + + let foos = Arc::new(StringArray::from(vec![ + "a".to_string(), + "b".to_string(), + "a".to_string(), + "b".to_string(), + "a".to_string(), + ])); + let boos = Arc::new(Int64Array::from(vec![10, 20, 10, 20, 20])); + + let sums = Arc::new(Int64Array::from(vec![1, 2, 10, 20, 5])); + + meta_store + .create_schema("foo".to_string(), false) + .await + .unwrap(); + + let ind = IndexDef { + name: "aggr".to_string(), + columns: vec!["foo".to_string(), "boo".to_string()], + multi_index: None, + index_type: IndexType::Aggregate, + }; + let table = meta_store + .create_table( + "foo".to_string(), + "bar".to_string(), + col.clone(), + None, + None, + vec![ind], + true, + None, + Some(vec![("sum".to_string(), "sum_int".to_string())]), + None, + ) + .await + .unwrap(); + + let data: Vec = vec![foos, boos, sums]; + + let indices = meta_store.get_table_indexes(table.get_id()).await.unwrap(); + + let aggr_index = indices + .iter() + .find(|i| i.get_row().get_name() == "aggr") + .unwrap(); + let chunk_feats = join_all( + chunk_store + .partition_rows(aggr_index.get_id(), data, false) + .await + .unwrap(), + ) + .await + .into_iter() + .map(|c| { + let (c, _) = c.unwrap().unwrap(); + let cstore = chunk_store.clone(); + let mstore = meta_store.clone(); + async move { + let c = mstore.chunk_uploaded(c.get_id()).await.unwrap(); + let batches = cstore.get_chunk_columns(c).await.unwrap(); + RecordBatch::concat(&batches[0].schema(), &batches).unwrap() + } + }) + .collect::>(); + + let chunks = join_all(chunk_feats).await; + + let res = RecordBatch::concat(&chunks[0].schema(), &chunks).unwrap(); + + let foos = Arc::new(StringArray::from(vec![ + "a".to_string(), + "a".to_string(), + "b".to_string(), + ])); + let boos = Arc::new(Int64Array::from(vec![10, 20, 20])); + + let sums = Arc::new(Int64Array::from(vec![11, 5, 22])); + let expected: Vec = vec![foos, boos, sums]; + assert_eq!(res.columns(), &expected); + } + } } pub type ChunkUploadJob = JoinHandle, Option), CubeError>>; @@ -765,6 +885,7 @@ impl ChunkStore { .iter() .map(|c| arrow::compute::take(c.as_ref(), &to_write, None)) .collect::, _>>()?; + let columns = self.post_process_columns(index.clone(), columns).await?; 
new_chunks.push( self.add_chunk_columns(index.clone(), partition, columns, in_memory) .await?, ); } @@ -778,6 +899,70 @@ impl ChunkStore { Ok(new_chunks) } + ///Post-processing of index columns chunk data before saving to parquet files. + ///Suitable for pre-aggregations and similar things + ///`data` must be sorted in order of index columns + async fn post_process_columns( + &self, + index: IdRow<Index>, + data: Vec<ArrayRef>, + ) -> Result<Vec<ArrayRef>, CubeError> { + match index.get_row().get_type() { + IndexType::Regular => Ok(data), + IndexType::Aggregate => { + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; + let schema = Arc::new(arrow_schema(&index.get_row())); + + let batch = RecordBatch::try_new(schema.clone(), data)?; + + let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?); + + let key_size = index.get_row().sort_key_size() as usize; + let mut groups = Vec::with_capacity(key_size); + for i in 0..key_size { + let f = schema.field(i); + let col: Arc<dyn PhysicalExpr> = + Arc::new(FusionColumn::new(f.name().as_str(), i)); + groups.push((col, f.name().clone())); + } + + let aggregates = table + .get_row() + .aggregate_columns() + .iter() + .map(|aggr_col| aggr_col.aggregate_expr(&schema)) + .collect::<Result<Vec<_>, _>>()?; + + let output_sort_order = (0..index.get_row().sort_key_size()) + .map(|x| x as usize) + .collect(); + + let aggregate = Arc::new(HashAggregateExec::try_new( + AggregateStrategy::InplaceSorted, + Some(output_sort_order), + AggregateMode::Final, + groups, + aggregates, + input, + schema.clone(), + )?); + + let batches = collect(aggregate).await?; + if batches.is_empty() { + Ok(vec![]) + } else if batches.len() == 1 { + Ok(batches[0].columns().to_vec()) + } else { + let res = RecordBatch::concat(&schema, &batches).unwrap(); + Ok(res.columns().to_vec()) + } + } + } + } + /// Processes data into parquet files in the current task and schedules an async file upload. /// Join the returned handle to wait for the upload to finish. async fn add_chunk_columns( diff --git a/rust/cubestore/cubestore/src/table/parquet.rs b/rust/cubestore/cubestore/src/table/parquet.rs index 80dcf34b9fd4..5d83174b944d 100644 --- a/rust/cubestore/cubestore/src/table/parquet.rs +++ b/rust/cubestore/cubestore/src/table/parquet.rs @@ -148,6 +148,7 @@ mod tests { 6, None, None, + Index::index_type_default(), ) .unwrap(); @@ -238,6 +239,7 @@ mod tests { 3, None, None, + Index::index_type_default(), ) .unwrap(), row_group_size: 10, @@ -348,6 +350,7 @@ mod tests { 1, None, None, + Index::index_type_default(), ) .unwrap(); let tmp_file = NamedTempFile::new().unwrap(); @@ -395,6 +398,7 @@ mod tests { 1, None, None, + Index::index_type_default(), ) .unwrap();
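post_process_columns above is the per-chunk counterpart of the compaction change: a regular index writes chunk data untouched, while an aggregate index collapses rows sharing the leading sort key before the parquet file is written. Below is a simplified, std-only model of that dispatch; IndexKind, post_process and the tuple-based Row type are invented for illustration, while the real code dispatches on the metastore IndexType and runs a HashAggregateExec over arrow record batches.

    /// Illustrative stand-in for the metastore IndexType.
    #[derive(Clone, Copy)]
    enum IndexKind {
        Regular,
        Aggregate,
    }

    /// (foo, boo, sum_int) tuples standing in for a chunk's arrow columns.
    type Row = (&'static str, i64, i64);

    fn post_process(kind: IndexKind, rows: Vec<Row>) -> Vec<Row> {
        match kind {
            // A regular index keeps chunk rows exactly as partitioned.
            IndexKind::Regular => rows,
            // An aggregate index collapses rows that share the (foo, boo) key and
            // sums the measure; rows are assumed to already be sorted by that key,
            // as the doc comment on post_process_columns requires.
            IndexKind::Aggregate => {
                let mut out: Vec<Row> = Vec::new();
                for (foo, boo, s) in rows {
                    if let Some(last) = out.last_mut() {
                        if last.0 == foo && last.1 == boo {
                            last.2 += s;
                            continue;
                        }
                    }
                    out.push((foo, boo, s));
                }
                out
            }
        }
    }

    fn main() {
        // Sorted rows matching the create_aggr_chunk_test fixture above.
        let rows = vec![("a", 10, 1), ("a", 10, 10), ("a", 20, 5), ("b", 20, 2), ("b", 20, 20)];
        assert_eq!(post_process(IndexKind::Regular, rows.clone()), rows);
        assert_eq!(
            post_process(IndexKind::Aggregate, rows),
            vec![("a", 10, 11), ("a", 20, 5), ("b", 20, 22)]
        );
    }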