Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/binaries/query/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn query_local(conf: &InnerConfig) -> Result<()> {
.await?;
let ctx = session.create_query_context().await?;
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&sql).await?;
let (plan, _) = planner.plan_sql(&sql).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let stream = interpreter.execute(ctx.clone()).await?;
let blocks = stream.map(|v| v).collect::<Vec<_>>().await;
Expand Down
11 changes: 11 additions & 0 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ impl Connection {
}
}

pub fn mask(&self) -> Self {
let mut conns = BTreeMap::new();
for k in self.conns.keys() {
conns.insert(k.to_string(), "********".to_string());
}
Self {
visited_keys: self.visited_keys.clone(),
conns,
}
}

pub fn get(&mut self, key: &str) -> Option<&String> {
self.visited_keys.insert(key.to_string());
self.conns.get(key)
Expand Down
27 changes: 27 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,33 @@ pub struct StatementMsg {
pub(crate) format: Option<String>,
}

impl Statement {
pub fn to_mask_sql(&self) -> String {
match self {
Statement::Copy(copy) => {
let mut copy_clone = copy.clone();

if let CopyUnit::UriLocation(location) = &mut copy_clone.src {
location.connection = location.connection.mask()
}

if let CopyUnit::UriLocation(location) = &mut copy_clone.dst {
location.connection = location.connection.mask()
}
format!("{}", Statement::Copy(copy_clone))
}
Statement::CreateStage(stage) => {
let mut stage_clone = stage.clone();
if let Some(location) = &mut stage_clone.location {
location.connection = location.connection.mask()
}
format!("{}", Statement::CreateStage(stage_clone))
}
_ => format!("{}", self),
}
}
}

impl Display for Statement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub trait TableContext: Send + Sync {
fn get_cacheable(&self) -> bool;
fn set_cacheable(&self, cacheable: bool);

fn attach_query_str(&self, kind: String, query: &str);
fn attach_query_str(&self, kind: String, query: String);
fn get_query_str(&self) -> String;

fn get_fragment_id(&self) -> usize;
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ tokio-stream = { version = "0.1.10", features = ["net"] }
tonic = "0.8.1"
tracing = "0.1.36"
typetag = "0.2.3"
unicode-segmentation = "1.10.0"
unicode-segmentation = "1.10.1"
uuid = { version = "1.1.2", features = ["serde", "v4"] }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Interpreter for DescribeTableInterpreter {
let schema = if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, _) = planner.plan_sql(query).await?;
infer_table_schema(&plan.schema())
} else {
return Err(ErrorCode::Internal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl AlterViewInterpreter {
self.plan.subquery.clone()
} else {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
if plan.schema().fields().len() != self.plan.column_names.len() {
return Err(ErrorCode::BadDataArrayLength(format!(
"column name length mismatch, expect {}, got {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl CreateViewInterpreter {
self.plan.subquery.clone()
} else {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
if plan.schema().fields().len() != self.plan.column_names.len() {
return Err(ErrorCode::BadDataArrayLength(format!(
"column name length mismatch, expect {}, got {}",
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/procedures/systems/search_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl OneBlockProcedure for SearchTablesProcedure {
args[0]
);
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&query).await?;
let (plan, _) = planner.plan_sql(&query).await?;

let stream = if let Plan::Query {
s_expr,
Expand Down
12 changes: 6 additions & 6 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ pub async fn clickhouse_handler_get(
let default_format = get_default_format(&params, headers).map_err(BadRequest)?;
let sql = params.query();
let mut planner = Planner::new(context.clone());
let (plan, _, fmt) = planner
let (plan, extras) = planner
.plan_sql(&sql)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;
let format = get_format_with_default(fmt, default_format)?;
let format = get_format_with_default(extras.format, default_format)?;

context.attach_query_str(plan.to_string(), &sql);
context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());
let interpreter = InterpreterFactory::get(context.clone(), &plan)
.await
.map_err(|err| err.display_with_sql(&sql))
Expand Down Expand Up @@ -298,13 +298,13 @@ pub async fn clickhouse_handler_post(
info!("receive clickhouse http post, (query + body) = {}", &msg);

let mut planner = Planner::new(ctx.clone());
let (mut plan, _, fmt) = planner
let (mut plan, extras) = planner
.plan_sql(&sql)
.await
.map_err(|err| err.display_with_sql(&sql))
.map_err(BadRequest)?;
let schema = plan.schema();
ctx.attach_query_str(plan.to_string(), &sql);
ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());
let mut handle = None;
if let Plan::Insert(insert) = &mut plan {
if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) =
Expand Down Expand Up @@ -404,7 +404,7 @@ pub async fn clickhouse_handler_post(
}
};

let format = get_format_with_default(fmt, default_format)?;
let format = get_format_with_default(extras.format, default_format)?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
.await
.map_err(|err| err.display_with_sql(&sql))
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ pub async fn streaming_load(
}

let mut planner = Planner::new(context.clone());
let (mut plan, _, _) = planner
let (mut plan, extras) = planner
.plan_sql(insert_sql)
.await
.map_err(|err| err.display_with_sql(insert_sql))
.map_err(InternalServerError)?;
context.attach_query_str(plan.to_string(), insert_sql);
context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());

let schema = plan.schema();
match &mut plan {
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Executor {
impl ExecuteState {
pub(crate) async fn get_schema(sql: &str, ctx: Arc<QueryContext>) -> Result<DataSchemaRef> {
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(sql).await?;
let (plan, _) = planner.plan_sql(sql).await?;
Ok(InterpreterFactory::get_schema(ctx, &plan))
}

Expand All @@ -218,8 +218,8 @@ impl ExecuteState {
block_sender: SizedChannelSender<DataBlock>,
) -> Result<()> {
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(sql).await?;
ctx.attach_query_str(plan.to_string(), sql);
let (plan, extras) = planner.plan_sql(sql).await?;
ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());

let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let running_state = ExecuteRunning {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
let context = self.session.create_query_context().await?;

let mut planner = Planner::new(context.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, extras) = planner.plan_sql(query).await?;

context.attach_query_str(plan.to_string(), query);
context.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());
let interpreter = InterpreterFactory::get(context.clone(), &plan).await;
let has_result_set = has_result_set_by_plan(&plan);

Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl TableContext for QueryContext {
self.shared.cacheable.store(cacheable, Ordering::Release);
}

fn attach_query_str(&self, kind: String, query: &str) {
fn attach_query_str(&self, kind: String, query: String) {
self.shared.attach_query_str(kind, query);
}

Expand Down
8 changes: 4 additions & 4 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ impl QueryContextShared {
}
}

pub fn attach_query_str(&self, kind: String, query: &str) {
pub fn attach_query_str(&self, kind: String, query: String) {
{
let mut running_query = self.running_query.write();
*running_query = Some(short_sql(query));
Expand Down Expand Up @@ -357,9 +357,9 @@ impl Drop for QueryContextShared {
}
}

pub fn short_sql(query: &str) -> String {
pub fn short_sql(sql: String) -> String {
use unicode_segmentation::UnicodeSegmentation;
let query = query.trim_start();
let query = sql.trim_start();
if query.len() >= 64 && query[..6].eq_ignore_ascii_case("INSERT") {
// keep first 64 graphemes
String::from_utf8(
Expand All @@ -373,6 +373,6 @@ pub fn short_sql(query: &str) -> String {
)
.unwrap() // by construction, this cannot panic as we extracted unicode grapheme
} else {
query.to_string()
sql
}
}
4 changes: 2 additions & 2 deletions src/query/service/tests/it/api/http/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ async fn run_query(query_ctx: &Arc<QueryContext>) -> Result<Arc<dyn Interpreter>
.set_authed_user(user, None)
.await?;
let mut planner = Planner::new(query_ctx.clone());
let (plan, _, _) = planner.plan_sql(sql).await?;
query_ctx.attach_query_str(plan.to_string(), sql);
let (plan, extras) = planner.plan_sql(sql).await?;
query_ctx.attach_query_str(plan.to_string(), extras.stament.to_mask_sql());
InterpreterFactory::get(query_ctx.clone(), &plan).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
// do compact
let query = format!("optimize table {db_name}.{tbl_name} compact");
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&query).await?;
let (plan, _) = planner.plan_sql(&query).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
ctx.get_settings().set_max_threads(1)?;
let data_stream = interpreter.execute(ctx.clone()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl TableContext for CtxDelegation {
todo!()
}

fn attach_query_str(&self, _kind: String, _query: &str) {
fn attach_query_str(&self, _kind: String, _query: String) {
todo!()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
// delete
let query = format!("delete from {}.{} where id=1", db_name, tbl_name);
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&query).await?;
let (plan, _) = planner.plan_sql(&query).await?;
if let Plan::Delete(delete) = plan {
do_deletion(ctx.clone(), table.clone(), *delete).await?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async fn test_fuse_table_optimize() -> Result<()> {
let query = format!("optimize table {db_name}.{tbl_name} compact");

let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(&query).await?;
let (plan, _) = planner.plan_sql(&query).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;

// `PipelineBuilder` will parallelize the table reading according to value of setting `max_threads`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> {
// delete
let query = "delete from default.t where c=1";
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, _) = planner.plan_sql(query).await?;
if let Plan::Delete(delete) = plan {
do_deletion(ctx.clone(), table.clone(), *delete).await?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub async fn expects_ok(

pub async fn execute_query(ctx: Arc<QueryContext>, query: &str) -> Result<SendableDataBlockStream> {
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, _) = planner.plan_sql(query).await?;
let executor = InterpreterFactory::get(ctx.clone(), &plan).await?;
executor.execute(ctx.clone()).await
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ impl Binder {
if table.engine() == VIEW_ENGINE {
let query = table.get_table_info().options().get(QUERY).unwrap();
let mut planner = Planner::new(self.ctx.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, _) = planner.plan_sql(query).await?;
Ok((infer_table_schema(&plan.schema())?, vec![], vec![]))
} else {
Ok((table.schema(), vec![], table.field_comments().clone()))
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use binder::SelectBuilder;
pub use binder::Visibility;
pub use expression_parser::*;
pub use metadata::*;
pub use planner::PlanExtras;
pub use planner::Planner;
pub use plans::ScalarExpr;
pub use semantic::normalize_identifier;
Expand Down
15 changes: 13 additions & 2 deletions src/query/sql/src/planner/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ pub struct Planner {
ctx: Arc<dyn TableContext>,
}

#[derive(Debug, Clone)]
pub struct PlanExtras {
pub metadata: MetadataRef,
pub format: Option<String>,
pub stament: Statement,
}

impl Planner {
pub fn new(ctx: Arc<dyn TableContext>) -> Self {
Planner { ctx }
}

pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, MetadataRef, Option<String>)> {
pub async fn plan_sql(&mut self, sql: &str) -> Result<(Plan, PlanExtras)> {
let settings = self.ctx.get_settings();
let sql_dialect = settings.get_sql_dialect()?;

Expand Down Expand Up @@ -103,7 +110,11 @@ impl Planner {
}));

let optimized_plan = optimize(self.ctx.clone(), opt_ctx, plan)?;
Ok((optimized_plan, metadata.clone(), format))
Ok((optimized_plan, PlanExtras {
metadata,
format,
stament: stmt,
}))
}
.await;

Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/system/src/columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl ColumnsTable {
let fields = if table.engine() == VIEW_ENGINE {
if let Some(query) = table.options().get(QUERY) {
let mut planner = Planner::new(ctx.clone());
let (plan, _, _) = planner.plan_sql(query).await?;
let (plan, _) = planner.plan_sql(query).await?;
let schema = infer_table_schema(&plan.schema())?;
schema.fields().clone()
} else {
Expand Down
4 changes: 2 additions & 2 deletions tests/suites/1_stateful/02_query/02_0000_kill_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@

mycursor = mydb.cursor()
mycursor.execute(
"SELECT mysql_connection_id FROM system.processes WHERE extra_info LIKE '%SELECT max(number), sum(number) FROM numbers_mt(100000000000) GROUP BY number % 3, number % 4, number % 5 LIMIT 10%' AND extra_info NOT LIKE '%system.processes%';"
"SELECT mysql_connection_id FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';"
)
res = mycursor.fetchone()
kill_query = "kill query " + str(res[0]) + ";"
mycursor.execute(kill_query)
time.sleep(0.1)
mycursor.execute(
"SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number), sum(number) FROM numbers_mt(100000000000) GROUP BY number % 3, number % 4, number % 5 LIMIT 10%' AND extra_info NOT LIKE '%system.processes%';"
"SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';"
)
res = mycursor.fetchone()

Expand Down