From e6ccbda00b465951179c9f5d8b98579435a2f139 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dani=C3=ABl=20Heres?=
Date: Sat, 16 May 2026 12:22:37 +0200
Subject: [PATCH] fix: handle extreme lead lag offsets

Make the lead/lag offset arithmetic overflow-safe for offsets at the
edges of the i64 range:

- negate offsets with `wrapping_neg` instead of `neg` / unary minus
- derive `usize` magnitudes through a new `offset_magnitude` helper
  (built on `unsigned_abs`) instead of `as usize` casts of the signed
  value
- use saturating additions in `get_range`, and checked subtraction plus
  `usize::try_from` in `evaluate`, so an unrepresentable row index is
  treated as out of range

Adds sqllogictest coverage for `lead` with i64::MIN and i64::MAX
offsets and for an offset literal outside the i64 range.
---
Review notes (ignored by `git am`):
- `offset_magnitude` uses `usize::try_from(..).unwrap_or(usize::MAX)` in
  place of a manual comparison plus `as` cast; the blob id on the
  lead_lag.rs `index` line predates that change.
- Line structure and generic arguments were restored from a
  whitespace-collapsed copy; context indentation follows rustfmt layout.

 datafusion/functions-window/src/lead_lag.rs   | 50 ++++++++++++-------
 .../test_files/lead_lag_extreme_offsets.slt   | 38 ++++++++++++++
 2 files changed, 71 insertions(+), 17 deletions(-)
 create mode 100644 datafusion/sqllogictest/test_files/lead_lag_extreme_offsets.slt

diff --git a/datafusion/functions-window/src/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs
index b78709b22c1db..de4071c0ceda7 100644
--- a/datafusion/functions-window/src/lead_lag.rs
+++ b/datafusion/functions-window/src/lead_lag.rs
@@ -36,7 +36,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use std::cmp::min;
 use std::collections::VecDeque;
 use std::hash::Hash;
-use std::ops::{Neg, Range};
+use std::ops::Range;
 use std::sync::{Arc, LazyLock};
 
 get_or_init_udwf!(
@@ -116,7 +116,7 @@ impl WindowShiftKind {
     fn shift_offset(&self, value: Option<i64>) -> i64 {
         match self {
             WindowShiftKind::Lag => value.unwrap_or(1),
-            WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1),
+            WindowShiftKind::Lead => value.map_or(-1, |v| v.wrapping_neg()),
         }
     }
 }
