feat: re-support query engine execute dml (#2484)
* feat: re-support query engine execute dml

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: remove region_number in InsertRequest

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add doc comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
zhongzc committed Sep 26, 2023
1 parent 230a302 commit 0bf2664
Showing 28 changed files with 190 additions and 334 deletions.
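The diffs below thread an optional "table mutation handler" into the query engine so that DML such as DELETE can be planned and executed by the engine itself; places that never execute DML (the datanode and the CLI REPL) simply pass `None`. Below is a minimal, self-contained sketch of that pattern, assuming simplified names — `TableMutationHandler`, `Inserter`, `Deleter`, and `QueryEngine` here are illustrative stand-ins inferred from the diff, not GreptimeDB's exact types.

```rust
// Minimal sketch of the optional-handler pattern this commit introduces.
// Names and signatures are illustrative assumptions, not the real definitions.
use std::sync::Arc;

type AffectedRows = usize;

/// Hypothetical hook the query engine calls when an executed plan mutates a table.
trait TableMutationHandler: Send + Sync {
    fn delete(&self, rows: usize) -> AffectedRows;
}

/// Stand-ins for the frontend's insert/delete operators.
struct Inserter;
struct Deleter;

impl Deleter {
    fn do_delete(&self, rows: usize) -> AffectedRows {
        rows
    }
}

/// Adapter combining both operators behind the handler trait, analogous in
/// spirit to the `TableMutationOperator` used in `instance.rs` below.
struct TableMutationOperator {
    _inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
}

impl TableMutationHandler for TableMutationOperator {
    fn delete(&self, rows: usize) -> AffectedRows {
        self.deleter.do_delete(rows)
    }
}

/// The engine keeps the handler optional, which is why the datanode and the
/// CLI REPL pass an extra `None` to `QueryEngineState::new` in the diffs.
struct QueryEngine {
    mutation_handler: Option<Arc<dyn TableMutationHandler>>,
}

fn main() {
    let engine = QueryEngine {
        mutation_handler: Some(Arc::new(TableMutationOperator {
            _inserter: Arc::new(Inserter),
            deleter: Arc::new(Deleter),
        })),
    };
    if let Some(handler) = &engine.mutation_handler {
        println!("deleted {} rows", handler.delete(3));
    }
}
```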
1 change: 1 addition & 0 deletions src/cmd/src/cli/repl.rs
@@ -261,6 +261,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let state = Arc::new(QueryEngineState::new(
catalog_list,
None,
None,
false,
plugins.clone(),
));
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
@@ -338,6 +338,7 @@ impl DatanodeBuilder {
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
None,
None,
false,
plugins,
);
57 changes: 32 additions & 25 deletions src/frontend/src/instance.rs
@@ -53,7 +53,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use operator::delete::{Deleter, DeleterRef};
use operator::insert::{Inserter, InserterRef};
use operator::statement::StatementExecutor;
use operator::table::table_idents_to_full_name;
use operator::table::{table_idents_to_full_name, TableMutationOperator};
use partition::manager::PartitionRuleManager;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
@@ -163,14 +163,6 @@ impl Instance {
catalog_manager.datanode_manager().clone(),
);

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
true,
plugins.clone(),
)
.query_engine();

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
@@ -182,14 +174,27 @@
datanode_clients,
));

let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
));

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
true,
plugins.clone(),
)
.query_engine();

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
meta_client.clone(),
meta_backend.clone(),
catalog_manager.clone(),
inserter.clone(),
deleter.clone(),
));

plugins.insert::<StatementExecutorRef>(statement_executor.clone());
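In the hunk above, the frontend now builds the inserter and deleter before the query engine, wraps them in a `TableMutationOperator`, and hands that to `QueryEngineFactory::new_with_plugins`; the same `Arc`s are then reused when constructing the `StatementExecutor`. A rough sketch of that wiring order, with made-up types standing in for the real ones:

```rust
// Illustrative wiring order only: the inserter/deleter must exist before the
// query engine is built, because the engine factory now receives the mutation
// handler that wraps them.
use std::sync::Arc;

struct Inserter;
struct Deleter;

struct TableMutationOperator {
    inserter: Arc<Inserter>,
    deleter: Arc<Deleter>,
}

struct QueryEngine;
struct QueryEngineFactory;

impl QueryEngineFactory {
    // Assumed shape: an optional handler, mirroring `Some(table_mutation_handler)`
    // in the frontend versus the `None` passed by the datanode and the REPL.
    fn new(mutation_handler: Option<Arc<TableMutationOperator>>) -> Arc<QueryEngine> {
        let _ = mutation_handler;
        Arc::new(QueryEngine)
    }
}

struct StatementExecutor {
    query_engine: Arc<QueryEngine>,
    inserter: Arc<Inserter>,
    // No deleter field: DELETE statements are planned and executed by the
    // query engine through the mutation handler (see the statement.rs diff below).
}

fn main() {
    let inserter = Arc::new(Inserter);
    let deleter = Arc::new(Deleter);

    // Build the handler before the engine, then reuse the inserter for the
    // statement executor -- mirroring the reordering in this hunk.
    let handler = Arc::new(TableMutationOperator {
        inserter: inserter.clone(),
        deleter: deleter.clone(),
    });

    let query_engine = QueryEngineFactory::new(Some(handler));
    let _executor = StatementExecutor { query_engine, inserter };
    println!("wired");
}
```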
@@ -301,9 +306,25 @@ impl Instance {
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager.clone(),
));
let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
));

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler),
Some(table_mutation_handler),
true,
plugins.clone(),
)
@@ -317,33 +338,19 @@ impl Instance {
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
datanode_manager.clone(),
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));

let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager,
));

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
ddl_executor,
kv_backend.clone(),
cache_invalidator,
inserter.clone(),
deleter.clone(),
));

Ok(Instance {
6 changes: 4 additions & 2 deletions src/operator/src/delete.rs
@@ -98,7 +98,7 @@ impl Deleter {
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
) -> Result<usize> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
@@ -108,7 +108,9 @@
let deletes = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(deletes, ctx.trace_id(), 0).await

let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
Ok(affected_rows as _)
}
}
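A brief aside on the `Ok(affected_rows as _)` line above: with `as _`, the cast's target type is inferred from the enclosing return type, here `usize`. A tiny stand-alone illustration (the helper function is hypothetical, not part of the real `Deleter`):

```rust
// The cast target in `as _` is inferred from the function's return type.
fn affected_rows_from_engine() -> u64 {
    // hypothetical source of an affected-rows count
    42
}

fn handle_delete() -> Result<usize, String> {
    let affected_rows = affected_rows_from_engine();
    Ok(affected_rows as _) // inferred as `usize` from `Result<usize, String>`
}

fn main() {
    assert_eq!(handle_delete(), Ok(42));
}
```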

1 change: 0 additions & 1 deletion src/operator/src/req_convert/insert/table_to_region.rs
@@ -145,7 +145,6 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table_1".to_string(),
columns_values: HashMap::from([("a".to_string(), vector)]),
region_number: 0,
}
}

8 changes: 1 addition & 7 deletions src/operator/src/statement.rs
@@ -48,7 +48,6 @@ use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;

use crate::delete::DeleterRef;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
@@ -66,7 +65,6 @@ pub struct StatementExecutor {
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
}

impl StatementExecutor {
@@ -77,7 +75,6 @@ impl StatementExecutor {
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
) -> Self {
Self {
catalog_manager,
Expand All @@ -87,7 +84,6 @@ impl StatementExecutor {
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,
inserter,
deleter,
}
}

@@ -104,14 +100,12 @@

pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) => {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}

Statement::Insert(insert) => self.insert(insert, query_ctx).await,

Statement::Delete(delete) => self.delete(delete, query_ctx).await,

Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
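This hunk is the user-visible core of "re-support query engine execute dml": `Statement::Delete` no longer has a dedicated handler on the statement executor and is instead grouped with `Query` and `Explain`, flowing through `plan_exec` so the query engine plans and executes it. A schematic sketch of that dispatch, using a simplified enum and stub methods rather than the real types:

```rust
// Simplified dispatch mirroring the match arms above.
enum Statement {
    Query,
    Explain,
    Delete,
    Insert,
}

struct Executor;

impl Executor {
    fn plan_exec(&self) -> String {
        // In the real code this plans the statement with the query engine and
        // executes the resulting logical plan; DML is included after this commit.
        "planned and executed by the query engine".to_string()
    }

    fn insert(&self) -> String {
        "handled by the dedicated insert path".to_string()
    }

    fn execute(&self, stmt: Statement) -> String {
        match stmt {
            // DELETE now takes the same path as queries and EXPLAIN.
            Statement::Query | Statement::Explain | Statement::Delete => self.plan_exec(),
            Statement::Insert => self.insert(),
        }
    }
}

fn main() {
    let executor = Executor;
    println!("{}", executor.execute(Statement::Delete));
}
```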
1 change: 0 additions & 1 deletion src/operator/src/statement/copy_table_from.rs
@@ -330,7 +330,6 @@ impl StatementExecutor {
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
region_number: 0,
},
query_ctx.clone(),
));
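Together with the `table_to_region.rs` test change above, this hunk drops the `region_number` field from `InsertRequest`: callers submit a table-level request, and the per-region split is presumably derived from partition rules during the table-to-region conversion (as the `TableToRegion::new(&table_info, &self.partition_manager)` call shown in `delete.rs` suggests). A rough sketch of that idea, with entirely made-up types:

```rust
// Illustrative only: a table-level request without any region number, and a
// converter that assigns rows to regions using a partition rule.
use std::collections::HashMap;

type RegionNumber = u32;

/// Table-level insert request: note the absence of a `region_number` field.
struct InsertRequest {
    table_name: String,
    columns_values: HashMap<String, Vec<i64>>,
}

/// Hypothetical partition rule: here, route by row index parity.
struct PartitionRule;

impl PartitionRule {
    fn region_for_row(&self, row_idx: usize) -> RegionNumber {
        (row_idx % 2) as RegionNumber
    }
}

/// Split one table request into per-region row sets using the rule.
fn table_to_region(req: &InsertRequest, rule: &PartitionRule) -> HashMap<RegionNumber, Vec<usize>> {
    let rows = req
        .columns_values
        .values()
        .next()
        .map(|column| column.len())
        .unwrap_or(0);
    let mut per_region: HashMap<RegionNumber, Vec<usize>> = HashMap::new();
    for row_idx in 0..rows {
        per_region
            .entry(rule.region_for_row(row_idx))
            .or_default()
            .push(row_idx);
    }
    per_region
}

fn main() {
    let req = InsertRequest {
        table_name: "table_1".to_string(),
        columns_values: HashMap::from([("a".to_string(), vec![1, 2, 3, 4])]),
    };
    let routed = table_to_region(&req, &PartitionRule);
    println!("{} -> {:?}", req.table_name, routed);
}
```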
(Remaining changed files not shown.)