Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,10 +927,18 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.as_ref()
.map::<Result<WindowFrame, _>, _>(|e| match e {
window_expr_node::WindowFrame::Frame(frame) => {
frame.clone().try_into()
let window_frame: WindowFrame = frame.clone().try_into()?;
if WindowFrameUnits::Range == window_frame.units
&& order_by.len() != 1
{
Err(proto_error("With window frame of type RANGE, the order by expression must be of length 1"))
} else {
Ok(window_frame)
}
}
})
.transpose()?;

match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
let aggr_function = protobuf::AggregateFunction::from_i32(*i)
Expand Down
35 changes: 35 additions & 0 deletions datafusion/src/logical_plan/window_frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
)))
} else {
let units = value.units.into();
if units == WindowFrameUnits::Range {
for bound in &[start_bound, end_bound] {
match bound {
WindowFrameBound::Preceding(Some(v))
| WindowFrameBound::Following(Some(v))
if *v > 0 =>
{
Err(DataFusionError::NotImplemented(format!(
"With WindowFrameUnits={}, the bound cannot be {} PRECEDING or FOLLOWING at the moment",
units, v
)))
}
_ => Ok(()),
}?;
}
}
Ok(Self {
units,
start_bound,
Expand Down Expand Up @@ -270,6 +286,25 @@ mod tests {
result.err().unwrap().to_string(),
"Execution error: Invalid window frame: start bound (1 PRECEDING) cannot be larger than end bound (2 PRECEDING)".to_owned()
);

let window_frame = ast::WindowFrame {
units: ast::WindowFrameUnits::Range,
start_bound: ast::WindowFrameBound::Preceding(Some(2)),
end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
};
let result = WindowFrame::try_from(window_frame);
assert_eq!(
result.err().unwrap().to_string(),
"This feature is not implemented: With WindowFrameUnits=RANGE, the bound cannot be 2 PRECEDING or FOLLOWING at the moment".to_owned()
);

let window_frame = ast::WindowFrame {
units: ast::WindowFrameUnits::Rows,
start_bound: ast::WindowFrameBound::Preceding(Some(2)),
end_bound: Some(ast::WindowFrameBound::Preceding(Some(1))),
};
let result = WindowFrame::try_from(window_frame);
assert!(result.is_ok());
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,5 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
pub mod window_frames;
pub mod window_functions;
pub mod windows;
Loading