Skip to content

Commit

Permalink
fix: optimize the error message for direct select the stream table
Browse files Browse the repository at this point in the history
  • Loading branch information
yukkit authored and ZuoTiJia committed Nov 21, 2023
1 parent 9ebb2e9 commit 4a0d2c0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 23 deletions.
56 changes: 33 additions & 23 deletions query_server/query/src/execution/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,34 +75,44 @@ impl QueryExecutionFactory for SqlQueryExecutionFactory {
// 获取执行计划中所有涉及到的stream source
let stream_providers = extract_stream_providers(&query_plan);

// 纯批操作
// 1. 没有stream source
// 2. explain
// 3. 非dml
if stream_providers.is_empty() || query_plan.is_explain() || !is_dml(&query_plan) {
return Ok(Arc::new(SqlQueryExecution::new(
// (含有流表, explain, dml)
match (
!stream_providers.is_empty(),
query_plan.is_explain(),
is_dml(&query_plan),
) {
(false, _, _) | (true, true, _) => Ok(Arc::new(SqlQueryExecution::new(
state_machine,
query_plan,
self.optimizer.clone(),
self.scheduler.clone(),
)));
}

// 流操作
let options = state_machine.session.inner().state().config().into();
let exec = MicroBatchStreamExecutionBuilder::new(MicroBatchStreamExecutionDesc {
plan: Arc::new(query_plan),
options,
})
.with_stream_providers(stream_providers)
.build(
state_machine,
self.scheduler.clone(),
self.trigger_executor_factory.clone(),
self.runtime.clone(),
)?;
))),
(true, false, true) => {
// 流操作
// stream source + dml + !explain
let options = (&state_machine.session.inner().copied_config()).into();
let exec =
MicroBatchStreamExecutionBuilder::new(MicroBatchStreamExecutionDesc {
plan: Arc::new(query_plan),
options,
})
.with_stream_providers(stream_providers)
.build(
state_machine,
self.scheduler.clone(),
self.trigger_executor_factory.clone(),
self.runtime.clone(),
)?;

Ok(Arc::new(exec))
Ok(Arc::new(exec))
}
(true, false, false) => {
// stream source + !dml + !explain
Err(QueryError::NotImplemented {
err: "Stream table can only be used as source table in insert select statements.".to_string(),
})
}
}
}
Plan::DDL(ddl_plan) => Ok(Arc::new(DDLExecution::new(
state_machine,
Expand Down
5 changes: 5 additions & 0 deletions query_server/sqllogicaltests/cases/stream/unsupport_op.slt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ insert into readings_agg
from TskvTable
order by time, name;

# select
statement error Arrow error: Io error: Status \{ code: Internal, message: "Execute logical plan: This feature is not implemented: Stream table can only be used as source table in insert select statements\.", .*
select *
from TskvTable;

# alter
statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: This feature is not implemented: only tskv table support alter",.*
alter table TskvTable add tag ta;

0 comments on commit 4a0d2c0

Please sign in to comment.