From 8e67fcc4d1d6b67729804825f8f9e46397fdebff Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 6 Jul 2024 11:07:21 +0800 Subject: [PATCH 01/11] change array agg semantic for empty result Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 12 ++ .../physical-expr/src/aggregate/array_agg.rs | 6 +- .../src/aggregate/array_agg_distinct.rs | 6 + .../sqllogictest/test_files/aggregate.slt | 115 ++++++++++++++---- 4 files changed, 115 insertions(+), 24 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 55ce76c4b939..efa0b547aee3 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -1978,6 +1978,18 @@ impl ScalarValue { Self::new_list(values, data_type, true) } + /// Create empty ListArray with specific data type + /// + /// This is different from `new_list(&[], data_type, nullable)`, where it has no inner array. + /// + /// - new_empty_list(i32): `ListArray[]` + /// + /// - new_list(&[], i32, nullable): `ListArray[Int32Array[]]`, + pub fn new_empty_list(data_type: DataType, nullable: bool) -> Self { + let data_type = DataType::List(Field::new_list_field(data_type, nullable).into()); + Self::List(Arc::new(ListArray::from(ArrayData::new_empty(&data_type)))) + } + /// Converts `IntoIterator` where each element has type corresponding to /// `data_type`, to a [`ListArray`]. /// diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index c5a0662a2283..e8a2c1908190 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -167,8 +167,10 @@ impl Accumulator for ArrayAggAccumulator { self.values.iter().map(|a| a.as_ref()).collect(); if element_arrays.is_empty() { - let arr = ScalarValue::new_list(&[], &self.datatype, self.nullable); - return Ok(ScalarValue::List(arr)); + return Ok(ScalarValue::new_empty_list( + self.datatype.clone(), + self.nullable, + )); } let concated_array = arrow::compute::concat(&element_arrays)?; diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index fc838196de20..a334cd2bcd8d 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -165,6 +165,12 @@ impl Accumulator for DistinctArrayAggAccumulator { fn evaluate(&mut self) -> Result { let values: Vec = self.values.iter().cloned().collect(); + if values.is_empty() { + return Ok(ScalarValue::new_empty_list( + self.datatype.clone(), + self.nullable, + )); + } let arr = ScalarValue::new_list(&values, &self.datatype, self.nullable); Ok(ScalarValue::List(arr)) } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index e891093c8156..7a91a644ea38 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1694,7 +1694,6 @@ SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT query ? SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 0) test ---- -[] # csv_query_array_agg_one query ? @@ -1758,7 +1757,7 @@ NULL NULL 781 7.81 125 -117 100 # additional count(1) forces array_agg_distinct instead of array_agg over aggregated by c2 data # # csv_query_array_agg_distinct -query III +query error DataFusion error: External error: Arrow error: Invalid argument error: all columns in a record batch must have the same length WITH indices AS ( SELECT 1 AS idx UNION ALL SELECT 2 AS idx UNION ALL @@ -1772,12 +1771,6 @@ FROM ( ) data CROSS JOIN indices ORDER BY 1 ----- -1 5 100 -2 5 100 -3 5 100 -4 5 100 -5 5 100 # aggregate_time_min_and_max query TT @@ -2732,6 +2725,16 @@ SELECT COUNT(DISTINCT c1) FROM test # TODO: aggregate_with_alias +# test_approx_percentile_cont_decimal_support +query TI +SELECT c1, approx_percentile_cont(c2, cast(0.85 as decimal(10,2))) apc FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 +---- +a 4 +b 5 +c 4 +d 4 +e 4 + # array_agg_zero query ? SELECT ARRAY_AGG([]) @@ -2744,28 +2747,96 @@ SELECT ARRAY_AGG([1]) ---- [[1]] -# test_approx_percentile_cont_decimal_support -query TI -SELECT c1, approx_percentile_cont(c2, cast(0.85 as decimal(10,2))) apc FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 +# test array_agg with no row qualified +statement ok +create table t(a int, b float, c bigint) as values (1, 1.2, 2); + +query ? +select array_agg(a) from t where a > 2; ---- -a 4 -b 5 -c 4 -d 4 -e 4 +query ? +select array_agg(b) from t where b > 3.1; +---- -# array_agg_zero query ? -SELECT ARRAY_AGG([]); +select array_agg(c) from t where c > 3; ---- -[[]] -# array_agg_one query ? -SELECT ARRAY_AGG([1]); +select array_agg(a) from t where a > 3 group by a; ---- -[[1]] + +query TT +explain select array_agg(a) from t where a > 3 group by a; +---- +logical_plan +01)Projection: ARRAY_AGG(t.a) +02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]] +03)----Filter: t.a > Int32(3) +04)------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)] +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 +05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------CoalesceBatchesExec: target_batch_size=8192 +08)--------------FilterExec: a@0 > 3 +09)----------------MemoryExec: partitions=1, partition_sizes=[1] + +# TODO: Expect no row, but got empty list +query ? +select array_agg(distinct a) from t where a > 3; +---- +[] + +query TT +explain select array_agg(distinct a) from t where a > 3; +---- +logical_plan +01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a) +02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]] +03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]] +04)------Filter: t.a > Int32(3) +05)--------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)] +02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)] +05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[] +06)----------CoalesceBatchesExec: target_batch_size=8192 +07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4 +08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------CoalesceBatchesExec: target_batch_size=8192 +11)--------------------FilterExec: a@0 > 3 +12)----------------------MemoryExec: partitions=1, partition_sizes=[1] + +statement ok +drop table t; + +# test with no values +statement ok +create table t(a int, b float, c bigint); + +query ? +select array_agg(a) from t; +---- + +query ? +select array_agg(b) from t; +---- + +query ? +select array_agg(c) from t; +---- + +statement ok +drop table t; + # array_agg_i32 statement ok From 13aaf09c05e96e654a0a1ed891744008d4bc5b0e Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 6 Jul 2024 12:01:53 +0800 Subject: [PATCH 02/11] return null Signed-off-by: jayzhan211 --- datafusion/common/src/scalar/mod.rs | 14 ++- .../physical-expr/src/aggregate/array_agg.rs | 7 +- .../src/aggregate/array_agg_distinct.rs | 7 +- .../sqllogictest/test_files/aggregate.slt | 90 +++++++------------ 4 files changed, 47 insertions(+), 71 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index efa0b547aee3..38ed6699ddd4 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -1978,16 +1978,14 @@ impl ScalarValue { Self::new_list(values, data_type, true) } - /// Create empty ListArray with specific data type + /// Create ListArray with Null with specific data type /// - /// This is different from `new_list(&[], data_type, nullable)`, where it has no inner array. - /// - /// - new_empty_list(i32): `ListArray[]` - /// - /// - new_list(&[], i32, nullable): `ListArray[Int32Array[]]`, - pub fn new_empty_list(data_type: DataType, nullable: bool) -> Self { + /// - new_null_list(i32, nullable, 1): `ListArray[NULL]` + pub fn new_null_list(data_type: DataType, nullable: bool, null_len: usize) -> Self { let data_type = DataType::List(Field::new_list_field(data_type, nullable).into()); - Self::List(Arc::new(ListArray::from(ArrayData::new_empty(&data_type)))) + Self::List(Arc::new(ListArray::from(ArrayData::new_null( + &data_type, null_len, + )))) } /// Converts `IntoIterator` where each element has type corresponding to diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index e8a2c1908190..78af4f89bcdc 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -71,7 +71,7 @@ impl AggregateExpr for ArrayAgg { &self.name, // This should be the same as return type of AggregateFunction::ArrayAgg Field::new("item", self.input_data_type.clone(), self.nullable), - false, + true, )) } @@ -86,7 +86,7 @@ impl AggregateExpr for ArrayAgg { Ok(vec![Field::new_list( format_state_name(&self.name, "array_agg"), Field::new("item", self.input_data_type.clone(), self.nullable), - false, + true, )]) } @@ -167,9 +167,10 @@ impl Accumulator for ArrayAggAccumulator { self.values.iter().map(|a| a.as_ref()).collect(); if element_arrays.is_empty() { - return Ok(ScalarValue::new_empty_list( + return Ok(ScalarValue::new_null_list( self.datatype.clone(), self.nullable, + 1, )); } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index a334cd2bcd8d..bcf73855ec63 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -75,7 +75,7 @@ impl AggregateExpr for DistinctArrayAgg { &self.name, // This should be the same as return type of AggregateFunction::ArrayAgg Field::new("item", self.input_data_type.clone(), self.nullable), - false, + true, )) } @@ -90,7 +90,7 @@ impl AggregateExpr for DistinctArrayAgg { Ok(vec![Field::new_list( format_state_name(&self.name, "distinct_array_agg"), Field::new("item", self.input_data_type.clone(), self.nullable), - false, + true, )]) } @@ -166,9 +166,10 @@ impl Accumulator for DistinctArrayAggAccumulator { fn evaluate(&mut self) -> Result { let values: Vec = self.values.iter().cloned().collect(); if values.is_empty() { - return Ok(ScalarValue::new_empty_list( + return Ok(ScalarValue::new_null_list( self.datatype.clone(), self.nullable, + 1, )); } let arr = ScalarValue::new_list(&values, &self.datatype, self.nullable); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 7a91a644ea38..d9b468ad219b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1694,6 +1694,7 @@ SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT query ? SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 0) test ---- +NULL # csv_query_array_agg_one query ? @@ -1752,25 +1753,12 @@ NULL 4 29 1.260869565217 123 -117 23 NULL 5 -194 -13.857142857143 118 -101 14 NULL NULL 781 7.81 125 -117 100 -# TODO: array_agg_distinct output is non-deterministic -- rewrite with array_sort(list_sort) -# unnest is also not available, so manually unnesting via CROSS JOIN -# additional count(1) forces array_agg_distinct instead of array_agg over aggregated by c2 data -# +# select with count to forces array_agg_distinct function, since single distinct expression is converted to group by by optimizer # csv_query_array_agg_distinct -query error DataFusion error: External error: Arrow error: Invalid argument error: all columns in a record batch must have the same length -WITH indices AS ( - SELECT 1 AS idx UNION ALL - SELECT 2 AS idx UNION ALL - SELECT 3 AS idx UNION ALL - SELECT 4 AS idx UNION ALL - SELECT 5 AS idx -) -SELECT data.arr[indices.idx] as element, array_length(data.arr) as array_len, dummy -FROM ( - SELECT array_agg(distinct c2) as arr, count(1) as dummy FROM aggregate_test_100 -) data - CROSS JOIN indices -ORDER BY 1 +query ?I +SELECT array_sort(array_agg(distinct c2)), count(1) FROM aggregate_test_100 +---- +[1, 2, 3, 4, 5] 100 # aggregate_time_min_and_max query TT @@ -2754,66 +2742,51 @@ create table t(a int, b float, c bigint) as values (1, 1.2, 2); query ? select array_agg(a) from t where a > 2; ---- +NULL query ? select array_agg(b) from t where b > 3.1; ---- +NULL query ? select array_agg(c) from t where c > 3; ---- +NULL + +query ?I +select array_agg(c), count(1) from t where c > 3; +---- +NULL 0 +# returns 0 rows if group by is applied query ? select array_agg(a) from t where a > 3 group by a; ---- -query TT -explain select array_agg(a) from t where a > 3 group by a; +query ?I +select array_agg(a), count(1) from t where a > 3 group by a; ---- -logical_plan -01)Projection: ARRAY_AGG(t.a) -02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]] -03)----Filter: t.a > Int32(3) -04)------TableScan: t projection=[a] -physical_plan -01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)] -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)] -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------CoalesceBatchesExec: target_batch_size=8192 -08)--------------FilterExec: a@0 > 3 -09)----------------MemoryExec: partitions=1, partition_sizes=[1] -# TODO: Expect no row, but got empty list +# TODO: Expect NULL, got empty list query ? select array_agg(distinct a) from t where a > 3; ---- [] -query TT -explain select array_agg(distinct a) from t where a > 3; +query ?I +select array_agg(distinct a), count(1) from t where a > 3; +---- +NULL 0 + +# returns 0 rows if group by is applied +query ? +select array_agg(distinct a) from t where a > 3 group by a; +---- + +query ?I +select array_agg(distinct a), count(1) from t where a > 3 group by a; ---- -logical_plan -01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a) -02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]] -03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]] -04)------Filter: t.a > Int32(3) -05)--------TableScan: t projection=[a] -physical_plan -01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)] -02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)] -03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)] -05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[] -06)----------CoalesceBatchesExec: target_batch_size=8192 -07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4 -08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------CoalesceBatchesExec: target_batch_size=8192 -11)--------------------FilterExec: a@0 > 3 -12)----------------------MemoryExec: partitions=1, partition_sizes=[1] statement ok drop table t; @@ -2825,14 +2798,17 @@ create table t(a int, b float, c bigint); query ? select array_agg(a) from t; ---- +NULL query ? select array_agg(b) from t; ---- +NULL query ? select array_agg(c) from t; ---- +NULL statement ok drop table t; From 8d57ae8bc85c9a8d82cd38927589ed2e86a7dc96 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 6 Jul 2024 13:27:21 +0800 Subject: [PATCH 03/11] fix test Signed-off-by: jayzhan211 --- datafusion/expr/src/aggregate_function.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 760952d94815..1070a1b0aa3b 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -123,7 +123,7 @@ impl AggregateFunction { pub fn nullable(&self) -> Result { match self { AggregateFunction::Max | AggregateFunction::Min => Ok(true), - AggregateFunction::ArrayAgg => Ok(false), + AggregateFunction::ArrayAgg => Ok(true), AggregateFunction::NthValue => Ok(true), } } From b4747a4f611576e49c3dc3ca2a4cc966aa8f7940 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 6 Jul 2024 13:45:39 +0800 Subject: [PATCH 04/11] fix order sensitive Signed-off-by: jayzhan211 --- .../src/aggregate/array_agg_ordered.rs | 12 ++++++++++-- datafusion/sqllogictest/test_files/aggregate.slt | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 1234ab40c188..406c01b15edc 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -92,7 +92,7 @@ impl AggregateExpr for OrderSensitiveArrayAgg { &self.name, // This should be the same as return type of AggregateFunction::ArrayAgg Field::new("item", self.input_data_type.clone(), self.nullable), - false, + true, )) } @@ -111,7 +111,7 @@ impl AggregateExpr for OrderSensitiveArrayAgg { let mut fields = vec![Field::new_list( format_state_name(&self.name, "array_agg"), Field::new("item", self.input_data_type.clone(), self.nullable), - false, // This should be the same as field() + true, // This should be the same as field() )]; let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types); fields.push(Field::new_list( @@ -309,6 +309,14 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { } fn evaluate(&mut self) -> Result { + if self.values.is_empty() { + return Ok(ScalarValue::new_null_list( + self.datatypes[0].clone(), + self.nullable, + 1, + )); + } + let values = self.values.clone(); let array = if self.reverse { ScalarValue::new_list_from_iter( diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index d9b468ad219b..1f2bb2c5638c 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2788,6 +2788,20 @@ query ?I select array_agg(distinct a), count(1) from t where a > 3 group by a; ---- +# test order sensitive array agg +query ? +select array_agg(a order by a) from t where a > 3; +---- +NULL + +query ? +select array_agg(a order by a) from t where a > 3 group by a; +---- + +query ?I +select array_agg(a order by a), count(1) from t where a > 3 group by a; +---- + statement ok drop table t; From 7e235697f81be446bc9754229308e80a4e16282a Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 6 Jul 2024 15:20:02 +0800 Subject: [PATCH 05/11] fix test Signed-off-by: jayzhan211 --- datafusion/core/tests/dataframe/mod.rs | 2 +- datafusion/core/tests/sql/aggregates.rs | 2 +- datafusion/physical-expr/src/aggregate/build_in.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c3bc2fcca2b5..92d88dfdee3e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1386,7 +1386,7 @@ async fn unnest_with_redundant_columns() -> Result<()> { let expected = vec![ "Projection: shapes.shape_id [shape_id:UInt32]", " Unnest: lists[shape_id2] structs[] [shape_id:UInt32, shape_id2:UInt32;N]", - " Aggregate: groupBy=[[shapes.shape_id]], aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} })]", + " Aggregate: groupBy=[[shapes.shape_id]], aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} });N]", " TableScan: shapes projection=[shape_id] [shape_id:UInt32]", ]; diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index e503b74992c3..86032dc9bc96 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -37,7 +37,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Schema::new(vec![Field::new_list( "ARRAY_AGG(DISTINCT aggregate_test_100.c2)", Field::new("item", DataType::UInt32, false), - false + true ),]) ); diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index adbbbd3e631e..35709b6b6c37 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -167,7 +167,7 @@ mod tests { Field::new_list( "c1", Field::new("item", data_type.clone(), true), - false, + true, ), result_agg_phy_exprs.field().unwrap() ); @@ -187,7 +187,7 @@ mod tests { Field::new_list( "c1", Field::new("item", data_type.clone(), true), - false, + true, ), result_agg_phy_exprs.field().unwrap() ); From b4ab1265cd833a22b5ac0d6297c01b9de8c0e329 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 7 Jul 2024 08:03:01 +0800 Subject: [PATCH 06/11] add more test Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/aggregate.slt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 1f2bb2c5638c..0ee91838cd28 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2824,6 +2824,21 @@ select array_agg(c) from t; ---- NULL +query ?I +select array_agg(distinct a), count(1) from t; +---- +NULL 0 + +query ?I +select array_agg(distinct b), count(1) from t; +---- +NULL 0 + +query ?I +select array_agg(distinct b), count(1) from t; +---- +NULL 0 + statement ok drop table t; From 81b8cb6d128169b2b286c997aa2a9b1c890bbb3b Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 9 Jul 2024 08:46:08 +0800 Subject: [PATCH 07/11] fix null Signed-off-by: jayzhan211 --- datafusion/expr/src/aggregate_function.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 23e98714dfa4..3cae78eaed9b 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -118,7 +118,7 @@ impl AggregateFunction { pub fn nullable(&self) -> Result { match self { AggregateFunction::Max | AggregateFunction::Min => Ok(true), - AggregateFunction::ArrayAgg => Ok(false), + AggregateFunction::ArrayAgg => Ok(true), } } } From 248c0ee8e75cd13e3a63877b0068a53c0842e117 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 9 Jul 2024 21:25:24 +0800 Subject: [PATCH 08/11] fix multi-phase case Signed-off-by: jayzhan211 --- datafusion/physical-expr/src/aggregate/array_agg.rs | 6 ++++-- datafusion/sqllogictest/test_files/aggregate.slt | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index 7756bc4d12b4..38a973802933 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -137,8 +137,11 @@ impl Accumulator for ArrayAggAccumulator { return Ok(()); } assert!(values.len() == 1, "array_agg can only take 1 param!"); + let val = Arc::clone(&values[0]); - self.values.push(val); + if val.len() > 0 { + self.values.push(val); + } Ok(()) } @@ -162,7 +165,6 @@ impl Accumulator for ArrayAggAccumulator { fn evaluate(&mut self) -> Result { // Transform Vec to ListArr - let element_arrays: Vec<&dyn Array> = self.values.iter().map(|a| a.as_ref()).collect(); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 0ee91838cd28..993691f4e9d8 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2768,11 +2768,10 @@ query ?I select array_agg(a), count(1) from t where a > 3 group by a; ---- -# TODO: Expect NULL, got empty list query ? select array_agg(distinct a) from t where a > 3; ---- -[] +NULL query ?I select array_agg(distinct a), count(1) from t where a > 3; From fb8a6782e094ed20923875342de79593fc45ef30 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 9 Jul 2024 21:59:39 +0800 Subject: [PATCH 09/11] add comment Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/aggregate.slt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 993691f4e9d8..3c715684f805 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2739,6 +2739,7 @@ SELECT ARRAY_AGG([1]) statement ok create table t(a int, b float, c bigint) as values (1, 1.2, 2); +# Returns NULL, follows DuckDB's behaviour query ? select array_agg(a) from t where a > 2; ---- @@ -2759,7 +2760,7 @@ select array_agg(c), count(1) from t where c > 3; ---- NULL 0 -# returns 0 rows if group by is applied +# returns 0 rows if group by is applied, follows DuckDB's behaviour query ? select array_agg(a) from t where a > 3 group by a; ---- @@ -2768,6 +2769,7 @@ query ?I select array_agg(a), count(1) from t where a > 3 group by a; ---- +# returns NULL, follows DuckDB's behaviour query ? select array_agg(distinct a) from t where a > 3; ---- @@ -2778,7 +2780,7 @@ select array_agg(distinct a), count(1) from t where a > 3; ---- NULL 0 -# returns 0 rows if group by is applied +# returns 0 rows if group by is applied, follows DuckDB's behaviour query ? select array_agg(distinct a) from t where a > 3 group by a; ---- From 9fa5e6af35d4204368b415ac17730d02850aa573 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 9 Jul 2024 22:01:19 +0800 Subject: [PATCH 10/11] cleanup Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/aggregate.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 3c715684f805..7dd1ea82b327 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2739,7 +2739,7 @@ SELECT ARRAY_AGG([1]) statement ok create table t(a int, b float, c bigint) as values (1, 1.2, 2); -# Returns NULL, follows DuckDB's behaviour +# returns NULL, follows DuckDB's behaviour query ? select array_agg(a) from t where a > 2; ---- From 701b12ab65f7e7f5a72bd4922ba8298a22b5ff11 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 9 Jul 2024 22:25:35 +0800 Subject: [PATCH 11/11] fix clone Signed-off-by: jayzhan211 --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 84afc227578f..bfa36d7ddc5d 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -91,7 +91,7 @@ impl FilterExec { Ok(Self { predicate, - input: input.clone(), + input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, cache,