diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index c1b619cf85b..c5f256a3c5b 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -261,6 +261,7 @@ async fn create_query_engine(meta_addr: &str) -> Result { let state = Arc::new(QueryEngineState::new( catalog_list, None, + None, false, plugins.clone(), )); diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3a526235ca5..885ed8bbddb 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -338,6 +338,7 @@ impl DatanodeBuilder { // query engine in datanode only executes plan with resolved table source. MemoryCatalogManager::with_default_setup(), None, + None, false, plugins, ); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bffefabd47e..a7cef4ba678 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -53,7 +53,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use operator::delete::{Deleter, DeleterRef}; use operator::insert::{Inserter, InserterRef}; use operator::statement::StatementExecutor; -use operator::table::table_idents_to_full_name; +use operator::table::{table_idents_to_full_name, TableMutationOperator}; use partition::manager::PartitionRuleManager; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; @@ -163,14 +163,6 @@ impl Instance { catalog_manager.datanode_manager().clone(), ); - let query_engine = QueryEngineFactory::new_with_plugins( - catalog_manager.clone(), - Some(region_query_handler.clone()), - true, - plugins.clone(), - ) - .query_engine(); - let inserter = Arc::new(Inserter::new( catalog_manager.clone(), partition_manager.clone(), @@ -182,6 +174,20 @@ impl Instance { datanode_clients, )); + let table_mutation_handler = Arc::new(TableMutationOperator::new( + inserter.clone(), + deleter.clone(), + )); + + let query_engine = QueryEngineFactory::new_with_plugins( + catalog_manager.clone(), + Some(region_query_handler.clone()), + Some(table_mutation_handler), + true, + plugins.clone(), + ) + .query_engine(); + let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), query_engine.clone(), @@ -189,7 +195,6 @@ impl Instance { meta_backend.clone(), catalog_manager.clone(), inserter.clone(), - deleter.clone(), )); plugins.insert::(statement_executor.clone()); @@ -301,9 +306,25 @@ impl Instance { let region_query_handler = FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); + let inserter = Arc::new(Inserter::new( + catalog_manager.clone(), + partition_manager.clone(), + datanode_manager.clone(), + )); + let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager, + datanode_manager.clone(), + )); + let table_mutation_handler = Arc::new(TableMutationOperator::new( + inserter.clone(), + deleter.clone(), + )); + let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), Some(region_query_handler), + Some(table_mutation_handler), true, plugins.clone(), ) @@ -317,25 +338,12 @@ impl Instance { let cache_invalidator = Arc::new(DummyCacheInvalidator); let ddl_executor = Arc::new(DdlManager::new( procedure_manager, - datanode_manager.clone(), + datanode_manager, cache_invalidator.clone(), table_metadata_manager.clone(), Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), )); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); - - let inserter = Arc::new(Inserter::new( - catalog_manager.clone(), - partition_manager.clone(), - datanode_manager.clone(), - )); - let deleter = Arc::new(Deleter::new( - catalog_manager.clone(), - partition_manager, - datanode_manager, - )); - let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), query_engine.clone(), @@ -343,7 +351,6 @@ impl Instance { kv_backend.clone(), cache_invalidator, inserter.clone(), - deleter.clone(), )); Ok(Instance { diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 546d45d5f8f..50111dd3911 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -98,7 +98,7 @@ impl Deleter { &self, request: TableDeleteRequest, ctx: QueryContextRef, - ) -> Result { + ) -> Result { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table = request.table_name.as_str(); @@ -108,7 +108,9 @@ impl Deleter { let deletes = TableToRegion::new(&table_info, &self.partition_manager) .convert(request) .await?; - self.do_request(deletes, ctx.trace_id(), 0).await + + let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?; + Ok(affected_rows as _) } } diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs index fa7181eebbf..5ece06b79f9 100644 --- a/src/operator/src/req_convert/insert/table_to_region.rs +++ b/src/operator/src/req_convert/insert/table_to_region.rs @@ -145,7 +145,6 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table_1".to_string(), columns_values: HashMap::from([("a".to_string(), vector)]), - region_number: 0, } } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index e6b46d26a60..f2d371b6d23 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -48,7 +48,6 @@ use table::engine::TableReference; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use table::TableRef; -use crate::delete::DeleterRef; use crate::error::{ self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu, Result, TableNotFoundSnafu, @@ -66,7 +65,6 @@ pub struct StatementExecutor { partition_manager: PartitionRuleManagerRef, cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, - deleter: DeleterRef, } impl StatementExecutor { @@ -77,7 +75,6 @@ impl StatementExecutor { kv_backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, - deleter: DeleterRef, ) -> Self { Self { catalog_manager, @@ -87,7 +84,6 @@ impl StatementExecutor { partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)), cache_invalidator, inserter, - deleter, } } @@ -104,14 +100,12 @@ impl StatementExecutor { pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { match stmt { - Statement::Query(_) | Statement::Explain(_) => { + Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await } Statement::Insert(insert) => self.insert(insert, query_ctx).await, - Statement::Delete(delete) => self.delete(delete, query_ctx).await, - Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await, Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await, diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index b335e5356a5..9f34627ee4e 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -330,7 +330,6 @@ impl StatementExecutor { schema_name: req.schema_name.to_string(), table_name: req.table_name.to_string(), columns_values, - region_number: 0, }, query_ctx.clone(), )); diff --git a/src/operator/src/statement/dml.rs b/src/operator/src/statement/dml.rs index f127dda4d48..197ac5c03b1 100644 --- a/src/operator/src/statement/dml.rs +++ b/src/operator/src/statement/dml.rs @@ -12,30 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use common_query::Output; -use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp}; -use datatypes::schema::SchemaRef; -use futures_util::StreamExt; use query::parser::QueryStatement; -use query::plan::LogicalPlan; use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; -use sql::statements::delete::Delete; use sql::statements::insert::Insert; use sql::statements::statement::Statement; -use table::engine::TableReference; -use table::metadata::TableInfoRef; -use table::requests::{DeleteRequest, InsertRequest}; -use table::TableRef; use super::StatementExecutor; -use crate::error::{ - BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu, - ReadRecordBatchSnafu, Result, UnexpectedSnafu, -}; +use crate::error::Result; impl StatementExecutor { pub async fn insert(&self, insert: Box, query_ctx: QueryContextRef) -> Result { @@ -45,178 +29,9 @@ impl StatementExecutor { .handle_statement_insert(insert.as_ref(), &query_ctx) .await } else { - // Slow path: insert with subquery. Execute the subquery first, via query engine. Then - // insert the results by sending insert requests. - - // 1. Plan the whole insert statement into a logical plan, then a wrong insert statement - // will be caught and a plan error will be returned. + // Slow path: insert with subquery. Execute using query engine. let statement = QueryStatement::Sql(Statement::Insert(insert)); - let logical_plan = self.plan(statement, query_ctx.clone()).await?; - - // 2. Execute the subquery, get the results as a record batch stream. - let dml_statement = extract_dml_statement(logical_plan)?; - ensure!( - dml_statement.op == WriteOp::Insert, - UnexpectedSnafu { - violated: "expected an INSERT plan" - } - ); - let mut stream = self - .execute_dml_subquery(&dml_statement, query_ctx.clone()) - .await?; - - // 3. Send insert requests. - let mut affected_rows = 0; - let table = self.get_table_from_dml(dml_statement, &query_ctx).await?; - let table_info = table.table_info(); - while let Some(batch) = stream.next().await { - let record_batch = batch.context(ReadRecordBatchSnafu)?; - let insert_request = - build_insert_request(record_batch, table.schema(), &table_info)?; - affected_rows += self - .inserter - .handle_table_insert(insert_request, query_ctx.clone()) - .await?; - } - - Ok(Output::AffectedRows(affected_rows)) + self.plan_exec(statement, query_ctx).await } } - - pub async fn delete(&self, delete: Box, query_ctx: QueryContextRef) -> Result { - // 1. Plan the whole delete statement into a logical plan, then a wrong delete statement - // will be caught and a plan error will be returned. - let statement = QueryStatement::Sql(Statement::Delete(delete)); - let logical_plan = self.plan(statement, query_ctx.clone()).await?; - - // 2. Execute the subquery, get the results as a record batch stream. - let dml_statement = extract_dml_statement(logical_plan)?; - ensure!( - dml_statement.op == WriteOp::Delete, - UnexpectedSnafu { - violated: "expected a DELETE plan" - } - ); - let mut stream = self - .execute_dml_subquery(&dml_statement, query_ctx.clone()) - .await?; - - // 3. Send delete requests. - let mut affected_rows = 0; - let table = self.get_table_from_dml(dml_statement, &query_ctx).await?; - let table_info = table.table_info(); - while let Some(batch) = stream.next().await { - let record_batch = batch.context(ReadRecordBatchSnafu)?; - let request = build_delete_request(record_batch, table.schema(), &table_info)?; - affected_rows += self - .deleter - .handle_table_delete(request, query_ctx.clone()) - .await?; - } - - Ok(Output::AffectedRows(affected_rows as _)) - } - - async fn execute_dml_subquery( - &self, - dml_statement: &DmlStatement, - query_ctx: QueryContextRef, - ) -> Result { - let subquery_plan = LogicalPlan::from(dml_statement.input.as_ref().clone()); - let output = self - .query_engine - .execute(subquery_plan, query_ctx) - .await - .context(ExecLogicalPlanSnafu)?; - match output { - Output::Stream(stream) => Ok(stream), - Output::RecordBatches(record_batches) => Ok(record_batches.as_stream()), - _ => UnexpectedSnafu { - violated: "expected a stream", - } - .fail(), - } - } - - async fn get_table_from_dml( - &self, - dml_statement: DmlStatement, - query_ctx: &QueryContextRef, - ) -> Result { - let default_catalog = query_ctx.current_catalog().to_owned(); - let default_schema = query_ctx.current_schema().to_owned(); - let resolved_table_ref = dml_statement - .table_name - .resolve(&default_catalog, &default_schema); - let table_ref = TableReference::full( - &resolved_table_ref.catalog, - &resolved_table_ref.schema, - &resolved_table_ref.table, - ); - self.get_table(&table_ref).await - } -} - -fn extract_dml_statement(logical_plan: LogicalPlan) -> Result { - let LogicalPlan::DfPlan(df_plan) = logical_plan; - match df_plan { - DfLogicalPlan::Dml(dml) => Ok(dml), - _ => UnexpectedSnafu { - violated: "expected a DML plan", - } - .fail(), - } -} - -fn build_insert_request( - record_batch: RecordBatch, - table_schema: SchemaRef, - table_info: &TableInfoRef, -) -> Result { - let columns_values = record_batch - .column_vectors(&table_info.name, table_schema) - .context(BuildColumnVectorsSnafu)?; - - Ok(InsertRequest { - catalog_name: table_info.catalog_name.clone(), - schema_name: table_info.schema_name.clone(), - table_name: table_info.name.clone(), - columns_values, - region_number: 0, - }) -} - -fn build_delete_request( - record_batch: RecordBatch, - table_schema: SchemaRef, - table_info: &TableInfoRef, -) -> Result { - let ts_column = table_schema - .timestamp_column() - .map(|x| x.name.clone()) - .with_context(|| table::error::MissingTimeIndexColumnSnafu { - table_name: table_info.name.clone(), - }) - .context(MissingTimeIndexColumnSnafu)?; - - let column_vectors = record_batch - .column_vectors(&table_info.name, table_schema) - .context(BuildColumnVectorsSnafu)?; - - let rowkey_columns = table_info - .meta - .row_key_column_names() - .collect::>(); - - let key_column_values = column_vectors - .into_iter() - .filter(|x| x.0 == ts_column || rowkey_columns.contains(&&x.0)) - .collect::>(); - - Ok(DeleteRequest { - catalog_name: table_info.catalog_name.clone(), - schema_name: table_info.schema_name.clone(), - table_name: table_info.name.clone(), - key_column_values, - }) } diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index 240dd010324..abfc27732bf 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -12,10 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; +use common_error::ext::BoxedError; +use query::error as query_error; +use query::error::Result as QueryResult; +use query::table_mutation::{AffectedRows, TableMutationHandler}; use session::context::QueryContextRef; +use snafu::ResultExt; use sqlparser::ast::ObjectName; +use table::requests::{DeleteRequest as TableDeleteRequest, InsertRequest as TableInsertRequest}; +use crate::delete::DeleterRef; use crate::error::{InvalidSqlSnafu, Result}; +use crate::insert::InserterRef; // TODO(LFC): Refactor consideration: move this function to some helper mod, // could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. @@ -47,3 +56,41 @@ pub fn table_idents_to_full_name( }.fail(), } } + +pub struct TableMutationOperator { + inserter: InserterRef, + deleter: DeleterRef, +} + +impl TableMutationOperator { + pub fn new(inserter: InserterRef, deleter: DeleterRef) -> Self { + Self { inserter, deleter } + } +} + +#[async_trait] +impl TableMutationHandler for TableMutationOperator { + async fn insert( + &self, + request: TableInsertRequest, + ctx: QueryContextRef, + ) -> QueryResult { + self.inserter + .handle_table_insert(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn delete( + &self, + request: TableDeleteRequest, + ctx: QueryContextRef, + ) -> QueryResult { + self.deleter + .handle_table_delete(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } +} diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 415ec0f5d61..58421634ae2 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -435,7 +435,6 @@ fn test_meter_insert_request() { schema_name: "public".to_string(), table_name: "numbers".to_string(), columns_values: Default::default(), - region_number: 0, }; meter_insert_request!(req); diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 1357897b589..3928a630c92 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -54,8 +54,8 @@ use crate::dataframe::DataFrame; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu, - MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableNotFoundSnafu, - UnimplementedSnafu, UnsupportedExprSnafu, + MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, + TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; @@ -115,7 +115,7 @@ impl DatafusionQueryEngine { let table = self.find_table(&table_name).await?; let output = self - .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx) + .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx.clone()) .await?; let mut stream = match output { Output::RecordBatches(batches) => batches.as_stream(), @@ -132,8 +132,14 @@ impl DatafusionQueryEngine { .context(QueryExecutionSnafu)?; let rows = match dml.op { - WriteOp::Insert => Self::insert(&table_name, &table, column_vectors).await?, - WriteOp::Delete => Self::delete(&table_name, &table, column_vectors).await?, + WriteOp::Insert => { + self.insert(&table_name, column_vectors, query_ctx.clone()) + .await? + } + WriteOp::Delete => { + self.delete(&table_name, &table, column_vectors, query_ctx.clone()) + .await? + } _ => unreachable!("guarded by the 'ensure!' at the beginning"), }; affected_rows += rows; @@ -142,9 +148,11 @@ impl DatafusionQueryEngine { } async fn delete<'a>( + &self, table_name: &ResolvedTableReference<'a>, table: &TableRef, column_vectors: HashMap, + query_ctx: QueryContextRef, ) -> Result { let catalog_name = table_name.catalog.to_string(); let schema_name = table_name.schema.to_string(); @@ -174,31 +182,31 @@ impl DatafusionQueryEngine { key_column_values: column_vectors, }; - table - .delete(request) + self.state + .table_mutation_handler() + .context(MissingTableMutationHandlerSnafu)? + .delete(request, query_ctx) .await - .map_err(BoxedError::new) - .context(QueryExecutionSnafu) } async fn insert<'a>( + &self, table_name: &ResolvedTableReference<'a>, - table: &TableRef, column_vectors: HashMap, + query_ctx: QueryContextRef, ) -> Result { let request = InsertRequest { catalog_name: table_name.catalog.to_string(), schema_name: table_name.schema.to_string(), table_name: table_name.table.to_string(), columns_values: column_vectors, - region_number: 0, }; - table - .insert(request) + self.state + .table_mutation_handler() + .context(MissingTableMutationHandlerSnafu)? + .insert(request, query_ctx) .await - .map_err(BoxedError::new) - .context(QueryExecutionSnafu) } async fn find_table(&self, table_name: &ResolvedTableReference<'_>) -> Result { @@ -517,7 +525,7 @@ mod tests { }; catalog_manager.register_table_sync(req).unwrap(); - QueryEngineFactory::new(catalog_manager, None, false).query_engine() + QueryEngineFactory::new(catalog_manager, None, None, false).query_engine() } #[tokio::test] diff --git a/src/query/src/error.rs b/src/query/src/error.rs index e535a4b94b5..5bbdab34726 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -254,11 +254,20 @@ pub enum Error { #[snafu(display("Column schema has no default value, column: {}", column))] ColumnSchemaNoDefault { column: String, location: Location }, - #[snafu(display("Region query error, source: {}", source))] + #[snafu(display("Region query error"))] RegionQuery { source: BoxedError, location: Location, }, + + #[snafu(display("Table mutation error"))] + TableMutation { + source: BoxedError, + location: Location, + }, + + #[snafu(display("Missing table mutation handler"))] + MissingTableMutationHandler { location: Location }, } impl ErrorExt for Error { @@ -305,7 +314,10 @@ impl ErrorExt for Error { RemoteRequest { source, .. } => source.status_code(), UnexpectedOutputKind { .. } => StatusCode::Unexpected, CreateSchema { source, .. } => source.status_code(), + RegionQuery { source, .. } => source.status_code(), + TableMutation { source, .. } => source.status_code(), + MissingTableMutationHandler { .. } => StatusCode::Unexpected, } } diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index dfdbe9a208c..54506f6c93b 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -32,6 +32,7 @@ pub mod query_engine; mod range_select; pub mod region_query; pub mod sql; +pub mod table_mutation; pub use crate::datafusion::DfContextProviderAdapter; pub use crate::query_engine::{ diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index b18834d2c8e..d88f399da31 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -38,6 +38,7 @@ use crate::planner::LogicalPlanner; pub use crate::query_engine::context::QueryEngineContext; pub use crate::query_engine::state::QueryEngineState; use crate::region_query::RegionQueryHandlerRef; +use crate::table_mutation::TableMutationHandlerRef; /// Describe statement result #[derive(Debug)] @@ -80,11 +81,13 @@ impl QueryEngineFactory { pub fn new( catalog_manager: CatalogManagerRef, region_query_handler: Option, + table_mutation_handler: Option, with_dist_planner: bool, ) -> Self { Self::new_with_plugins( catalog_manager, region_query_handler, + table_mutation_handler, with_dist_planner, Default::default(), ) @@ -93,12 +96,14 @@ impl QueryEngineFactory { pub fn new_with_plugins( catalog_manager: CatalogManagerRef, region_query_handler: Option, + table_mutation_handler: Option, with_dist_planner: bool, plugins: Arc, ) -> Self { let state = Arc::new(QueryEngineState::new( catalog_manager, region_query_handler, + table_mutation_handler, with_dist_planner, plugins.clone(), )); @@ -131,7 +136,7 @@ mod tests { #[test] fn test_query_engine_factory() { let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap(); - let factory = QueryEngineFactory::new(catalog_list, None, false); + let factory = QueryEngineFactory::new(catalog_list, None, None, false); let engine = factory.query_engine(); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 93916db8610..e882a6811e8 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -48,6 +48,7 @@ use crate::optimizer::type_conversion::TypeConversionRule; use crate::query_engine::options::QueryOptions; use crate::range_select::planner::RangeSelectPlanner; use crate::region_query::RegionQueryHandlerRef; +use crate::table_mutation::TableMutationHandlerRef; /// Query engine global state // TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it, @@ -57,6 +58,7 @@ use crate::region_query::RegionQueryHandlerRef; pub struct QueryEngineState { df_context: SessionContext, catalog_manager: CatalogManagerRef, + table_mutation_handler: Option, aggregate_functions: Arc>>, plugins: Arc, } @@ -73,6 +75,7 @@ impl QueryEngineState { pub fn new( catalog_list: CatalogManagerRef, region_query_handler: Option, + table_mutation_handler: Option, with_dist_planner: bool, plugins: Arc, ) -> Self { @@ -123,6 +126,7 @@ impl QueryEngineState { Self { df_context, catalog_manager: catalog_list, + table_mutation_handler, aggregate_functions: Arc::new(RwLock::new(HashMap::new())), plugins, } @@ -184,6 +188,11 @@ impl QueryEngineState { &self.catalog_manager } + #[inline] + pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> { + self.table_mutation_handler.as_ref() + } + pub(crate) fn disallow_cross_schema_query(&self) -> bool { self.plugins .map::(|x| x.disallow_cross_schema_query) diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 8842b6dd367..e7ae66bac8b 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -388,7 +388,7 @@ mod test { table, }) .is_ok()); - QueryEngineFactory::new(catalog_list, None, false).query_engine() + QueryEngineFactory::new(catalog_list, None, None, false).query_engine() } async fn query_plan_compare(sql: &str, expected: String) { diff --git a/src/query/src/region_query.rs b/src/query/src/region_query.rs index ef6e494dfc8..f9861103e62 100644 --- a/src/query/src/region_query.rs +++ b/src/query/src/region_query.rs @@ -22,7 +22,6 @@ use crate::error::Result; #[async_trait] pub trait RegionQueryHandler: Send + Sync { - // TODO(ruihang): add trace id and span id in the request. async fn do_get(&self, request: QueryRequest) -> Result; } diff --git a/src/query/src/table_mutation.rs b/src/query/src/table_mutation.rs new file mode 100644 index 00000000000..bff93af9368 --- /dev/null +++ b/src/query/src/table_mutation.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use session::context::QueryContextRef; +use table::requests::{DeleteRequest, InsertRequest}; + +use crate::error::Result; + +pub type AffectedRows = usize; + +/// A trait for handling table mutations in `QueryEngine`. +#[async_trait] +pub trait TableMutationHandler: Send + Sync { + /// Inserts rows into the table. + async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result; + + /// Delete rows from the table. + async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result; +} + +pub type TableMutationHandlerRef = Arc; diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index 4a60f925ce0..14e62a2d59c 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -51,5 +51,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec { pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef { let catalog_manager = MemoryCatalogManager::new_with_table(table); - QueryEngineFactory::new(catalog_manager, None, false).query_engine() + QueryEngineFactory::new(catalog_manager, None, None, false).query_engine() } diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 41e95d3de1e..71894eea47c 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> { let catalog_list = catalog::memory::new_memory_catalog_manager() .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; - let factory = QueryEngineFactory::new(catalog_list, None, false); + let factory = QueryEngineFactory::new(catalog_list, None, None, false); let engine = factory.query_engine(); let column_schemas = vec![ColumnSchema::new( @@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> { }); let plugins = Arc::new(plugins); - let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, false, plugins); + let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins); let engine = factory.query_engine(); let stmt = QueryLanguageParser::parse_sql("select number from public.numbers").unwrap(); @@ -153,7 +153,7 @@ async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); let catalog_list = catalog_manager()?; - let factory = QueryEngineFactory::new(catalog_list, None, false); + let factory = QueryEngineFactory::new(catalog_list, None, None, false); let engine = factory.query_engine(); let pow = make_scalar_function(pow); diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index 17c894b5459..dfbcc3a13a7 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester { }; let _ = catalog_manager.register_table_sync(req).unwrap(); - let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); + let engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine(); TimeRangeTester { engine, filter } } diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs index f4827a18ad7..27d8ef07ce1 100644 --- a/src/script/benches/py_benchmark.rs +++ b/src/script/benches/py_benchmark.rs @@ -52,7 +52,7 @@ where pub(crate) fn sample_script_engine() -> PyEngine { let catalog_manager = MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID)); - let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); + let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine(); PyEngine::new(query_engine.clone()) } diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 5ab4f06298b..61f603232a7 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -375,7 +375,8 @@ mod tests { pub(crate) fn sample_script_engine() -> PyEngine { let catalog_manager = MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID)); - let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); + let query_engine = + QueryEngineFactory::new(catalog_manager, None, None, false).query_engine(); PyEngine::new(query_engine.clone()) } diff --git a/src/script/src/test.rs b/src/script/src/test.rs index c937c4afd21..55ba73f582a 100644 --- a/src/script/src/test.rs +++ b/src/script/src/test.rs @@ -56,7 +56,7 @@ pub async fn setup_scripts_manager( let catalog_manager = MemoryCatalogManager::new_with_table(table.clone()); - let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false); + let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, false); let query_engine = factory.query_engine(); let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine) .await diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index 7da1de6bb8d..38ebe218c7a 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -141,7 +141,6 @@ impl LineWriter { schema_name: self.db, table_name: self.table_name, columns_values, - region_number: 0, // TODO(hl): Check if assign 0 region is ok? } } } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 4ef156cb043..bb2a79818c2 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -214,7 +214,7 @@ impl GrpcQueryHandler for DummyInstance { fn create_testing_instance(table: TableRef) -> DummyInstance { let catalog_manager = MemoryCatalogManager::new_with_table(table); - let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine(); + let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine(); DummyInstance::new(query_engine) } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 9fcb7f8db7e..9d78f21534f 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -246,7 +246,6 @@ pub struct InsertRequest { pub schema_name: String, pub table_name: String, pub columns_values: HashMap, - pub region_number: RegionNumber, } /// Delete (by primary key) request @@ -327,7 +326,6 @@ macro_rules! meter_insert_request { $req.catalog_name.to_string(), $req.schema_name.to_string(), $req.table_name.to_string(), - $req.region_number, $req ); }; diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 8af3cfd1ffe..d2ea65cf840 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -24,14 +24,10 @@ use async_trait::async_trait; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; -use store_api::storage::{RegionNumber, ScanRequest}; +use store_api::storage::ScanRequest; -use crate::error::{Result, UnsupportedSnafu}; +use crate::error::Result; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; -use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; -use crate::stats::TableStatistics; - -pub type AlterContext = anymap::Map; /// Table abstraction. #[async_trait] @@ -49,16 +45,6 @@ pub trait Table: Send + Sync { /// Get the type of this table for metadata/catalog purposes. fn table_type(&self) -> TableType; - /// Insert values into table. - /// - /// Returns number of inserted rows. - async fn insert(&self, _request: InsertRequest) -> Result { - UnsupportedSnafu { - operation: "INSERT", - } - .fail()? - } - async fn scan_to_stream(&self, request: ScanRequest) -> Result; /// Tests whether the table provider can make use of any or all filter expressions @@ -66,67 +52,6 @@ pub trait Table: Send + Sync { fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { Ok(vec![FilterPushDownType::Unsupported; filters.len()]) } - - /// Alter table. - async fn alter(&self, _context: AlterContext, _request: &AlterTableRequest) -> Result<()> { - UnsupportedSnafu { - operation: "ALTER TABLE", - } - .fail()? - } - - /// Delete rows in the table. - /// - /// Returns number of deleted rows. - async fn delete(&self, _request: DeleteRequest) -> Result { - UnsupportedSnafu { - operation: "DELETE", - } - .fail()? - } - - /// Flush table. - /// - /// Options: - /// - region_number: specify region to flush. - /// - wait: Whether to wait until flush is done. - async fn flush(&self, region_number: Option, wait: Option) -> Result<()> { - let _ = (region_number, wait); - UnsupportedSnafu { operation: "FLUSH" }.fail()? - } - - /// Close the table. - async fn close(&self, _regions: &[RegionNumber]) -> Result<()> { - Ok(()) - } - - /// Return true if contains the region - fn contains_region(&self, _region: RegionNumber) -> Result { - UnsupportedSnafu { - operation: "contain_region", - } - .fail()? - } - - /// Get statistics for this table, if available - fn statistics(&self) -> Option { - None - } - - async fn compact(&self, region_number: Option, wait: Option) -> Result<()> { - let _ = (region_number, wait); - UnsupportedSnafu { - operation: "COMPACTION", - } - .fail()? - } - - async fn truncate(&self) -> Result<()> { - UnsupportedSnafu { - operation: "TRUNCATE", - } - .fail()? - } } pub type TableRef = Arc;