diff --git a/e2e_test/batch/catalog/pg_attribute.slt.part b/e2e_test/batch/catalog/pg_attribute.slt.part index 11d7be4a1d24..b1e2b44181a1 100644 --- a/e2e_test/batch/catalog/pg_attribute.slt.part +++ b/e2e_test/batch/catalog/pg_attribute.slt.part @@ -38,9 +38,9 @@ select i.relname, a.attname, ix.indkey from pg_catalog.pg_class t join pg_catalog.pg_attribute a on t.oid = a.attrelid and a.attnum = ANY(ix.indkey) where t.relname = 'tmp' order by a.attnum; ---- -tmp_idx id1 {2,1,3} -tmp_idx id2 {2,1,3} -tmp_idx id3 {2,1,3} +tmp_idx id2 {2,3,4,5} +tmp_idx id3 {2,3,4,5} +tmp_idx id4 {2,3,4,5} statement ok drop table tmp; diff --git a/e2e_test/batch/catalog/pg_index.slt.part b/e2e_test/batch/catalog/pg_index.slt.part index 774dc6c5e266..271553f13a62 100644 --- a/e2e_test/batch/catalog/pg_index.slt.part +++ b/e2e_test/batch/catalog/pg_index.slt.part @@ -10,7 +10,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t join pg_catalog.pg_class i on i.oid = ix.indexrelid where t.relname = 'tmp' and i.relname = 'tmp_id2_idx'; ---- -1 {2} +2 {2,3} statement ok create index tmp_id2_idx_include_id1 on tmp(id2) include(id1); @@ -21,7 +21,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t join pg_catalog.pg_class i on i.oid = ix.indexrelid where t.relname = 'tmp' and i.relname = 'tmp_id2_idx_include_id1'; ---- -2 {2,1} +3 {2,3,4} statement ok create index tmp_id1_id2_idx on tmp(id1, id2); @@ -32,7 +32,7 @@ select ix.indnatts, ix.indkey from pg_catalog.pg_class t join pg_catalog.pg_class i on i.oid = ix.indexrelid where t.relname = 'tmp' and i.relname = 'tmp_id1_id2_idx'; ---- -2 {1,2} +3 {2,3,4} statement ok drop table tmp; diff --git a/e2e_test/ddl/alter_table_column.slt b/e2e_test/ddl/alter_table_column.slt index fc142e151019..8407ca388003 100644 --- a/e2e_test/ddl/alter_table_column.slt +++ b/e2e_test/ddl/alter_table_column.slt @@ -215,7 +215,7 @@ statement ok create table t(id int primary key, a int, b varchar); statement ok -create index idx on t(a); +create index idx on t(a, lower(b)); statement ok alter table t add column c int; diff --git a/e2e_test/ddl/index.slt b/e2e_test/ddl/index.slt index 6509607d77d3..865ce20839ad 100644 --- a/e2e_test/ddl/index.slt +++ b/e2e_test/ddl/index.slt @@ -113,4 +113,24 @@ statement ok drop index index_on_quoted_column2; statement ok -drop table t2; \ No newline at end of file +drop table t2; + +# create functional indexes +statement ok + +create table t (j jsonb); + +statement ok +insert into t values ('{"k": "abc" }'::jsonb); + +statement ok +create index idx on t(j->'k'); + +# query functional indexes +query II +select * from idx; +---- +"abc" {"k": "abc"} + +statement ok +drop table t; \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/index_selection.yaml b/src/frontend/planner_test/tests/testdata/index_selection.yaml index f1b33325b387..af3ab8c55189 100644 --- a/src/frontend/planner_test/tests/testdata/index_selection.yaml +++ b/src/frontend/planner_test/tests/testdata/index_selection.yaml @@ -671,3 +671,11 @@ └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2, output: all } └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } └─BatchScan { table: t2, columns: [t2.d1, t2.d2], distribution: SomeShard } +- sql: | + create table t (j jsonb); + explain create index idx on t(j->'k'); + explain_output: | + StreamMaterialize { columns: [JSONB_ACCESS_INNER, j, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [JSONB_ACCESS_INNER, t._row_id], pk_conflict: "NoCheck" } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [JsonbAccessInner(t.j, 'k':Varchar) as $expr1, t.j, t._row_id] } + └─StreamTableScan { table: t, columns: [j, _row_id] } diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs index 1763cfbc18bc..5412396d962e 100644 --- a/src/frontend/src/catalog/index_catalog.rs +++ b/src/frontend/src/catalog/index_catalog.rs @@ -17,14 +17,12 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::IndexId; -use risingwave_common::types::DataType; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::PbIndex; -use risingwave_pb::expr::expr_node::RexNode; use super::ColumnId; use crate::catalog::{DatabaseId, RelationCatalog, SchemaId, TableCatalog}; -use crate::expr::{Expr, InputRef}; +use crate::expr::{Expr, ExprImpl}; use crate::user::UserId; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -33,10 +31,11 @@ pub struct IndexCatalog { pub name: String, - /// Only `InputRef` type index is supported Now. + /// Only `InputRef` and `FuncCall` type index is supported Now. /// The index of `InputRef` is the column index of the primary table. - /// index_item size is equal to index table columns size - pub index_item: Vec, + /// The index_item size is equal to the index table columns size + /// The input args of `FuncCall` is also the column index of the primary table. + pub index_item: Vec, pub index_table: Arc, @@ -55,30 +54,29 @@ impl IndexCatalog { index_table: &TableCatalog, primary_table: &TableCatalog, ) -> Self { - let index_item = index_prost + let index_item: Vec = index_prost .index_item .iter() - .map(|x| match x.rex_node.as_ref().unwrap() { - RexNode::InputRef(input_col_idx) => InputRef { - index: *input_col_idx as usize, - data_type: DataType::from(x.return_type.as_ref().unwrap()), - }, - RexNode::FuncCall(_) => unimplemented!(), - _ => unreachable!(), - }) - .collect_vec(); + .map(ExprImpl::from_expr_proto) + .try_collect() + .unwrap(); - let primary_to_secondary_mapping = index_item + let primary_to_secondary_mapping: BTreeMap = index_item .iter() .enumerate() - .map(|(i, input_ref)| (input_ref.index, i)) + .filter_map(|(i, expr)| match expr { + ExprImpl::InputRef(input_ref) => Some((input_ref.index, i)), + ExprImpl::FunctionCall(_) => None, + _ => unreachable!(), + }) .collect(); - let secondary_to_primary_mapping = index_item - .iter() - .enumerate() - .map(|(i, input_ref)| (i, input_ref.index)) - .collect(); + let secondary_to_primary_mapping = BTreeMap::from_iter( + primary_to_secondary_mapping + .clone() + .into_iter() + .map(|(x, y)| (y, x)), + ); let original_columns = index_prost .original_columns @@ -147,7 +145,7 @@ impl IndexCatalog { index_item: self .index_item .iter() - .map(InputRef::to_expr_proto) + .map(|expr| expr.to_expr_proto()) .collect_vec(), original_columns: self.original_columns.iter().map(Into::into).collect_vec(), } diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index 7016a9fad542..afd31fd71263 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -19,7 +19,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{IndexId, TableDesc, TableId}; -use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::catalog::{PbIndex, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -29,7 +29,7 @@ use risingwave_sqlparser::ast::{Ident, ObjectName, OrderByExpr}; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::root_catalog::SchemaPath; -use crate::expr::{Expr, ExprImpl, InputRef}; +use crate::expr::{Expr, ExprImpl, ExprType, InputRef}; use crate::handler::privilege::ObjectCheckItem; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{Explain, LogicalProject, LogicalScan, StreamMaterialize}; @@ -48,7 +48,6 @@ pub(crate) fn gen_create_index_plan( include: Vec, distributed_by: Vec, ) -> Result<(PlanRef, PbTable, PbIndex)> { - let columns = check_columns(columns)?; let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; let search_path = session.config().get_search_path(); @@ -77,77 +76,114 @@ pub(crate) fn gen_create_index_plan( Object::TableId(table.id.table_id), )])?; - let table_desc = Rc::new(table.table_desc()); - let table_desc_map = table_desc - .columns - .iter() - .enumerate() - .map(|(x, y)| (y.name.clone(), x)) - .collect::>(); - - let to_column_order = |(ident, order): &(Ident, OrderType)| { - let x = ident.real_value(); - table_desc_map - .get(&x) - .map(|x| ColumnOrder::new(*x, *order)) - .ok_or_else(|| ErrorCode::ItemNotFound(x).into()) - }; - - let to_column_indices = |ident: &Ident| { - let x = ident.real_value(); - table_desc_map - .get(&x) - .cloned() - .ok_or_else(|| ErrorCode::ItemNotFound(x).into()) - }; - - let mut index_columns = columns - .iter() - .map(to_column_order) - .try_collect::<_, Vec<_>, RwError>()?; + let mut binder = Binder::new_for_stream(session); + binder.bind_table(Some(&schema_name), &table_name, None)?; + + let mut index_columns_ordered_expr = vec![]; + let mut include_columns_expr = vec![]; + let mut distributed_columns_expr = vec![]; + for column in columns { + let order_type = OrderType::from_bools(column.asc, column.nulls_first); + let expr_impl = binder.bind_expr(column.expr)?; + match &expr_impl { + ExprImpl::InputRef(_) => {} + ExprImpl::FunctionCall(func) => { + match func.get_expr_type() { + // TODO: support more functions after verification + ExprType::Lower + | ExprType::Upper + | ExprType::JsonbAccessInner + | ExprType::JsonbAccessStr => {} + _ => { + return Err(ErrorCode::NotSupported( + "this function is not supported for indexes".into(), + "use other functions instead".into(), + ) + .into()) + } + }; + if !func.inputs().iter().all(|input| { + matches!(input, ExprImpl::InputRef(_)) || matches!(input, ExprImpl::Literal(_)) + }) { + return Err(ErrorCode::NotSupported( + "complex arguments for functions are not supported".into(), + "use columns or literals instead".into(), + ) + .into()); + } + } + _ => { + return Err(ErrorCode::NotSupported( + "index columns should be columns or functions".into(), + "use columns or functions instead".into(), + ) + .into()) + } + }; + index_columns_ordered_expr.push((expr_impl, order_type)); + } - let mut include_columns = if include.is_empty() { + if include.is_empty() { // Create index to include all (non-hidden) columns by default. - table + include_columns_expr = table .columns() .iter() .enumerate() .filter(|(_, column)| !column.is_hidden) - .map(|(x, _)| x) - .collect_vec() + .map(|(x, column)| { + ExprImpl::InputRef(InputRef::new(x, column.column_desc.data_type.clone()).into()) + }) + .collect_vec(); } else { - include - .iter() - .map(to_column_indices) - .try_collect::<_, Vec<_>, RwError>()? + for column in include { + let expr_impl = + binder.bind_expr(risingwave_sqlparser::ast::Expr::Identifier(column))?; + include_columns_expr.push(expr_impl); + } }; - let distributed_by_columns = distributed_by - .iter() - .map(to_column_indices) - .try_collect::<_, Vec<_>, RwError>()?; + for column in distributed_by { + let expr_impl = binder.bind_expr(risingwave_sqlparser::ast::Expr::Identifier(column))?; + distributed_columns_expr.push(expr_impl); + } + + let table_desc = Rc::new(table.table_desc()); // Remove duplicate column of index columns let mut set = HashSet::new(); - index_columns = index_columns + index_columns_ordered_expr = index_columns_ordered_expr .into_iter() - .filter(|x| set.insert(x.column_index)) + .filter(|(expr, _)| match expr { + ExprImpl::InputRef(input_ref) => set.insert(input_ref.index), + ExprImpl::FunctionCall(_) => true, + _ => unreachable!(), + }) .collect_vec(); // Remove include columns are already in index columns - include_columns = include_columns + include_columns_expr = include_columns_expr .into_iter() - .filter(|x| set.insert(*x)) + .filter(|expr| match expr { + ExprImpl::InputRef(input_ref) => set.insert(input_ref.index), + _ => unreachable!(), + }) .collect_vec(); // Remove duplicate columns of distributed by columns - let distributed_by_columns = distributed_by_columns.into_iter().unique().collect_vec(); + let mut set = HashSet::new(); + let distributed_columns_expr = distributed_columns_expr + .into_iter() + .filter(|expr| match expr { + ExprImpl::InputRef(input_ref) => set.insert(input_ref.index), + _ => unreachable!(), + }) + .collect_vec(); // Distributed by columns should be a prefix of index columns - if !index_columns + if !index_columns_ordered_expr .iter() - .map(|x| x.column_index) + .map(|(expr, _)| expr.clone()) .collect_vec() - .starts_with(&distributed_by_columns) + .starts_with(&distributed_columns_expr) { return Err(ErrorCode::InvalidInputSyntax( "Distributed by columns should be a prefix of index columns".to_string(), @@ -161,14 +197,14 @@ pub(crate) fn gen_create_index_plan( table_desc.clone(), context, index_table_name.clone(), - &index_columns, - &include_columns, + &index_columns_ordered_expr, + &include_columns_expr, // We use the whole index columns as distributed key by default if users // haven't specify the distributed by columns. - if distributed_by_columns.is_empty() { - index_columns.len() + if distributed_columns_expr.is_empty() { + index_columns_ordered_expr.len() } else { - distributed_by_columns.len() + distributed_columns_expr.len() }, )?; @@ -192,6 +228,20 @@ pub(crate) fn gen_create_index_plan( index_table_prost.owner = session.user_id(); index_table_prost.dependent_relations = vec![table.id.table_id]; + // FIXME: why sqlalchemy need these information? + let original_columns = index_table + .columns + .iter() + .map(|x| x.column_desc.column_id.get_id()) + .collect(); + + let index_item = build_index_item( + index_table.table_desc().into(), + table.name(), + table_desc, + index_columns_ordered_expr, + ); + let index_prost = PbIndex { id: IndexId::placeholder().index_id, schema_id: index_schema_id, @@ -200,18 +250,8 @@ pub(crate) fn gen_create_index_plan( owner: index_table_prost.owner, index_table_id: TableId::placeholder().table_id, primary_table_id: table.id.table_id, - index_item: build_index_item(index_table.table_desc().into(), table.name(), table_desc) - .iter() - .map(InputRef::to_expr_proto) - .collect_vec(), - original_columns: index_columns - .iter() - .map(|x| x.column_index) - .collect_vec() - .iter() - .chain(include_columns.iter()) - .map(|index| *index as i32) - .collect_vec(), + index_item, + original_columns, }; let plan: PlanRef = materialize.into(); @@ -229,7 +269,8 @@ fn build_index_item( index_table_desc: Rc, primary_table_name: &str, primary_table_desc: Rc, -) -> Vec { + index_columns: Vec<(ExprImpl, OrderType)>, +) -> Vec { let primary_table_desc_map = primary_table_desc .columns .iter() @@ -239,27 +280,35 @@ fn build_index_item( let primary_table_name_prefix = format!("{}.", primary_table_name); - index_table_desc - .columns - .iter() - .map(|x| { - let name = if x.name.starts_with(&primary_table_name_prefix) { - x.name[primary_table_name_prefix.len()..].to_string() - } else { - x.name.clone() - }; - - let column_index = *primary_table_desc_map.get(&name).unwrap(); - InputRef { - index: column_index, - data_type: primary_table_desc - .columns - .get(column_index) - .unwrap() - .data_type - .clone(), - } - }) + let index_columns_len = index_columns.len(); + index_columns + .into_iter() + .map(|(expr, _)| expr.to_expr_proto()) + .chain( + index_table_desc + .columns + .iter() + .skip(index_columns_len) + .map(|x| { + let name = if x.name.starts_with(&primary_table_name_prefix) { + x.name[primary_table_name_prefix.len()..].to_string() + } else { + x.name.clone() + }; + + let column_index = *primary_table_desc_map.get(&name).unwrap(); + InputRef { + index: column_index, + data_type: primary_table_desc + .columns + .get(column_index) + .unwrap() + .data_type + .clone(), + } + .to_expr_proto() + }), + ) .collect_vec() } @@ -270,8 +319,8 @@ fn assemble_materialize( table_desc: Rc, context: OptimizerContextRef, index_name: String, - index_columns: &[ColumnOrder], - include_columns: &[usize], + index_columns: &[(ExprImpl, OrderType)], + include_columns: &[ExprImpl], distributed_by_columns_len: usize, ) -> Result { // Build logical plan and then call gen_create_index_plan @@ -292,28 +341,48 @@ fn assemble_materialize( let exprs = index_columns .iter() - .map(|x| x.column_index) - .collect_vec() - .iter() - .chain(include_columns.iter()) - .map(|&i| { - ExprImpl::InputRef( - InputRef::new(i, table_desc.columns.get(i).unwrap().data_type.clone()).into(), - ) - }) + .map(|(expr, _)| expr.clone()) + .chain(include_columns.iter().cloned()) .collect_vec(); let logical_project = LogicalProject::create(logical_scan.into(), exprs); let mut project_required_cols = FixedBitSet::with_capacity(logical_project.schema().len()); project_required_cols.toggle_range(0..logical_project.schema().len()); + let mut col_names = HashSet::new(); + let mut count = 0; + let out_names: Vec = index_columns .iter() - .map(|x| x.column_index) - .collect_vec() - .iter() - .chain(include_columns.iter()) - .map(|&i| table_desc.columns.get(i).unwrap().name.clone()) + .map(|(expr, _)| match expr { + ExprImpl::InputRef(input_ref) => table_desc + .columns + .get(input_ref.index) + .unwrap() + .name + .clone(), + ExprImpl::FunctionCall(func) => { + let func_name = func.get_expr_type().as_str_name().to_string(); + let mut name = func_name.clone(); + while !col_names.insert(name.clone()) { + count += 1; + name = format!("{}{}", func_name, count); + } + name + } + _ => unreachable!(), + }) + .chain(include_columns.iter().map(|expr| { + match expr { + ExprImpl::InputRef(input_ref) => table_desc + .columns + .get(input_ref.index) + .unwrap() + .name + .clone(), + _ => unreachable!(), + } + })) .collect_vec(); PlanRoot::new( @@ -325,7 +394,7 @@ fn assemble_materialize( index_columns .iter() .enumerate() - .map(|(i, column_order)| ColumnOrder::new(i, column_order.order_type)) + .map(|(i, (_, order))| ColumnOrder::new(i, *order)) .collect(), ), project_required_cols, @@ -334,27 +403,6 @@ fn assemble_materialize( .gen_index_plan(index_name, definition) } -fn check_columns(columns: Vec) -> Result> { - columns - .into_iter() - .map(|column| { - let order_type = OrderType::from_bools(column.asc, column.nulls_first); - - use risingwave_sqlparser::ast::Expr; - - if let Expr::Identifier(ident) = column.expr { - Ok::<(_, _), RwError>((ident, order_type)) - } else { - Err(ErrorCode::NotImplemented( - "only identifier is supported for create index".into(), - None.into(), - ) - .into()) - } - }) - .try_collect::<_, Vec<_>, _>() -} - pub async fn handle_create_index( handler_args: HandlerArgs, if_not_exists: bool, diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3e611e8c106a..232d8ee95378 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -90,11 +90,12 @@ macro_rules! commit_meta { } pub(crate) use commit_meta; use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup}; -use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs}; +use crate::manager::catalog::utils::{ + alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter, +}; pub type CatalogManagerRef = Arc>; @@ -1945,28 +1946,16 @@ where let mut updated_indexes = vec![]; + let expr_rewriter = ReplaceTableExprRewriter { + table_col_index_mapping: table_col_index_mapping.clone(), + }; + for index_id in &index_ids { let mut index = indexes.get_mut(*index_id).unwrap(); index .index_item .iter_mut() - .for_each(|x| match x.rex_node.as_mut().unwrap() { - RexNode::InputRef(input_col_idx) => { - *input_col_idx = - table_col_index_mapping.map(*input_col_idx as usize) as u32; - assert_eq!( - x.return_type, - table.columns[*input_col_idx as usize] - .column_desc - .clone() - .unwrap() - .column_type - ); - } - RexNode::FuncCall(_) => unimplemented!(), - _ => unreachable!(), - }); - + .for_each(|x| expr_rewriter.rewrite_expr(x)); updated_indexes.push(indexes.get(index_id).cloned().unwrap()); } diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index 59044bfca190..3a97077f38c1 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -13,6 +13,9 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::expr::expr_node::RexNode; +use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction}; use risingwave_sqlparser::ast::{ Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement, @@ -317,6 +320,37 @@ impl QueryRewriter<'_> { } } +pub struct ReplaceTableExprRewriter { + pub table_col_index_mapping: ColIndexMapping, +} + +impl ReplaceTableExprRewriter { + pub fn rewrite_expr(&self, expr: &mut ExprNode) { + let rex_node = expr.rex_node.as_mut().unwrap(); + match rex_node { + RexNode::InputRef(input_col_idx) => { + *input_col_idx = self.table_col_index_mapping.map(*input_col_idx as usize) as u32 + } + RexNode::Constant(_) => {} + RexNode::Udf(udf) => self.rewrite_udf(udf), + RexNode::FuncCall(function_call) => self.rewrite_function_call(function_call), + } + } + + fn rewrite_udf(&self, udf: &mut UserDefinedFunction) { + udf.children + .iter_mut() + .for_each(|expr| self.rewrite_expr(expr)); + } + + fn rewrite_function_call(&self, function_call: &mut FunctionCall) { + function_call + .children + .iter_mut() + .for_each(|expr| self.rewrite_expr(expr)); + } +} + #[cfg(test)] mod tests { use crate::manager::catalog::utils::{alter_relation_rename, alter_relation_rename_refs};