Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
8dfeca3
Merge branch 'apache:main' into main
kazantsev-maksim Jan 17, 2026
559741e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 20, 2026
ebda14e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 21, 2026
408152e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 23, 2026
d7857b2
Merge branch 'apache:main' into main
kazantsev-maksim Jan 24, 2026
aef41be
Merge branch 'apache:main' into main
kazantsev-maksim Jan 29, 2026
5ac1c58
Merge branch 'apache:main' into main
kazantsev-maksim Jan 30, 2026
9ae8e23
Merge branch 'apache:main' into main
kazantsev-maksim Feb 1, 2026
5ca3888
Merge branch 'apache:main' into main
kazantsev-maksim Feb 4, 2026
160a817
Merge branch 'apache:main' into main
kazantsev-maksim Feb 5, 2026
88fc313
Merge branch 'apache:main' into main
kazantsev-maksim Feb 7, 2026
e14c180
Merge branch 'apache:main' into main
kazantsev-maksim Feb 13, 2026
610a885
Merge branch 'apache:main' into main
kazantsev-maksim Feb 20, 2026
f8acb2c
Merge branch 'apache:main' into main
kazantsev-maksim Feb 21, 2026
ec94897
Merge branch 'apache:main' into main
kazantsev-maksim Feb 26, 2026
43405e4
Merge branch 'apache:main' into main
kazantsev-maksim Feb 27, 2026
47b4915
Merge branch 'apache:main' into main
kazantsev-maksim Mar 1, 2026
26e2682
Merge branch 'apache:main' into main
kazantsev-maksim Mar 3, 2026
6cb5f07
Merge branch 'apache:main' into main
kazantsev-maksim Mar 4, 2026
ec194fb
Merge branch 'apache:main' into main
kazantsev-maksim Mar 31, 2026
256fccb
Merge branch 'apache:main' into main
kazantsev-maksim Apr 3, 2026
912c8f9
Merge branch 'apache:main' into main
kazantsev-maksim Apr 3, 2026
8430632
WIP
Apr 5, 2026
561a664
Merge branch 'apache:main' into main
kazantsev-maksim Apr 8, 2026
d926ef4
Merge branch 'apache:main' into main
kazantsev-maksim Apr 11, 2026
08f3e5b
Merge remote-tracking branch 'origin/main' into array_distinct
Apr 12, 2026
17cbf4f
Feat: support function compatible array_distinct
Apr 12, 2026
1069eb2
Merge branch 'main' into array_distinct
kazantsev-maksim Apr 13, 2026
cc49db4
Merge branch 'main' into array_distinct
kazantsev-maksim Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions native/spark-expr/src/array_funcs/array_distinct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{
as_large_list_array, as_list_array, Array, ArrayRef, GenericListArray, OffsetSizeTrait,
};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::row::{Row, RowConverter, SortField};
use datafusion::common::utils::take_function_args;
use datafusion::common::Result;
use datafusion::common::{exec_err, HashSet};
use datafusion::functions::utils::make_scalar_function;
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Spark-compatible `array_distinct` scalar UDF: removes duplicate elements
/// from an array while keeping the first occurrence of each value (including
/// at most one null element per array).
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct SparkArrayDistinct {
    // Accepted argument arity/types; built once in `new()`.
    signature: Signature,
}

impl Default for SparkArrayDistinct {
    // Delegates to `new()` so `default()` and `new()` stay equivalent.
    fn default() -> Self {
        Self::new()
    }
}

impl SparkArrayDistinct {
pub fn new() -> Self {
use DataType::*;
Self {
signature: Signature::uniform(
1,
vec![
List(Arc::new(Field::new("item", Null, true))),
LargeList(Arc::new(Field::new("item", Null, true))),
],
Volatility::Immutable,
),
}
}
}

impl ScalarUDFImpl for SparkArrayDistinct {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // Name under which the function is registered.
    fn name(&self) -> &str {
        "array_distinct"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    // Output type mirrors the input list type exactly: deduplication changes
    // the element count, not the schema.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    // `make_scalar_function` normalizes scalar arguments into arrays before
    // delegating to `array_distinct_inner`.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(array_distinct_inner, vec![])(&args.args)
    }
}

/// Dispatches `array_distinct` over the concrete list layout (`List` vs
/// `LargeList`) of its single argument, delegating the actual deduplication
/// to [`general_array_distinct`].
fn array_distinct_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [input] = take_function_args("array_distinct", args)?;
    match input.data_type() {
        DataType::List(field) => general_array_distinct(as_list_array(input), field),
        DataType::LargeList(field) => general_array_distinct(as_large_list_array(input), field),
        _ => exec_err!(
            "array_distinct function only support arrays, got: {:?}",
            input.data_type()
        ),
    }
}

