Skip to content

Commit

Permalink
feat(cubestore): Aggregating index (#4379)
Browse files Browse the repository at this point in the history
* feat(cubestore): Storing aggregating index (#4599)
* feat(cubestore): Using aggregating indexes in queries (#4728)
  • Loading branch information
waralex committed Jun 23, 2022
1 parent b81e0b0 commit a0bd36c
Show file tree
Hide file tree
Showing 13 changed files with 1,710 additions and 68 deletions.
268 changes: 268 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Expand Up @@ -179,6 +179,10 @@ pub fn sql_tests() -> Vec<(&'static str, TestFn)> {
),
t("panic_worker", panic_worker),
t("planning_filter_index_selection", planning_filter_index_selection),
t("planning_aggregate_index", planning_aggregate_index),
t("aggregate_index", aggregate_index),
t("aggregate_index_hll", aggregate_index_hll),
t("aggregate_index_errors", aggregate_index_errors),
];

fn t<F>(name: &'static str, f: fn(Box<dyn SqlClient>) -> F) -> (&'static str, TestFn)
Expand Down Expand Up @@ -4747,6 +4751,270 @@ async fn filter_multiple_in_for_decimal(service: Box<dyn SqlClient>) {
assert_eq!(to_rows(&r), rows(&[(2)]));
}

async fn planning_aggregate_index(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Orders(a int, b int, c int, a_sum int, a_max int, a_min int, a_merge HYPERLOGLOG)
AGGREGATIONS(sum(a_sum), max(a_max), min(a_min), merge(a_merge))
INDEX reg_index (a, b)
AGGREGATE INDEX aggr_index (a, b)
")
.await
.unwrap();

let p = service
.plan_query("SELECT a, b, sum(a_sum) FROM s.Orders GROUP BY 1, 2")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n MergeSort\
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\
\n Empty"
);

let p = service
.plan_query("SELECT a, b, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders GROUP BY 1, 2")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum), MAX(s.Orders.a_max)@3:MAX(a_max), MIN(s.Orders.a_min)@4:MIN(a_min), MERGE(s.Orders.a_merge)@5:MERGE(a_merge)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n MergeSort\
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: *\
\n Empty"
);

let p = service
.plan_query("SELECT a, b, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders WHERE c = 1 GROUP BY 1, 2")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, b, SUM(s.Orders.a_sum)@2:SUM(a_sum), MAX(s.Orders.a_max)@3:MAX(a_max), MIN(s.Orders.a_min)@4:MIN(a_min), MERGE(s.Orders.a_merge)@5:MERGE(a_merge)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n Filter\
\n MergeSort\
\n Scan, index: default:3:[3]:sort_on[a, b, c], fields: *\
\n Empty"
);

let p = service
.plan_query("SELECT a, sum(a_sum), max(a_max), min(a_min), merge(a_merge) FROM s.Orders GROUP BY 1")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, SUM(s.Orders.a_sum)@1:SUM(a_sum), MAX(s.Orders.a_max)@2:MAX(a_max), MIN(s.Orders.a_min)@3:MIN(a_min), MERGE(s.Orders.a_merge)@4:MERGE(a_merge)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n MergeSort\
\n Scan, index: aggr_index:2:[2]:sort_on[a], fields: [a, a_sum, a_max, a_min, a_merge]\
\n Empty"
);

let p = service
.plan_query("SELECT a, avg(a_sum) FROM s.Orders GROUP BY 1")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, AVG(s.Orders.a_sum)@1:AVG(a_sum)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n MergeSort\
\n Scan, index: reg_index:1:[1]:sort_on[a], fields: [a, a_sum]\
\n Empty"
);

let p = service
.plan_query("SELECT a, sum(a_sum) FROM s.Orders WHERE b = 1 GROUP BY 1")
.await
.unwrap();
assert_eq!(
pp_phys_plan(p.worker.as_ref()),
"Projection, [a, SUM(s.Orders.a_sum)@1:SUM(a_sum)]\
\n FinalInplaceAggregate\
\n Worker\
\n PartialInplaceAggregate\
\n Filter\
\n MergeSort\
\n Scan, index: aggr_index:2:[2]:sort_on[a, b], fields: [a, b, a_sum]\
\n Empty"
);
}

async fn aggregate_index(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Orders(a int, b int, c int, a_sum int, a_max int, a_min int)
AGGREGATIONS(sum(a_sum), max(a_max), min(a_min))
INDEX reg_index (a, b)
AGGREGATE INDEX aggr_index (a, b)
")
.await
.unwrap();
service
.exec_query(
"INSERT INTO s.Orders (a, b, c, a_sum, a_max, a_min) VALUES (1, 10, 100, 10, 10, 10), \
(2, 20, 200, 10, 10, 10), \
(1, 10, 300, 20, 20, 20), \
(2, 30, 200, 100, 100, 100), \
(1, 10, 400, 20, 40, 40), \
(2, 20, 410, 20, 30, 30) \
").await.unwrap();

let res = service
.exec_query("SELECT a, b, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2")
.await
.unwrap();

assert_eq!(
to_rows(&res),
[
[TableValue::Int(1), TableValue::Int(10), TableValue::Int(50), TableValue::Int(40), TableValue::Int(10)],
[TableValue::Int(2), TableValue::Int(20), TableValue::Int(30), TableValue::Int(30), TableValue::Int(10)],
[TableValue::Int(2), TableValue::Int(30), TableValue::Int(100), TableValue::Int(100), TableValue::Int(100)],
]
);

let res = service
.exec_query("SELECT a, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders GROUP BY 1 ORDER BY 1")
.await
.unwrap();

assert_eq!(
to_rows(&res),
[
[TableValue::Int(1), TableValue::Int(50), TableValue::Int(40), TableValue::Int(10)],
[TableValue::Int(2), TableValue::Int(130), TableValue::Int(100), TableValue::Int(10)],
]
);

let res = service
.exec_query("SELECT a, sum(a_sum) as sum, max(a_max) as max, min(a_min) as min FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1")
.await
.unwrap();

assert_eq!(
to_rows(&res),
[
[TableValue::Int(2), TableValue::Int(30), TableValue::Int(30), TableValue::Int(10)],
]
);
}

