diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt index b2677679c8b4..5db2ad928007 100644 --- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt +++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt @@ -3348,3 +3348,71 @@ ORDER BY l.sn 2 75 3 3 200 4 4 100 5 + +# create a table +statement ok +CREATE TABLE FOO (x int, y int) AS VALUES (1, 2), (2, 3), (1, 3); + +# make sure that query runs in multi partitions +statement ok +set datafusion.execution.target_partitions = 8; + +query I +SELECT LAST_VALUE(x) +FROM FOO; +---- +1 + +query II +SELECT x, LAST_VALUE(x) +FROM FOO +GROUP BY x +ORDER BY x; +---- +1 1 +2 2 + +query II +SELECT y, LAST_VALUE(x) +FROM FOO +GROUP BY y +ORDER BY y; +---- +2 1 +3 1 + +# plan of the query above should contain partial +# and final aggregation stages +query TT +EXPLAIN SELECT LAST_VALUE(x) + FROM FOO; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(foo.x)]] +--TableScan: foo projection=[x] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)] +------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] + +query I +SELECT FIRST_VALUE(x) +FROM FOO; +---- +1 + +# similarly plan of the above query should +# contain partial and final aggregation stages. +query TT +EXPLAIN SELECT FIRST_VALUE(x) + FROM FOO; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(foo.x)]] +--TableScan: foo projection=[x] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)] +------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index f322419a7bdc..656f30a13504 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -202,7 +202,7 @@ impl Accumulator for FirstValueAccumulator { let is_set_flags = &states[last_idx]; let flags = is_set_flags.as_boolean(); let mut filtered_first_vals = vec![]; - for state in states.iter().take(last_idx - 1) { + for state in states.iter().take(last_idx) { filtered_first_vals.push(compute::filter(state, flags)?) } self.update_batch(&filtered_first_vals) @@ -387,7 +387,7 @@ impl Accumulator for LastValueAccumulator { let is_set_flags = &states[last_idx]; let flags = is_set_flags.as_boolean(); let mut filtered_first_vals = vec![]; - for state in states.iter().take(last_idx - 1) { + for state in states.iter().take(last_idx) { filtered_first_vals.push(compute::filter(state, flags)?) } self.update_batch(&filtered_first_vals)