Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion scripts/ci/ci-run-stateful-tests-standalone-minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,3 @@ cd "$SCRIPT_PATH/../../tests" || exit

echo "Starting databend-test"
./databend-test $1 --mode 'standalone' --run-dir 1_stateful

3 changes: 1 addition & 2 deletions scripts/ci/deploy/databend-query-sharing.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ nohup target/${BUILD_PROFILE}/databend-meta -c scripts/ci/deploy/config/databend
python3 scripts/ci/wait_tcp.py --timeout 10 --port 19191

echo 'Start query node2 open-sharing...'
nohup target/${BUILD_PROFILE}/open-sharing --tenant=shared_tenant --storage-type=s3 --storage-s3-bucket=testbucket --storage-s3-endpoint-url=http://127.0.0.1:9900 --storage-s3-access-key-id=minioadmin --storage-s3-secret-access-key=minioadmin --storage-allow-insecure --share-endpoint-address=127.0.0.1:33003 &
nohup target/${BUILD_PROFILE}/open-sharing --tenant=shared_tenant --storage-type=s3 --storage-s3-bucket=testbucket --storage-s3-endpoint-url=http://127.0.0.1:9900 --storage-s3-access-key-id=minioadmin --storage-s3-secret-access-key=minioadmin --storage-allow-insecure --share-endpoint-address=127.0.0.1:33003 &
python3 scripts/ci/wait_tcp.py --timeout 10 --port 33003

echo "Start query node2 for sharding data"
nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-share-2.toml &
python3 scripts/ci/wait_tcp.py --timeout 30 --port 43307

1 change: 1 addition & 0 deletions src/query/ast/src/parser/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ pub fn window_spec(i: Input) -> IResult<WindowSpec> {
},
)(i)
}

