Skip to content
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub fn create_window_expr(
create_built_in_window_expr(fun, args, input_schema, name)?,
partition_by,
order_by,
window_frame,
)),
})
}
Expand Down
182 changes: 182 additions & 0 deletions datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,3 +1276,185 @@ async fn window_frame_creation() -> Result<()> {

Ok(())
}

/// Runs `ROW_NUMBER()` over `ORDER BY c9` twice, once without a frame clause
/// and once with an explicit `ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING`
/// frame, and compares the first five rows (ordered by `c8`) against a fixed
/// table in which both columns carry identical values.
#[tokio::test]
async fn test_window_row_number_aggregate() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;
    let sql = "SELECT
        c8,
        ROW_NUMBER() OVER(ORDER BY c9) AS rn1,
        ROW_NUMBER() OVER(ORDER BY c9 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2
        FROM aggregate_test_100
        ORDER BY c8
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // Every cell is padded to its column width so each row is exactly as
    // wide as the border lines; the comparison is textual, line by line.
    let expected = vec![
        "+-----+-----+-----+",
        "| c8  | rn1 | rn2 |",
        "+-----+-----+-----+",
        "| 102 | 73  | 73  |",
        "| 299 | 1   | 1   |",
        "| 363 | 41  | 41  |",
        "| 417 | 14  | 14  |",
        "| 794 | 95  | 95  |",
        "+-----+-----+-----+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Runs `CUME_DIST()` over `ORDER BY c9` with and without an explicit
/// `ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING` frame and compares the first
/// five rows (ordered by `c8`) against a fixed table in which both columns
/// carry identical values.
#[tokio::test]
async fn test_window_cume_dist() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;
    let sql = "SELECT
        c8,
        CUME_DIST() OVER(ORDER BY c9) as cd1,
        CUME_DIST() OVER(ORDER BY c9 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as cd2
        FROM aggregate_test_100
        ORDER BY c8
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // The header cells are padded to the same widths as the border and the
    // data rows; the comparison is textual, line by line.
    let expected = vec![
        "+-----+------+------+",
        "| c8  | cd1  | cd2  |",
        "+-----+------+------+",
        "| 102 | 0.73 | 0.73 |",
        "| 299 | 0.01 | 0.01 |",
        "| 363 | 0.41 | 0.41 |",
        "| 417 | 0.14 | 0.14 |",
        "| 794 | 0.95 | 0.95 |",
        "+-----+------+------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Runs `RANK()`, `DENSE_RANK()` and `PERCENT_RANK()` over `ORDER BY c1`,
/// each with and without an explicit `ROWS BETWEEN 10 PRECEDING and
/// 1 FOLLOWING` frame, and compares the first five rows (ordered by `c9`)
/// against a fixed table in which each framed column equals its unframed
/// counterpart.
#[tokio::test]
async fn test_window_rank() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;
    let sql = "SELECT
        c9,
        RANK() OVER(ORDER BY c1) AS rank1,
        RANK() OVER(ORDER BY c1 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
        DENSE_RANK() OVER(ORDER BY c1) as dense_rank1,
        DENSE_RANK() OVER(ORDER BY c1 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
        PERCENT_RANK() OVER(ORDER BY c1) as percent_rank1,
        PERCENT_RANK() OVER(ORDER BY c1 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as percent_rank2
        FROM aggregate_test_100
        ORDER BY c9
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // Every cell is padded to its column width so each row is exactly as
    // wide as the border lines; the comparison is textual, line by line.
    let expected = vec![
        "+-----------+-------+-------+-------------+-------------+---------------------+---------------------+",
        "| c9        | rank1 | rank2 | dense_rank1 | dense_rank2 | percent_rank1       | percent_rank2       |",
        "+-----------+-------+-------+-------------+-------------+---------------------+---------------------+",
        "| 28774375  | 80    | 80    | 5           | 5           | 0.797979797979798   | 0.797979797979798   |",
        "| 63044568  | 62    | 62    | 4           | 4           | 0.6161616161616161  | 0.6161616161616161  |",
        "| 141047417 | 1     | 1     | 1           | 1           | 0                   | 0                   |",
        "| 141680161 | 41    | 41    | 3           | 3           | 0.40404040404040403 | 0.40404040404040403 |",
        "| 145294611 | 1     | 1     | 1           | 1           | 0                   | 0                   |",
        "+-----------+-------+-------+-------------+-------------+---------------------+---------------------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Runs `LAG(c9, 2, 10101)` and `LEAD(c9, 2, 10101)` over `ORDER BY c9`,
/// each with and without an explicit `ROWS BETWEEN 10 PRECEDING and
/// 1 FOLLOWING` frame, and compares the first five rows (ordered by `c9`)
/// against a fixed table. In the expected output the first two `lag`
/// cells hold the supplied default `10101`, and each framed column equals
/// its unframed counterpart.
#[tokio::test]
async fn test_lag_lead() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;
    let sql = "SELECT
        c9,
        LAG(c9, 2, 10101) OVER(ORDER BY c9) as lag1,
        LAG(c9, 2, 10101) OVER(ORDER BY c9 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
        LEAD(c9, 2, 10101) OVER(ORDER BY c9) as lead1,
        LEAD(c9, 2, 10101) OVER(ORDER BY c9 ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2
        FROM aggregate_test_100
        ORDER BY c9
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // Every cell is padded to its column width so each row is exactly as
    // wide as the border lines; the comparison is textual, line by line.
    let expected = vec![
        "+-----------+-----------+-----------+-----------+-----------+",
        "| c9        | lag1      | lag2      | lead1     | lead2     |",
        "+-----------+-----------+-----------+-----------+-----------+",
        "| 28774375  | 10101     | 10101     | 141047417 | 141047417 |",
        "| 63044568  | 10101     | 10101     | 141680161 | 141680161 |",
        "| 141047417 | 28774375  | 28774375  | 145294611 | 145294611 |",
        "| 141680161 | 63044568  | 63044568  | 225513085 | 225513085 |",
        "| 145294611 | 141047417 | 141047417 | 243203849 | 243203849 |",
        "+-----------+-----------+-----------+-----------+-----------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Runs `FIRST_VALUE(c4)` and `LAST_VALUE(c4)` over `ORDER BY c9 ASC` with
/// two different `ROWS` frames (`10 PRECEDING AND 1 FOLLOWING` and
/// `2 PRECEDING AND 3 FOLLOWING`) and compares the first five rows
/// (ordered by `c9`) against a fixed table. Unlike the ranking tests, the
/// expected values here differ between the two frames.
#[tokio::test]
async fn test_window_frame_first_value_last_value_aggregate() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;

    let sql = "SELECT
        FIRST_VALUE(c4) OVER(ORDER BY c9 ASC ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING) as first_value1,
        FIRST_VALUE(c4) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) as first_value2,
        LAST_VALUE(c4) OVER(ORDER BY c9 ASC ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING) as last_value1,
        LAST_VALUE(c4) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING) as last_value2
        FROM aggregate_test_100
        ORDER BY c9
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // Every cell is padded to its column width so each row is exactly as
    // wide as the border lines; the comparison is textual, line by line.
    let expected = vec![
        "+--------------+--------------+-------------+-------------+",
        "| first_value1 | first_value2 | last_value1 | last_value2 |",
        "+--------------+--------------+-------------+-------------+",
        "| -16110       | -16110       | 3917        | -1114       |",
        "| -16110       | -16110       | -16974      | 15673       |",
        "| -16110       | -16110       | -1114       | 13630       |",
        "| -16110       | 3917         | 15673       | -13217      |",
        "| -16110       | -16974       | 13630       | 20690       |",
        "+--------------+--------------+-------------+-------------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}

/// Runs `NTH_VALUE(c4, 3)` over a `2 PRECEDING AND 1 FOLLOWING` frame and
/// `NTH_VALUE(c4, 2)` over a `1 PRECEDING AND 3 FOLLOWING` frame (both
/// `ORDER BY c9 ASC`) and compares the first five rows (ordered by `c9`)
/// against a fixed table.
#[tokio::test]
async fn test_window_frame_nth_value_aggregate() -> Result<()> {
    let ctx = SessionContext::with_config(SessionConfig::new());
    register_aggregate_csv(&ctx).await?;

    let sql = "SELECT
        NTH_VALUE(c4, 3) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as nth_value1,
        NTH_VALUE(c4, 2) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as nth_value2
        FROM aggregate_test_100
        ORDER BY c9
        LIMIT 5";

    let actual = execute_to_batches(&ctx, sql).await;
    // Every cell is padded to its column width so each row is exactly as
    // wide as the border lines; the comparison is textual, line by line.
    // The first `nth_value1` cell is blank -- presumably a NULL, because the
    // first row's frame holds fewer than three rows (confirm against the
    // NTH_VALUE implementation).
    let expected = vec![
        "+------------+------------+",
        "| nth_value1 | nth_value2 |",
        "+------------+------------+",
        "|            | 3917       |",
        "| -16974     | 3917       |",
        "| -16974     | -16974     |",
        "| -1114      | -1114      |",
        "| 15673      | 15673      |",
        "+------------+------------+",
    ];
    assert_batches_eq!(expected, &actual);
    Ok(())
}
Loading