async fn aggregate_index_hll(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog)
AGGREGATIONS(merge(a_hll))
AGGREGATE INDEX aggr_index (a, b)
")
.await
.unwrap();
service
.exec_query(
"INSERT INTO s.Orders (a, b, a_hll) VALUES \
(1, 10, X'020C0200C02FF58941D5F0C6'), \
(1, 20, X'020C0200C02FF58941D5F0C6'), \
(1, 10, X'020C0200C02FF58941D5F0C6'), \
(1, 20, X'020C0200C02FF58941D5F0C6') \
").await.unwrap();

let res = service
.exec_query("SELECT a, b, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1, 2 ORDER BY 1, 2")
.await
.unwrap();
assert_eq!(
to_rows(&res),
[
[TableValue::Int(1), TableValue::Int(10), TableValue::Int(2)],
[TableValue::Int(1), TableValue::Int(20), TableValue::Int(2)],
]
);

let res = service
.exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders WHERE b = 20 GROUP BY 1 ORDER BY 1")
.await
.unwrap();
assert_eq!(
to_rows(&res),
[
[TableValue::Int(1), TableValue::Int(2)],
]
);

let res = service
.exec_query("SELECT a, cardinality(merge(a_hll)) as hll FROM s.Orders GROUP BY 1 ORDER BY 1")
.await
.unwrap();
assert_eq!(
to_rows(&res),
[
[TableValue::Int(1), TableValue::Int(2)],
]
);

}

async fn aggregate_index_errors(service: Box<dyn SqlClient>) {
service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog)
AGGREGATE INDEX aggr_index (a, b, a_hll)
")
.await
.expect_err("Can't create aggregate index for table 'Orders' because aggregate columns (`AGGREGATIONS`) not specified for the table");

service
.exec_query("CREATE TABLE s.Orders(a int, b int, a_hll hyperloglog)
AGGREGATIONS(merge(a_hll))
AGGREGATE INDEX aggr_index (a, b, a_hll)
")
.await
.expect_err("Column 'a_hll' in aggregate index 'aggr_index' is in aggregations list for table 'Orders'. Aggregate index columns must be outside of aggregations list.");

service
.exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog)
AGGREGATIONS (sum(a), sum(b))
")
.await
.expect_err("Aggregate function SUM not allowed for column type text");

service
.exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog)
AGGREGATIONS (sum(a), max(b), sum(a_hll))
")
.await
.expect_err("Aggregate function SUM not allowed for column type hyperloglog");

service
.exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog)
AGGREGATIONS (max(a_hll))
")
.await
.expect_err("Aggregate function MAX not allowed for column type hyperloglog");

service
.exec_query("CREATE TABLE s.Orders(a int, b string, a_hll hyperloglog)
AGGREGATIONS (merge(a))
")
.await
.expect_err("Aggregate function MERGE not allowed for column type integer");

}
async fn panic_worker(service: Box<dyn SqlClient>) {
let r = service.exec_query("SYS PANIC WORKER").await;
assert_eq!(r, Err(CubeError::panic("worker panic".to_string())));
Expand Down
13 changes: 12 additions & 1 deletion rust/cubestore/cubestore/src/metastore/index.rs
@@ -1,5 +1,6 @@
use super::{
BaseRocksSecondaryIndex, Column, Index, IndexId, RocksSecondaryIndex, RocksTable, TableId,
BaseRocksSecondaryIndex, Column, Index, IndexId, IndexType, RocksSecondaryIndex, RocksTable,
TableId,
};
use crate::metastore::{IdRow, MetaStoreEvent};
use crate::{rocks_table_impl, CubeError};
Expand All @@ -16,6 +17,7 @@ impl Index {
sort_key_size: u64,
partition_split_key_size: Option<u64>,
multi_index_id: Option<u64>,
index_type: IndexType,
) -> Result<Index, CubeError> {
if sort_key_size == 0 {
return Err(CubeError::user(format!(
Expand All @@ -30,6 +32,7 @@ impl Index {
sort_key_size,
partition_split_key_size,
multi_index_id,
index_type,
})
}

Expand All @@ -41,6 +44,10 @@ impl Index {
&self.name
}

pub fn get_type(&self) -> IndexType {
self.index_type.clone()
}

pub fn columns(&self) -> &Vec<Column> {
&self.columns
}
Expand All @@ -61,6 +68,10 @@ impl Index {
pub fn multi_index_id(&self) -> Option<u64> {
self.multi_index_id
}

pub fn index_type_default() -> IndexType {
IndexType::Regular
}
}

#[derive(Clone, Copy, Debug)]
Expand Down

0 comments on commit a0bd36c

Please sign in to comment.