/// Removes duplicate elements from every list in `array`, preserving the
/// first occurrence of each distinct value (and at most one null) in
/// insertion order. Null list entries are carried through unchanged.
///
/// Works generically over `List` (`i32` offsets) and `LargeList` (`i64`
/// offsets). Duplicate detection uses the Arrow row format: each visible
/// child value is converted to a comparable `Row` and inserted into a
/// per-list `HashSet`.
fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
    array: &GenericListArray<OffsetSize>,
    field: &FieldRef,
) -> Result<ArrayRef> {
    // Nothing to deduplicate; return the input as-is.
    if array.is_empty() {
        return Ok(Arc::new(array.clone()) as ArrayRef);
    }

    let value_offsets = array.value_offsets();
    let original_data = array.values().to_data();
    let dt = array.value_type();
    // Output offsets for the rebuilt list array; always starts at 0 even if
    // the input is a slice whose first offset is non-zero.
    let mut offsets = Vec::with_capacity(array.len() + 1);
    offsets.push(OffsetSize::usize_as(0));

    let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;

    // The input may be a slice: only convert the child values actually
    // reachable from this array's offset window. Row indices below are
    // therefore RELATIVE to `first_offset`.
    let first_offset = value_offsets[0].as_usize();
    let visible_len = value_offsets[array.len()].as_usize() - first_offset;
    let rows = converter.convert_columns(&[array.values().slice(first_offset, visible_len)])?;

    // `mutable` copies ranges out of the FULL (unsliced) child data, so it is
    // indexed with ABSOLUTE positions (`abs_idx`). `visible_len` is an upper
    // bound on the output size (dedup only shrinks).
    let mut mutable = arrow::array::MutableArrayData::new(vec![&original_data], false, visible_len);

    for i in 0..array.len() {
        let last_offset = *offsets.last().unwrap();

        // Null list entry: emit an empty range (offset unchanged); the
        // original null buffer is reattached at the end.
        if array.is_null(i) {
            offsets.push(last_offset);
            continue;
        }

        // Bounds of list `i` relative to the converted row window.
        let start = value_offsets[i].as_usize() - first_offset;
        let end = value_offsets[i + 1].as_usize() - first_offset;
        let array_len = end - start;

        // Dedup state is per list: the same value may appear in many lists.
        let mut seen: HashSet<Row<'_>> = HashSet::with_capacity(array_len);
        let mut seen_null = false;
        let mut distinct_count: usize = 0;

        for idx in start..end {
            // Absolute position in the full child data, for `mutable`.
            let abs_idx = idx + first_offset;

            if array.values().is_null(abs_idx) {
                // Keep only the first null element of this list.
                if !seen_null {
                    seen_null = true;
                    mutable.extend(0, abs_idx, abs_idx + 1);
                    distinct_count += 1;
                }
            } else {
                // First sighting of this value in this list -> keep it.
                // NOTE(review): equality here is row-format byte equality;
                // presumably NaN dedupes against NaN and 0.0 vs -0.0 stay
                // distinct — confirm against Spark semantics if it matters.
                let row = rows.row(idx);
                if seen.insert(row) {
                    mutable.extend(0, abs_idx, abs_idx + 1);
                    distinct_count += 1;
                }
            }
        }

        offsets.push(last_offset + OffsetSize::usize_as(distinct_count));
    }

    let final_values = arrow::array::make_array(mutable.freeze());

    // Rebuild the list with the compacted values, new offsets, and the
    // original top-level null buffer.
    Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
        Arc::clone(field),
        OffsetBuffer::new(offsets.into()),
        final_values,
        array.nulls().cloned(),
    )?))
}
2 changes: 2 additions & 0 deletions native/spark-expr/src/array_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
// under the License.

mod array_compact;
mod array_distinct;
mod array_insert;
mod get_array_struct_fields;
mod list_extract;
mod size;

pub use array_compact::SparkArrayCompact;
pub use array_distinct::SparkArrayDistinct;
pub use array_insert::ArrayInsert;
pub use get_array_struct_fields::GetArrayStructFields;
pub use list_extract::ListExtract;
Expand Down
5 changes: 3 additions & 2 deletions native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff,
SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArrayDistinct, SparkContains,
SparkDateDiff, SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
Expand Down Expand Up @@ -206,6 +206,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkArrayDistinct::default())),
]
}