@@ -266,7 +266,7 @@ impl WindowUDFImpl for WindowShift {
                 .map(|n| self.kind.shift_offset(n))
                 .map(|offset| {
                     if partition_evaluator_args.is_reversed() {
-                        -offset
+                        offset.wrapping_neg()
                     } else {
                         offset
                     }
@@ -410,6 +410,15 @@ struct WindowShiftEvaluator {
     non_null_offsets: VecDeque<usize>,
 }
 
+/// Returns the magnitude of `offset` as a `usize`.
+///
+/// `unsigned_abs` is defined for every `i64`, including `i64::MIN`, whose
+/// magnitude does not fit in an `i64`. A magnitude that does not fit in
+/// `usize` saturates to `usize::MAX`.
+fn offset_magnitude(offset: i64) -> usize {
+    usize::try_from(offset.unsigned_abs()).unwrap_or(usize::MAX)
+}
+
 impl WindowShiftEvaluator {
     fn is_lag(&self) -> bool {
         // Mode is LAG, when shift_offset is positive
@@ -503,13 +512,14 @@ fn shift_with_default_value(
 
 impl PartitionEvaluator for WindowShiftEvaluator {
     fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
+        let offset = offset_magnitude(self.shift_offset);
+
         if self.is_lag() {
-            let start = if self.non_null_offsets.len() == self.shift_offset as usize {
+            let start = if self.non_null_offsets.len() == offset {
                 // How many rows needed previous than the current row to get necessary lag result
                 let offset: usize = self.non_null_offsets.iter().sum();
                 idx.saturating_sub(offset)
             } else if !self.ignore_nulls {
-                let offset = self.shift_offset as usize;
                 idx.saturating_sub(offset)
             } else {
                 0
@@ -517,13 +527,12 @@ impl PartitionEvaluator for WindowShiftEvaluator {
             let end = idx + 1;
             Ok(Range { start, end })
         } else {
-            let end = if self.non_null_offsets.len() == (-self.shift_offset) as usize {
+            let end = if self.non_null_offsets.len() == offset {
                 // How many rows needed further than the current row to get necessary lead result
                 let offset: usize = self.non_null_offsets.iter().sum();
-                min(idx + offset + 1, n_rows)
+                min(idx.saturating_add(offset).saturating_add(1), n_rows)
             } else if !self.ignore_nulls {
-                let offset = (-self.shift_offset) as usize;
-                min(idx + offset, n_rows)
+                min(idx.saturating_add(offset), n_rows)
             } else {
                 n_rows
             };
@@ -546,20 +555,26 @@ impl PartitionEvaluator for WindowShiftEvaluator {
 
         // LAG mode
         let i = if self.is_lag() {
-            (range.end as i64 - self.shift_offset - 1) as usize
+            range
+                .end
+                .checked_sub(1)
+                .and_then(|end| (end as i64).checked_sub(self.shift_offset))
+                .and_then(|value| usize::try_from(value).ok())
         } else {
             // LEAD mode
-            (range.start as i64 - self.shift_offset) as usize
+            (range.start as i64)
+                .checked_sub(self.shift_offset)
+                .and_then(|value| usize::try_from(value).ok())
         };
-
-        let mut idx: Option<usize> = if i < len { Some(i) } else { None };
+        let mut idx: Option<usize> = i.filter(|i| *i < len);
 
         // LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows
         // If current row index points to NULL value the row is NOT counted
         if self.ignore_nulls && self.is_lag() {
             // LAG when NULLS are ignored.
             // Find the nonNULL row index that shifted by offset comparing to current row index
-            idx = if self.non_null_offsets.len() == self.shift_offset as usize {
+            let shift_offset = offset_magnitude(self.shift_offset);
+            idx = if self.non_null_offsets.len() == shift_offset {
                 let total_offset: usize = self.non_null_offsets.iter().sum();
                 Some(range.end - 1 - total_offset)
             } else {
@@ -570,7 +585,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
             if array.is_valid(range.end - 1) {
                 // Non-null add new offset
                 self.non_null_offsets.push_back(1);
-                if self.non_null_offsets.len() > self.shift_offset as usize {
+                if self.non_null_offsets.len() > shift_offset {
                     // WE do not need to keep track of more than `lag number of offset` values.
                     self.non_null_offsets.pop_front();
                 }
@@ -582,7 +597,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
         } else if self.ignore_nulls && !self.is_lag() {
             // LEAD when NULLS are ignored.
             // Stores the necessary non-null entry number further than the current row.
-            let non_null_row_count = (-self.shift_offset) as usize;
+            let non_null_row_count = offset_magnitude(self.shift_offset);
 
             if self.non_null_offsets.is_empty() {
                 // When empty, fill non_null offsets with the data further than the current row.
@@ -596,7 +611,8 @@ impl PartitionEvaluator for WindowShiftEvaluator {
                     }
                     // It is enough to keep track of `non_null_row_count + 1` non-null offset.
                     // further data is unnecessary for the result.
-                    if self.non_null_offsets.len() == non_null_row_count + 1 {
+                    if self.non_null_offsets.len() == non_null_row_count.saturating_add(1)
+                    {
                         break;
                     }
                 }
diff --git a/datafusion/sqllogictest/test_files/lead_lag_extreme_offsets.slt b/datafusion/sqllogictest/test_files/lead_lag_extreme_offsets.slt
new file mode 100644
index 0000000000000..2fd02e82d34e4
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/lead_lag_extreme_offsets.slt
@@ -0,0 +1,38 @@
+# Tests for extreme lead offsets regression cases (#22221, #22231)
+
+statement ok
+CREATE TABLE lead_lag_extreme_offsets(id INT, value INT) AS VALUES
+  (1, 10),
+  (2, 20),
+  (3, 30);
+
+# i64::MIN lead offset should not panic and should produce the provided default
+query I
+SELECT lead(value, -9223372036854775808, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
+----
+0
+0
+0
+
+# Very large lead offset should not panic and should produce the provided default
+query I
+SELECT lead(value, 9223372036854775807, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
+----
+0
+0
+0
+
+# i64::MIN lead offset without explicit default should return NULL
+query I
+SELECT lead(value, -9223372036854775808) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
+----
+NULL
+NULL
+NULL
+
+# Out-of-range offset literal should fail as a non-integer offset
+query error DataFusion error: Execution error: Expected an integer value
+SELECT lead(value, -9223372036854775809, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets;
+
+statement ok
+DROP TABLE lead_lag_extreme_offsets;