diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 6bfe76fa5b8..ed4ba151e6f 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -23,6 +23,8 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; +use crate::expr::PartitionExpr; + #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -126,14 +128,40 @@ pub enum Error { err_msg: String, source: common_meta::error::Error, }, + + #[snafu(display("Conjunct expr with non-expr is invalid"))] + ConjunctExprWithNonExpr { + expr: PartitionExpr, + location: Location, + }, + + #[snafu(display("Unclosed value {} on column {}", value, column))] + UnclosedValue { + value: String, + column: String, + location: Location, + }, + + #[snafu(display("Invalid partition expr: {:?}", expr))] + InvalidExpr { + expr: PartitionExpr, + location: Location, + }, + + #[snafu(display("Undefined column: {}", column))] + UndefinedColumn { column: String, location: Location }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::GetCache { .. } | Error::FindLeader { .. } => StatusCode::StorageUnavailable, - Error::FindRegionRoutes { .. } => StatusCode::InvalidArguments, - Error::FindTableRoutes { .. } => StatusCode::InvalidArguments, + Error::FindRegionRoutes { .. } + | Error::ConjunctExprWithNonExpr { .. } + | Error::UnclosedValue { .. } + | Error::InvalidExpr { .. } + | Error::FindTableRoutes { .. } + | Error::UndefinedColumn { .. } => StatusCode::InvalidArguments, Error::FindRegion { .. } | Error::FindRegions { .. } | Error::RegionKeysSize { .. } diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index 647e7f63b85..e088c7c841b 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -14,6 +14,8 @@ #![feature(assert_matches)] +//! Structs and traits for partitioning rule. 
+ pub mod columns; pub mod error; pub mod expr; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index eb413c63417..34f67c95a80 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -140,11 +140,9 @@ impl PartitionRuleManager { _ => None, }) .collect::>(); - Ok(Arc::new(MultiDimPartitionRule::new( - partition_columns.clone(), - regions, - exprs, - )) as _) + + let rule = MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs)?; + Ok(Arc::new(rule) as _) } /// Get partition rule of given table. diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs index 0d7d9587de4..82612868f12 100644 --- a/src/partition/src/multi_dim.rs +++ b/src/partition/src/multi_dim.rs @@ -18,40 +18,57 @@ use std::collections::HashMap; use datatypes::prelude::Value; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use store_api::storage::RegionNumber; -use crate::error::{self, Result}; +use crate::error::{ + self, ConjunctExprWithNonExprSnafu, InvalidExprSnafu, Result, UnclosedValueSnafu, + UndefinedColumnSnafu, +}; use crate::expr::{Operand, PartitionExpr, RestrictedOp}; use crate::PartitionRule; +/// Multi-Dimension partition rule. RFC [here](https://github.com/GreptimeTeam/greptimedb/blob/main/docs/rfcs/2024-02-21-multi-dimension-partition-rule/rfc.md) +/// +/// This partition rule is defined by a set of simple expressions on the partition +/// key columns. Compared to RANGE partition, which can be considered as a +/// single-dimension rule, this will evaluate expressions on each column separately. #[derive(Debug, Serialize, Deserialize)] pub struct MultiDimPartitionRule { + /// Allow list of which columns can be used for partitioning. partition_columns: Vec, - // name to index of `partition_columns` + /// Name to index of `partition_columns`. Used for quick lookup. name_to_index: HashMap, + /// Region number for each partition. 
This list has the same length as `exprs` + /// (not counting the default region). regions: Vec, + /// Partition expressions. exprs: Vec, } impl MultiDimPartitionRule { - pub fn new( + pub fn try_new( partition_columns: Vec, regions: Vec, exprs: Vec, - ) -> Self { + ) -> Result { let name_to_index = partition_columns .iter() .enumerate() .map(|(i, name)| (name.clone(), i)) .collect::>(); - Self { + let rule = Self { partition_columns, name_to_index, regions, exprs, - } + }; + + let mut checker = RuleChecker::new(&rule); + checker.check(&rule)?; + + Ok(rule) } fn find_region(&self, values: &[Value]) -> Result { @@ -140,12 +157,142 @@ impl PartitionRule for MultiDimPartitionRule { } } +/// Helper for [RuleChecker] +type Axis = HashMap; + +/// Helper for [RuleChecker] +struct SplitPoint { + is_equal: bool, + less_than_counter: isize, +} + +/// Check if the rule set covers all the possible values. +/// +/// Note that this checker has false negatives on duplicated exprs. E.g.: +/// `a != 20`, `a <= 20` and `a > 20`. +/// +/// It works on the observation that each projected split point should be included (`is_equal`) +/// and have a balanced `<` and `>` counter. +struct RuleChecker<'a> { + axis: Vec, + rule: &'a MultiDimPartitionRule, +} + +impl<'a> RuleChecker<'a> { + pub fn new(rule: &'a MultiDimPartitionRule) -> Self { + let mut projections = Vec::with_capacity(rule.partition_columns.len()); + projections.resize_with(rule.partition_columns.len(), Default::default); + + Self { + axis: projections, + rule, + } + } + + pub fn check(&mut self, rule: &MultiDimPartitionRule) -> Result<()> { + for expr in &rule.exprs { + self.walk_expr(expr)? 
+ } + + self.check_axis() + } + + #[allow(clippy::mutable_key_type)] + fn walk_expr(&mut self, expr: &PartitionExpr) -> Result<()> { + // recursively check the expr + match expr.op { + RestrictedOp::And | RestrictedOp::Or => { + match (expr.lhs.as_ref(), expr.rhs.as_ref()) { + (Operand::Expr(lhs), Operand::Expr(rhs)) => { + self.walk_expr(lhs)?; + self.walk_expr(rhs)? + } + _ => ConjunctExprWithNonExprSnafu { expr: expr.clone() }.fail()?, + } + + return Ok(()); + } + // Not conjunction + _ => {} + } + + let (col, val) = match (expr.lhs.as_ref(), expr.rhs.as_ref()) { + (Operand::Expr(_), _) + | (_, Operand::Expr(_)) + | (Operand::Column(_), Operand::Column(_)) + | (Operand::Value(_), Operand::Value(_)) => { + InvalidExprSnafu { expr: expr.clone() }.fail()? + } + + (Operand::Column(col), Operand::Value(val)) + | (Operand::Value(val), Operand::Column(col)) => (col, val), + }; + + let col_index = + *self + .rule + .name_to_index + .get(col) + .with_context(|| UndefinedColumnSnafu { + column: col.clone(), + })?; + let axis = &mut self.axis[col_index]; + let split_point = axis.entry(val.clone()).or_insert(SplitPoint { + is_equal: false, + less_than_counter: 0, + }); + match expr.op { + RestrictedOp::Eq => { + split_point.is_equal = true; + } + RestrictedOp::NotEq => { + // less_than +1 -1 + } + RestrictedOp::Lt => { + split_point.less_than_counter += 1; + } + RestrictedOp::LtEq => { + split_point.less_than_counter += 1; + split_point.is_equal = true; + } + RestrictedOp::Gt => { + split_point.less_than_counter -= 1; + } + RestrictedOp::GtEq => { + split_point.less_than_counter -= 1; + split_point.is_equal = true; + } + RestrictedOp::And | RestrictedOp::Or => { + unreachable!("conjunct expr should be handled above") + } + } + + Ok(()) + } + + /// Return if the rule is legal. 
+ fn check_axis(&self) -> Result<()> { + for (col_index, axis) in self.axis.iter().enumerate() { + for (val, split_point) in axis { + if split_point.less_than_counter != 0 || !split_point.is_equal { + UnclosedValueSnafu { + value: format!("{val:?}"), + column: self.rule.partition_columns[col_index].clone(), + } + .fail()?; + } + } + } + Ok(()) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; use super::*; - use crate::error; + use crate::error::{self, Error}; #[test] fn test_find_region() { @@ -154,7 +301,7 @@ mod tests { // b >= 'hz' AND b < 'sh', // b >= 'sh' // ) - let rule = MultiDimPartitionRule::new( + let rule = MultiDimPartitionRule::try_new( vec!["b".to_string()], vec![1, 2, 3], vec![ @@ -182,7 +329,8 @@ mod tests { Operand::Value(datatypes::value::Value::String("sh".into())), ), ], - ); + ) + .unwrap(); assert_matches!( rule.find_region(&["foo".into(), 1000_i32.into()]), Err(error::Error::RegionKeysSize { @@ -198,4 +346,297 @@ mod tests { assert_matches!(rule.find_region(&["sh".into()]), Ok(3)); assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3)); } + + #[test] + fn invalid_expr_case_1() { + // PARTITION ON COLUMNS (b) ( + // b <= b >= 'hz' AND b < 'sh', + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1], + vec![PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::String("hz".into())), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::String("sh".into())), + )), + )), + )], + ); + + // check rule + assert_matches!(rule.unwrap_err(), Error::InvalidExpr { .. 
}); + } + + #[test] + fn invalid_expr_case_2() { + // PARTITION ON COLUMNS (b) ( + // b >= 'hz' AND 'sh', + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1], + vec![PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::String("hz".into())), + )), + RestrictedOp::And, + Operand::Value(datatypes::value::Value::String("sh".into())), + )], + ); + + // check rule + assert_matches!(rule.unwrap_err(), Error::ConjunctExprWithNonExpr { .. }); + } + + /// ```ignore + /// │ │ + /// │ │ + /// ─────────┼──────────┼────────────► b + /// │ │ + /// │ │ + /// b <= h b >= s + /// ``` + #[test] + fn empty_expr_case_1() { + // PARTITION ON COLUMNS (b) ( + // b <= 'h', + // b >= 's' + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1, 2], + vec![ + PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::String("h".into())), + ), + PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::String("s".into())), + ), + ], + ); + + // check rule + assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. 
}); + } + + /// ``` + /// a + /// ▲ + /// │ ‖ + /// │ ‖ + /// 200 │ ┌─────────┤ + /// │ │ │ + /// │ │ │ + /// │ │ │ + /// 100 │ ======┴─────────┘ + /// │ + /// └──────────────────────────►b + /// 10 20 + /// ``` + #[test] + fn empty_expr_case_2() { + // PARTITION ON COLUMNS (b) ( + // a >= 100 AND b <= 10 OR a > 100 AND a <= 200 AND b <= 10 OR a >= 200 AND b > 10 AND b <= 20 OR a > 200 AND b <= 20 + // a < 100 AND b <= 20 OR a >= 100 AND b > 20 + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1, 2], + vec![ + PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + // a >= 100 AND b <= 10 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int64(100)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(10)), + )), + )), + RestrictedOp::Or, + // a > 100 AND a <= 200 AND b <= 10 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Gt, + Operand::Value(datatypes::value::Value::Int64(100)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(200)), + )), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(10)), + )), + )), + )), + RestrictedOp::Or, + // a >= 200 AND b > 10 AND b <= 20 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int64(200)), + )), + 
RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::Gt, + Operand::Value(datatypes::value::Value::Int64(10)), + )), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + )), + )), + )), + RestrictedOp::Or, + // a > 200 AND b <= 20 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Gt, + Operand::Value(datatypes::value::Value::Int64(200)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + )), + )), + ), + PartitionExpr::new( + // a < 100 AND b <= 20 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::Int64(100)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + )), + )), + RestrictedOp::Or, + // a >= 100 AND b > 20 + Operand::Expr(PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int64(100)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + )), + )), + ), + ], + ); + + // check rule + assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. 
}); + } + + #[test] + fn duplicate_expr_case_1() { + // PARTITION ON COLUMNS (a) ( + // a <= 20, + // a >= 10 + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1, 2], + vec![ + PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + ), + PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::Int64(10)), + ), + ], + ); + + // check rule + assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. }); + } + + #[test] + #[ignore = "checker cannot detect this kind of duplicate for now"] + fn duplicate_expr_case_2() { + // PARTITION ON COLUMNS (a) ( + // a != 20, + // a <= 20, + // a > 20, + // ) + let rule = MultiDimPartitionRule::try_new( + vec!["a".to_string(), "b".to_string()], + vec![1, 2], + vec![ + PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::NotEq, + Operand::Value(datatypes::value::Value::Int64(20)), + ), + PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(datatypes::value::Value::Int64(20)), + ), + PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Gt, + Operand::Value(datatypes::value::Value::Int64(20)), + ), + ], + ); + + // check rule + assert!(rule.is_err()); + } }