Expand Down
17 changes: 1 addition & 16 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,7 @@ object CometArrayContains extends CometExpressionSerde[ArrayContains] {
}
}

object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] {

override def getSupportLevel(expr: ArrayDistinct): SupportLevel =
Incompatible(Some("Output elements are sorted rather than preserving insertion order"))

override def convert(
expr: ArrayDistinct,
inputs: Seq[Attribute],
binding: Boolean): Option[ExprOuterClass.Expr] = {
val arrayExprProto = exprToProto(expr.children.head, inputs, binding)

val arrayDistinctScalarExpr =
scalarFunctionExprToProto("array_distinct", arrayExprProto)
optExprWithInfo(arrayDistinctScalarExpr, expr)
}
}
object CometArrayDistinct extends CometScalarFunction[ArrayDistinct]("array_distinct")

object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,26 @@ INSERT INTO test_array_distinct_int VALUES
(array(0, -1, -1, 0, 1))

-- column argument
query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_int

-- literal arguments
query spark_answer_only
query
SELECT array_distinct(array(1, 2, 2, 3, 3))

query
SELECT array_distinct(array(3, 2, 2, 1, 1))

-- all NULLs
query spark_answer_only
query
SELECT array_distinct(array(CAST(NULL AS INT), CAST(NULL AS INT)))

-- NULL input
query spark_answer_only
query
SELECT array_distinct(CAST(NULL AS array<int>))

-- boundary values
query spark_answer_only
query
SELECT array_distinct(array(-2147483648, 2147483647, -2147483648, 2147483647, 0))

-- ===== LONG arrays =====
Expand All @@ -65,11 +68,11 @@ INSERT INTO test_array_distinct_long VALUES
(array(NULL, 1, NULL, 2)),
(array(-9223372036854775808, 9223372036854775807, -9223372036854775808))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_long

-- boundary values
query spark_answer_only
query
SELECT array_distinct(array(CAST(-9223372036854775808 AS BIGINT), CAST(9223372036854775807 AS BIGINT), CAST(-9223372036854775808 AS BIGINT)))

-- ===== STRING arrays =====
Expand All @@ -86,11 +89,11 @@ INSERT INTO test_array_distinct_string VALUES
(array('', '', NULL, '')),
(array('hello', 'world', 'hello'))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_string

-- empty string and NULL distinction
query spark_answer_only
query
SELECT array_distinct(array('', NULL, '', NULL, 'a'))

-- ===== BOOLEAN arrays =====
Expand All @@ -105,7 +108,7 @@ INSERT INTO test_array_distinct_bool VALUES
(NULL),
(array(NULL, true, NULL, false))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_bool

-- ===== DOUBLE arrays =====
Expand All @@ -119,23 +122,23 @@ INSERT INTO test_array_distinct_double VALUES
(NULL),
(array(NULL, 1.0, NULL, 2.0))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_double

-- NaN deduplication
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS DOUBLE), CAST('NaN' AS DOUBLE), 1.0, 1.0))

-- NaN with NULL
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS DOUBLE), NULL, CAST('NaN' AS DOUBLE), NULL, 1.0))

-- Infinity
query spark_answer_only
query
SELECT array_distinct(array(CAST('Infinity' AS DOUBLE), CAST('-Infinity' AS DOUBLE), CAST('Infinity' AS DOUBLE), 0.0))

-- negative zero
query spark_answer_only
query
SELECT array_distinct(array(0.0, -0.0, 1.0))

-- ===== FLOAT arrays =====
Expand All @@ -149,11 +152,11 @@ INSERT INTO test_array_distinct_float VALUES
(NULL),
(array(CAST(NULL AS FLOAT), CAST(1.0 AS FLOAT), CAST(NULL AS FLOAT)))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_float

-- Float NaN deduplication
query spark_answer_only
query
SELECT array_distinct(array(CAST('NaN' AS FLOAT), CAST('NaN' AS FLOAT), CAST(1.0 AS FLOAT)))

-- ===== DECIMAL arrays =====
Expand All @@ -167,13 +170,13 @@ INSERT INTO test_array_distinct_decimal VALUES
(NULL),
(array(NULL, 1.10, NULL, 1.10))

query spark_answer_only
query
SELECT array_distinct(arr) FROM test_array_distinct_decimal

-- ===== Nested array (array of arrays) =====

query spark_answer_only
query
SELECT array_distinct(array(array(1, 2), array(3, 4), array(1, 2), array(3, 4)))

query spark_answer_only
query
SELECT array_distinct(array(array(1, 2), CAST(NULL AS array<int>), array(1, 2), CAST(NULL AS array<int>)))