diff --git a/Cargo.lock b/Cargo.lock index 15bc482a3c203..9bbdeb02c1cc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9197,9 +9197,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" +checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" [[package]] name = "unicode-width" diff --git a/src/binaries/query/local.rs b/src/binaries/query/local.rs index 00715f5c5330d..1c625174e7599 100644 --- a/src/binaries/query/local.rs +++ b/src/binaries/query/local.rs @@ -44,7 +44,7 @@ pub async fn query_local(conf: &InnerConfig) -> Result<()> { .await?; let ctx = session.create_query_context().await?; let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(&sql).await?; + let (plan, _) = planner.plan_sql(&sql).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let stream = interpreter.execute(ctx.clone()).await?; let blocks = stream.map(|v| v).collect::>().await; diff --git a/src/query/ast/src/ast/statements/copy.rs b/src/query/ast/src/ast/statements/copy.rs index 7361e787afa16..5ca9301fe0766 100644 --- a/src/query/ast/src/ast/statements/copy.rs +++ b/src/query/ast/src/ast/statements/copy.rs @@ -202,6 +202,17 @@ impl Connection { } } + pub fn mask(&self) -> Self { + let mut conns = BTreeMap::new(); + for k in self.conns.keys() { + conns.insert(k.to_string(), "********".to_string()); + } + Self { + visited_keys: self.visited_keys.clone(), + conns, + } + } + pub fn get(&mut self, key: &str) -> Option<&String> { self.visited_keys.insert(key.to_string()); self.conns.get(key) diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 0def2c9eeb812..8d43ca097b063 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -212,6 +212,33 @@ pub struct StatementMsg { pub(crate) format: Option, } +impl Statement { + pub fn to_mask_sql(&self) -> String { + match self { + Statement::Copy(copy) => { + let mut copy_clone = copy.clone(); + + if let CopyUnit::UriLocation(location) = &mut copy_clone.src { + location.connection = location.connection.mask() + } + + if let CopyUnit::UriLocation(location) = &mut copy_clone.dst { + location.connection = location.connection.mask() + } + format!("{}", Statement::Copy(copy_clone)) + } + Statement::CreateStage(stage) => { + let mut stage_clone = stage.clone(); + if let Some(location) = &mut stage_clone.location { + location.connection = location.connection.mask() + } + format!("{}", Statement::CreateStage(stage_clone)) + } + _ => format!("{}", self), + } + } +} + impl Display for Statement { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 3fb7b1f4be4cd..b536a2c14a98c 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -93,7 +93,7 @@ pub trait TableContext: Send + Sync { fn get_cacheable(&self) -> bool; fn set_cacheable(&self, cacheable: bool); - fn attach_query_str(&self, kind: String, query: &str); + fn attach_query_str(&self, kind: String, query: String); fn get_query_str(&self) -> String; fn get_fragment_id(&self) -> usize; diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index d646fef06239d..22b3837464349 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -140,7 +140,7 @@ tokio-stream = { version = "0.1.10", features = ["net"] } tonic = "0.8.1" tracing = "0.1.36" typetag = "0.2.3" -unicode-segmentation = "1.10.0" +unicode-segmentation = "1.10.1" uuid = { version = "1.1.2", features = ["serde", "v4"] } [dev-dependencies] diff --git a/src/query/service/src/interpreters/interpreter_table_describe.rs b/src/query/service/src/interpreters/interpreter_table_describe.rs index afc92c4d36644..4e6ab019b06e8 100644 --- a/src/query/service/src/interpreters/interpreter_table_describe.rs +++ b/src/query/service/src/interpreters/interpreter_table_describe.rs @@ -63,7 +63,7 @@ impl Interpreter for DescribeTableInterpreter { let schema = if tbl_info.engine() == VIEW_ENGINE { if let Some(query) = tbl_info.options().get(QUERY) { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, _) = planner.plan_sql(query).await?; infer_table_schema(&plan.schema()) } else { return Err(ErrorCode::Internal( diff --git a/src/query/service/src/interpreters/interpreter_view_alter.rs b/src/query/service/src/interpreters/interpreter_view_alter.rs index a2ae81fd99637..6df23474bb3c3 100644 --- a/src/query/service/src/interpreters/interpreter_view_alter.rs +++ b/src/query/service/src/interpreters/interpreter_view_alter.rs @@ -94,7 +94,7 @@ impl AlterViewInterpreter { self.plan.subquery.clone() } else { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; + let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; if plan.schema().fields().len() != self.plan.column_names.len() { return Err(ErrorCode::BadDataArrayLength(format!( "column name length mismatch, expect {}, got {}", diff --git a/src/query/service/src/interpreters/interpreter_view_create.rs b/src/query/service/src/interpreters/interpreter_view_create.rs index 3fbc319245040..63131d7ea24b8 100644 --- a/src/query/service/src/interpreters/interpreter_view_create.rs +++ b/src/query/service/src/interpreters/interpreter_view_create.rs @@ -76,7 +76,7 @@ impl CreateViewInterpreter { self.plan.subquery.clone() } else { let mut planner = Planner::new(self.ctx.clone()); - let (plan, _, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; + let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?; if plan.schema().fields().len() != self.plan.column_names.len() { return Err(ErrorCode::BadDataArrayLength(format!( "column name length mismatch, expect {}, got {}", diff --git a/src/query/service/src/procedures/systems/search_tables.rs b/src/query/service/src/procedures/systems/search_tables.rs index 454aafb75ac21..fde0a626bd26d 100644 --- a/src/query/service/src/procedures/systems/search_tables.rs +++ b/src/query/service/src/procedures/systems/search_tables.rs @@ -56,7 +56,7 @@ impl OneBlockProcedure for SearchTablesProcedure { args[0] ); let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(&query).await?; + let (plan, _) = planner.plan_sql(&query).await?; let stream = if let Plan::Query { s_expr, diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index d124c7fe2f947..6347f5a9a5295 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -239,14 +239,14 @@ pub async fn clickhouse_handler_get( let default_format = get_default_format(¶ms, headers).map_err(BadRequest)?; let sql = params.query(); let mut planner = Planner::new(context.clone()); - let (plan, _, fmt) = planner + let (plan, extras) = planner .plan_sql(&sql) .await .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; - let format = get_format_with_default(fmt, default_format)?; + let format = get_format_with_default(extras.format, default_format)?; - context.attach_query_str(plan.to_string(), &sql); + context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); let interpreter = InterpreterFactory::get(context.clone(), &plan) .await .map_err(|err| err.display_with_sql(&sql)) @@ -298,13 +298,13 @@ pub async fn clickhouse_handler_post( info!("receive clickhouse http post, (query + body) = {}", &msg); let mut planner = Planner::new(ctx.clone()); - let (mut plan, _, fmt) = planner + let (mut plan, extras) = planner .plan_sql(&sql) .await .map_err(|err| err.display_with_sql(&sql)) .map_err(BadRequest)?; let schema = plan.schema(); - ctx.attach_query_str(plan.to_string(), &sql); + ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); let mut handle = None; if let Plan::Insert(insert) = &mut plan { if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) = @@ -404,7 +404,7 @@ pub async fn clickhouse_handler_post( } }; - let format = get_format_with_default(fmt, default_format)?; + let format = get_format_with_default(extras.format, default_format)?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan) .await .map_err(|err| err.display_with_sql(&sql)) diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index dfb5958a534ee..fe51252b1bd9d 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -115,12 +115,12 @@ pub async fn streaming_load( } let mut planner = Planner::new(context.clone()); - let (mut plan, _, _) = planner + let (mut plan, extras) = planner .plan_sql(insert_sql) .await .map_err(|err| err.display_with_sql(insert_sql)) .map_err(InternalServerError)?; - context.attach_query_str(plan.to_string(), insert_sql); + context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); let schema = plan.schema(); match &mut plan { diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 9918ec531929b..fe66ff4888eaa 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -206,7 +206,7 @@ impl Executor { impl ExecuteState { pub(crate) async fn get_schema(sql: &str, ctx: Arc) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(sql).await?; + let (plan, _) = planner.plan_sql(sql).await?; Ok(InterpreterFactory::get_schema(ctx, &plan)) } @@ -218,8 +218,8 @@ impl ExecuteState { block_sender: SizedChannelSender, ) -> Result<()> { let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(sql).await?; - ctx.attach_query_str(plan.to_string(), sql); + let (plan, extras) = planner.plan_sql(sql).await?; + ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; let running_state = ExecuteRunning { diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 62d69271a1005..c7d54db1500d1 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -343,9 +343,9 @@ impl InteractiveWorkerBase { let context = self.session.create_query_context().await?; let mut planner = Planner::new(context.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, extras) = planner.plan_sql(query).await?; - context.attach_query_str(plan.to_string(), query); + context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); let interpreter = InterpreterFactory::get(context.clone(), &plan).await; let has_result_set = has_result_set_by_plan(&plan); diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index df884a6b2b2fe..41360080e943a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -317,7 +317,7 @@ impl TableContext for QueryContext { self.shared.cacheable.store(cacheable, Ordering::Release); } - fn attach_query_str(&self, kind: String, query: &str) { + fn attach_query_str(&self, kind: String, query: String) { self.shared.attach_query_str(kind, query); } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 543bb323b7888..4cedba96d838d 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -270,7 +270,7 @@ impl QueryContextShared { } } - pub fn attach_query_str(&self, kind: String, query: &str) { + pub fn attach_query_str(&self, kind: String, query: String) { { let mut running_query = self.running_query.write(); *running_query = Some(short_sql(query)); @@ -357,9 +357,9 @@ impl Drop for QueryContextShared { } } -pub fn short_sql(query: &str) -> String { +pub fn short_sql(sql: String) -> String { use unicode_segmentation::UnicodeSegmentation; - let query = query.trim_start(); + let query = sql.trim_start(); if query.len() >= 64 && query[..6].eq_ignore_ascii_case("INSERT") { // keep first 64 graphemes String::from_utf8( @@ -373,6 +373,6 @@ pub fn short_sql(query: &str) -> String { ) .unwrap() // by construction, this cannot panic as we extracted unicode grapheme } else { - query.to_string() + sql } } diff --git a/src/query/service/tests/it/api/http/status.rs b/src/query/service/tests/it/api/http/status.rs index 663edf6d910c0..dc420646496db 100644 --- a/src/query/service/tests/it/api/http/status.rs +++ b/src/query/service/tests/it/api/http/status.rs @@ -67,8 +67,8 @@ async fn run_query(query_ctx: &Arc) -> Result .set_authed_user(user, None) .await?; let mut planner = Planner::new(query_ctx.clone()); - let (plan, _, _) = planner.plan_sql(sql).await?; - query_ctx.attach_query_str(plan.to_string(), sql); + let (plan, extras) = planner.plan_sql(sql).await?; + query_ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql()); InterpreterFactory::get(query_ctx.clone(), &plan).await } diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs index b9c6d6f29c6d1..f3ed67f7bb8a3 100644 --- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs +++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs @@ -238,7 +238,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> { // do compact let query = format!("optimize table {db_name}.{tbl_name} compact"); let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(&query).await?; + let (plan, _) = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; ctx.get_settings().set_max_threads(1)?; let data_stream = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 809e92926ced7..51371cbe9b543 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -379,7 +379,7 @@ impl TableContext for CtxDelegation { todo!() } - fn attach_query_str(&self, _kind: String, _query: &str) { + fn attach_query_str(&self, _kind: String, _query: String) { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs index 369ca9109c99b..b146b7514d93b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/deletion.rs @@ -53,7 +53,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> { // delete let query = format!("delete from {}.{} where id=1", db_name, tbl_name); let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(&query).await?; + let (plan, _) = planner.plan_sql(&query).await?; if let Plan::Delete(delete) = plan { do_deletion(ctx.clone(), table.clone(), *delete).await?; } diff --git a/src/query/service/tests/it/storages/fuse/operations/optimize.rs b/src/query/service/tests/it/storages/fuse/operations/optimize.rs index 710fc41f764b1..5afce019c98e6 100644 --- a/src/query/service/tests/it/storages/fuse/operations/optimize.rs +++ b/src/query/service/tests/it/storages/fuse/operations/optimize.rs @@ -84,7 +84,7 @@ async fn test_fuse_table_optimize() -> Result<()> { let query = format!("optimize table {db_name}.{tbl_name} compact"); let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(&query).await?; + let (plan, _) = planner.plan_sql(&query).await?; let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; // `PipelineBuilder` will parallelize the table reading according to value of setting `max_threads`, diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index aeffb2f3021f3..a23558d284f90 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -75,7 +75,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { // delete let query = "delete from default.t where c=1"; let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, _) = planner.plan_sql(query).await?; if let Plan::Delete(delete) = plan { do_deletion(ctx.clone(), table.clone(), *delete).await?; } diff --git a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs index 7664a8d94e629..33a743bdd8776 100644 --- a/src/query/service/tests/it/storages/fuse/table_test_fixture.rs +++ b/src/query/service/tests/it/storages/fuse/table_test_fixture.rs @@ -369,7 +369,7 @@ pub async fn expects_ok( pub async fn execute_query(ctx: Arc, query: &str) -> Result { let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, _) = planner.plan_sql(query).await?; let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; executor.execute(ctx.clone()).await } diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 086ae2140dbec..364318cbdd566 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -925,7 +925,7 @@ impl Binder { if table.engine() == VIEW_ENGINE { let query = table.get_table_info().options().get(QUERY).unwrap(); let mut planner = Planner::new(self.ctx.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, _) = planner.plan_sql(query).await?; Ok((infer_table_schema(&plan.schema())?, vec![], vec![])) } else { Ok((table.schema(), vec![], table.field_comments().clone())) diff --git a/src/query/sql/src/planner/mod.rs b/src/query/sql/src/planner/mod.rs index 540816e2c24f4..bd4723d79ff05 100644 --- a/src/query/sql/src/planner/mod.rs +++ b/src/query/sql/src/planner/mod.rs @@ -33,6 +33,7 @@ pub use binder::SelectBuilder; pub use binder::Visibility; pub use expression_parser::*; pub use metadata::*; +pub use planner::PlanExtras; pub use planner::Planner; pub use plans::ScalarExpr; pub use semantic::normalize_identifier; diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index b2ca16c05395c..90a9cb1ab19db 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -47,12 +47,19 @@ pub struct Planner { ctx: Arc, } +#[derive(Debug, Clone)] +pub struct PlanExtras { + pub metadata: MetadataRef, + pub format: Option, + pub stament: Statement, +} + impl Planner { pub fn new(ctx: Arc) -> Self { Planner { ctx } } - pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option)> { + pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, PlanExtras)> { let settings = self.ctx.get_settings(); let sql_dialect = settings.get_sql_dialect()?; @@ -103,7 +110,11 @@ impl Planner { })); let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?; - Ok((optimized_plan, metadata.clone(), format)) + Ok((optimized_plan, PlanExtras { + metadata, + format, + stament: stmt, + })) } .await; diff --git a/src/query/storages/system/src/columns_table.rs b/src/query/storages/system/src/columns_table.rs index 90164e03da6f9..64d8274b0ddcc 100644 --- a/src/query/storages/system/src/columns_table.rs +++ b/src/query/storages/system/src/columns_table.rs @@ -146,7 +146,7 @@ impl ColumnsTable { let fields = if table.engine() == VIEW_ENGINE { if let Some(query) = table.options().get(QUERY) { let mut planner = Planner::new(ctx.clone()); - let (plan, _, _) = planner.plan_sql(query).await?; + let (plan, _) = planner.plan_sql(query).await?; let schema = infer_table_schema(&plan.schema())?; schema.fields().clone() } else { diff --git a/tests/suites/1_stateful/02_query/02_0000_kill_query.py b/tests/suites/1_stateful/02_query/02_0000_kill_query.py index 6d08a1eac1d78..ee211a4d749b8 100755 --- a/tests/suites/1_stateful/02_query/02_0000_kill_query.py +++ b/tests/suites/1_stateful/02_query/02_0000_kill_query.py @@ -32,14 +32,14 @@ mycursor = mydb.cursor() mycursor.execute( - "SELECT mysql_connection_id FROM system.processes WHERE extra_info LIKE '%SELECT max(number), sum(number) FROM numbers_mt(100000000000) GROUP BY number % 3, number % 4, number % 5 LIMIT 10%' AND extra_info NOT LIKE '%system.processes%';" + "SELECT mysql_connection_id FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';" ) res = mycursor.fetchone() kill_query = "kill query " + str(res[0]) + ";" mycursor.execute(kill_query) time.sleep(0.1) mycursor.execute( - "SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number), sum(number) FROM numbers_mt(100000000000) GROUP BY number % 3, number % 4, number % 5 LIMIT 10%' AND extra_info NOT LIKE '%system.processes%';" + "SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';" ) res = mycursor.fetchone()