Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions datafusion/functions-window/src/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::cmp::min;
use std::collections::VecDeque;
use std::hash::Hash;
use std::ops::{Neg, Range};
use std::ops::Range;
use std::sync::{Arc, LazyLock};

get_or_init_udwf!(
Expand Down Expand Up @@ -116,7 +116,7 @@ impl WindowShiftKind {
fn shift_offset(&self, value: Option<i64>) -> i64 {
match self {
WindowShiftKind::Lag => value.unwrap_or(1),
WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1),
WindowShiftKind::Lead => value.map_or(-1, |v| v.wrapping_neg()),
}
}
}
Expand Down Expand Up @@ -266,7 +266,7 @@ impl WindowUDFImpl for WindowShift {
.map(|n| self.kind.shift_offset(n))
.map(|offset| {
if partition_evaluator_args.is_reversed() {
-offset
offset.wrapping_neg()
} else {
offset
}
Expand Down Expand Up @@ -410,6 +410,15 @@ struct WindowShiftEvaluator {
non_null_offsets: VecDeque<usize>,
}

fn offset_magnitude(offset: i64) -> usize {
let offset = offset.unsigned_abs();
if offset > usize::MAX as u64 {
usize::MAX
} else {
offset as usize
}
}

impl WindowShiftEvaluator {
fn is_lag(&self) -> bool {
// Mode is LAG, when shift_offset is positive
Expand Down Expand Up @@ -503,27 +512,27 @@ fn shift_with_default_value(

impl PartitionEvaluator for WindowShiftEvaluator {
fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
let offset = offset_magnitude(self.shift_offset);

if self.is_lag() {
let start = if self.non_null_offsets.len() == self.shift_offset as usize {
let start = if self.non_null_offsets.len() == offset {
// How many rows needed previous than the current row to get necessary lag result
let offset: usize = self.non_null_offsets.iter().sum();
idx.saturating_sub(offset)
} else if !self.ignore_nulls {
let offset = self.shift_offset as usize;
idx.saturating_sub(offset)
} else {
0
};
let end = idx + 1;
Ok(Range { start, end })
} else {
let end = if self.non_null_offsets.len() == (-self.shift_offset) as usize {
let end = if self.non_null_offsets.len() == offset {
// How many rows needed further than the current row to get necessary lead result
let offset: usize = self.non_null_offsets.iter().sum();
min(idx + offset + 1, n_rows)
min(idx.saturating_add(offset).saturating_add(1), n_rows)
} else if !self.ignore_nulls {
let offset = (-self.shift_offset) as usize;
min(idx + offset, n_rows)
min(idx.saturating_add(offset), n_rows)
} else {
n_rows
};
Expand All @@ -546,20 +555,26 @@ impl PartitionEvaluator for WindowShiftEvaluator {

// LAG mode
let i = if self.is_lag() {
(range.end as i64 - self.shift_offset - 1) as usize
range
.end
.checked_sub(1)
.and_then(|end| (end as i64).checked_sub(self.shift_offset))
.and_then(|value| usize::try_from(value).ok())
} else {
// LEAD mode
(range.start as i64 - self.shift_offset) as usize
(range.start as i64)
.checked_sub(self.shift_offset)
.and_then(|value| usize::try_from(value).ok())
};

let mut idx: Option<usize> = if i < len { Some(i) } else { None };
let mut idx: Option<usize> = i.filter(|i| *i < len);

// LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows
// If current row index points to NULL value the row is NOT counted
if self.ignore_nulls && self.is_lag() {
// LAG when NULLS are ignored.
// Find the nonNULL row index that shifted by offset comparing to current row index
idx = if self.non_null_offsets.len() == self.shift_offset as usize {
let shift_offset = offset_magnitude(self.shift_offset);
idx = if self.non_null_offsets.len() == shift_offset {
let total_offset: usize = self.non_null_offsets.iter().sum();
Some(range.end - 1 - total_offset)
} else {
Expand All @@ -570,7 +585,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
if array.is_valid(range.end - 1) {
// Non-null add new offset
self.non_null_offsets.push_back(1);
if self.non_null_offsets.len() > self.shift_offset as usize {
if self.non_null_offsets.len() > shift_offset {
// WE do not need to keep track of more than `lag number of offset` values.
self.non_null_offsets.pop_front();
}
Expand All @@ -582,7 +597,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
} else if self.ignore_nulls && !self.is_lag() {
// LEAD when NULLS are ignored.
// Stores the necessary non-null entry number further than the current row.
let non_null_row_count = (-self.shift_offset) as usize;
let non_null_row_count = offset_magnitude(self.shift_offset);

if self.non_null_offsets.is_empty() {
// When empty, fill non_null offsets with the data further than the current row.
Expand All @@ -596,7 +611,8 @@ impl PartitionEvaluator for WindowShiftEvaluator {
}
// It is enough to keep track of `non_null_row_count + 1` non-null offset.
// further data is unnecessary for the result.
if self.non_null_offsets.len() == non_null_row_count + 1 {
if self.non_null_offsets.len() == non_null_row_count.saturating_add(1)
{
break;
}
}
Expand Down
38 changes: 38 additions & 0 deletions datafusion/sqllogictest/test_files/lead_lag_extreme_offsets.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Tests for extreme lead offsets regression cases (#22221, #22231)

statement ok
CREATE TABLE lead_lag_extreme_offsets(id INT, value INT) AS VALUES
(1, 10),
(2, 20),
(3, 30);

# i64::MIN lead offset should not panic and should produce the provided default
query I
SELECT lead(value, -9223372036854775808, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
----
0
0
0

# Very large lead offset should not panic and should produce the provided default
query I
SELECT lead(value, 9223372036854775807, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
----
0
0
0

# i64::MIN lead offset without explicit default should return NULL
query I
SELECT lead(value, -9223372036854775808) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
----
NULL
NULL
NULL

# Out-of-range offset literal should fail as a non-integer offset
query error DataFusion error: Execution error: Expected an integer value
SELECT lead(value, -9223372036854775809, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets;

statement ok
DROP TABLE lead_lag_extreme_offsets;
Loading