pub fn window_spec_ident(i: Input) -> IResult<Window> {
alt((
map(
Expand Down
2 changes: 1 addition & 1 deletion src/query/sharing-endpoint/src/middlewares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<E: Endpoint> Endpoint for SharingAuthImpl<E> {
Ok(resp)
}
Err(err) => {
println!("err: {:?}", err);
// println!("err: {:?}", err);
Err(err)
}
}
Expand Down
67 changes: 53 additions & 14 deletions src/query/sql/src/planner/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ impl Binder {
from_context.all_column_bindings(),
self.name_resolution_ctx.unquoted_ident_case_sensitive,
);
let new_stmt = rewriter.rewrite(stmt)?;
let (new_stmt, new_order_by) = rewriter.rewrite(stmt, order_by)?;
let stmt = new_stmt.as_ref().unwrap_or(stmt);
let order_by = new_order_by.as_deref().unwrap_or(order_by);

if let Some(expr) = &stmt.selection {
s_expr = self.bind_where(&mut from_context, expr, s_expr).await?;
Expand Down Expand Up @@ -162,7 +163,7 @@ impl Binder {

let order_items = self
.analyze_order_items(
&from_context,
&mut from_context,
&mut scalar_items,
&projections,
order_by,
Expand Down Expand Up @@ -676,6 +677,7 @@ impl Binder {
struct SelectRewriter<'a> {
column_binding: &'a [ColumnBinding],
new_stmt: Option<SelectStmt>,
new_order_by: Option<Vec<OrderByExpr>>,
is_unquoted_ident_case_sensitive: bool,
}

Expand Down Expand Up @@ -780,16 +782,22 @@ impl<'a> SelectRewriter<'a> {
SelectRewriter {
column_binding,
new_stmt: None,
new_order_by: None,
is_unquoted_ident_case_sensitive,
}
}

fn rewrite(&mut self, stmt: &SelectStmt) -> Result<Option<SelectStmt>> {
self.rewrite_window_references(stmt)?;
fn rewrite(
&mut self,
stmt: &SelectStmt,
order_by: &[OrderByExpr],
) -> Result<(Option<SelectStmt>, Option<Vec<OrderByExpr>>)> {
self.rewrite_window_references(stmt, order_by)?;
self.rewrite_pivot(stmt)?;
self.rewrite_unpivot(stmt)?;
Ok(self.new_stmt.take())
Ok((self.new_stmt.take(), self.new_order_by.take()))
}

fn rewrite_pivot(&mut self, stmt: &SelectStmt) -> Result<()> {
if stmt.from.len() != 1 || stmt.from[0].pivot().is_none() {
return Ok(());
Expand Down Expand Up @@ -900,7 +908,11 @@ impl<'a> SelectRewriter<'a> {
Ok(())
}

fn rewrite_window_references(&mut self, stmt: &SelectStmt) -> Result<()> {
fn rewrite_window_references(
&mut self,
stmt: &SelectStmt,
order_by: &[OrderByExpr],
) -> Result<()> {
if stmt.window_list.is_none() {
return Ok(());
}
Expand Down Expand Up @@ -947,6 +959,36 @@ impl<'a> SelectRewriter<'a> {
}
}

if !order_by.is_empty() {
let mut new_order_by = order_by.to_vec();
for order in &mut new_order_by {
match &mut order.expr {
Expr::FunctionCall { window, .. } => {
if let Some(window) = window {
match window {
Window::WindowReference(reference) => {
let window_spec = window_definitions
.get(&reference.window_name.name)
.ok_or_else(|| {
ErrorCode::SyntaxException("Window not found")
})?;
*window = Window::WindowSpec(WindowSpec {
existing_window_name: None,
partition_by: window_spec.partition_by.clone(),
order_by: window_spec.order_by.clone(),
window_frame: window_spec.window_frame.clone(),
});
}
Window::WindowSpec(_) => continue,
}
}
}
_ => continue,
}
}
self.new_order_by = Some(new_order_by);
}

if let Some(ref mut new_stmt) = self.new_stmt {
new_stmt.select_list = new_select_list;
} else {
Expand Down Expand Up @@ -1004,14 +1046,11 @@ impl<'a> SelectRewriter<'a> {
.clone();

let resolved_spec = match referenced_window_spec.existing_window_name.clone() {
Some(_) => {
println!("call recursion:{:?}", referenced_name);
Self::rewrite_inherited_window_spec(
&referenced_window_spec,
window_list,
resolved_window,
)?
}
Some(_) => Self::rewrite_inherited_window_spec(
&referenced_window_spec,
window_list,
resolved_window,
)?,
None => referenced_window_spec.clone(),
};

Expand Down
100 changes: 69 additions & 31 deletions src/query/sql/src/planner/binder/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_exception::Result;
use super::bind_context::NameResolutionResult;
use crate::binder::scalar::ScalarBinder;
use crate::binder::select::SelectList;
use crate::binder::window::WindowRewriter;
use crate::binder::Binder;
use crate::binder::ColumnBinding;
use crate::normalize_identifier;
Expand Down Expand Up @@ -64,7 +65,7 @@ impl Binder {
#[async_backtrace::framed]
pub(super) async fn analyze_order_items(
&mut self,
from_context: &BindContext,
bind_context: &mut BindContext,
scalar_items: &mut HashMap<IndexType, ScalarItem>,
projections: &[ColumnBinding],
order_by: &[OrderByExpr],
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Binder {

// If there isn't a matched alias in select list, we will fallback to
// from clause.
let result = from_context.resolve_name(
let result = bind_context.resolve_name(
database.as_deref(),
table.as_deref(),
&column,
Expand Down Expand Up @@ -182,32 +183,36 @@ impl Binder {
});
}
_ => {
let mut bind_context = from_context.clone();
for column_binding in projections.iter() {
if bind_context.columns.contains(column_binding) {
continue;
}
bind_context.columns.push(column_binding.clone());
}
let mut scalar_binder = ScalarBinder::new(
&mut bind_context,
bind_context,
self.ctx.clone(),
&self.name_resolution_ctx,
self.metadata.clone(),
&[],
);
let (bound_expr, _) = scalar_binder.bind(&order.expr).await?;
let rewrite_scalar = self
.rewrite_scalar_with_replacement(&bound_expr, &|nest_scalar| {
if let ScalarExpr::BoundColumnRef(BoundColumnRef { column, .. }) =
nest_scalar
{
if let Some(scalar_item) = scalar_items.get(&column.index) {
return Ok(Some(scalar_item.scalar.clone()));
.rewrite_scalar_with_replacement(
bind_context,
&bound_expr,
&|nest_scalar| {
if let ScalarExpr::BoundColumnRef(BoundColumnRef {
column, ..
}) = nest_scalar
{
if let Some(scalar_item) = scalar_items.get(&column.index) {
return Ok(Some(scalar_item.scalar.clone()));
}
}
}
Ok(None)
})
Ok(None)
},
)
.map_err(|e| ErrorCode::SemanticError(e.message()))?;
let column_binding = self.create_column_binding(
None,
Expand Down Expand Up @@ -377,6 +382,7 @@ impl Binder {
#[allow(clippy::only_used_in_recursion)]
pub(crate) fn rewrite_scalar_with_replacement<F>(
&self,
bind_context: &mut BindContext,
original_scalar: &ScalarExpr,
replacement_fn: &F,
) -> Result<ScalarExpr>
Expand All @@ -388,29 +394,50 @@ impl Binder {
Some(replacement) => Ok(replacement),
None => match original_scalar {
ScalarExpr::AndExpr(AndExpr { left, right }) => {
let left =
Box::new(self.rewrite_scalar_with_replacement(left, replacement_fn)?);
let right =
Box::new(self.rewrite_scalar_with_replacement(right, replacement_fn)?);
let left = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
left,
replacement_fn,
)?);
let right = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
right,
replacement_fn,
)?);
Ok(ScalarExpr::AndExpr(AndExpr { left, right }))
}
ScalarExpr::OrExpr(OrExpr { left, right }) => {
let left =
Box::new(self.rewrite_scalar_with_replacement(left, replacement_fn)?);
let right =
Box::new(self.rewrite_scalar_with_replacement(right, replacement_fn)?);
let left = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
left,
replacement_fn,
)?);
let right = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
right,
replacement_fn,
)?);
Ok(ScalarExpr::OrExpr(OrExpr { left, right }))
}
ScalarExpr::NotExpr(NotExpr { argument }) => {
let argument =
Box::new(self.rewrite_scalar_with_replacement(argument, replacement_fn)?);
let argument = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
argument,
replacement_fn,
)?);
Ok(ScalarExpr::NotExpr(NotExpr { argument }))
}
ScalarExpr::ComparisonExpr(ComparisonExpr { op, left, right }) => {
let left =
Box::new(self.rewrite_scalar_with_replacement(left, replacement_fn)?);
let right =
Box::new(self.rewrite_scalar_with_replacement(right, replacement_fn)?);
let left = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
left,
replacement_fn,
)?);
let right = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
right,
replacement_fn,
)?);
Ok(ScalarExpr::ComparisonExpr(ComparisonExpr {
op: op.clone(),
left,
Expand All @@ -427,7 +454,9 @@ impl Binder {
}) => {
let args = args
.iter()
.map(|arg| self.rewrite_scalar_with_replacement(arg, replacement_fn))
.map(|arg| {
self.rewrite_scalar_with_replacement(bind_context, arg, replacement_fn)
})
.collect::<Result<Vec<_>>>()?;
Ok(ScalarExpr::AggregateFunction(AggregateFunction {
display_name: display_name.clone(),
Expand All @@ -438,6 +467,10 @@ impl Binder {
return_type: return_type.clone(),
}))
}
window @ ScalarExpr::WindowFunction(_) => {
let mut rewriter = WindowRewriter::new(bind_context, self.metadata.clone());
rewriter.visit(window)
}
ScalarExpr::FunctionCall(FunctionCall {
span,
params,
Expand All @@ -446,7 +479,9 @@ impl Binder {
}) => {
let arguments = arguments
.iter()
.map(|arg| self.rewrite_scalar_with_replacement(arg, replacement_fn))
.map(|arg| {
self.rewrite_scalar_with_replacement(bind_context, arg, replacement_fn)
})
.collect::<Result<Vec<_>>>()?;
Ok(ScalarExpr::FunctionCall(FunctionCall {
span: *span,
Expand All @@ -461,8 +496,11 @@ impl Binder {
argument,
target_type,
}) => {
let argument =
Box::new(self.rewrite_scalar_with_replacement(argument, replacement_fn)?);
let argument = Box::new(self.rewrite_scalar_with_replacement(
bind_context,
argument,
replacement_fn,
)?);
Ok(ScalarExpr::CastExpr(CastExpr {
span: *span,
is_try: *is_try,
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/semantic/grouping_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl<'a> GroupingChecker<'a> {
}
.into())
} else {
Err(ErrorCode::Internal("Invalid window function"))
Err(ErrorCode::Internal("Group Check: Invalid window function"))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/semantic/window_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<'a> WindowChecker<'a> {
}
.into());
}
Err(ErrorCode::Internal("Invalid window function"))
Err(ErrorCode::Internal("Window Check: Invalid window function"))
}

ScalarExpr::AggregateFunction(_) => unreachable!(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,17 @@ SELECT * FROM (SELECT row_number() OVER w rn FROM empsalary WINDOW w AS (PARTITI
2
3

# Window func in order by
query II
SELECT a, sum(a) OVER w FROM t1 WINDOW w AS (PARTITION BY a) ORDER BY count() OVER w DESC
----
1 3
1 3
1 3
3 6
3 6
5 10
5 10

statement ok
DROP DATABASE test_named_window_basic
Loading