Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
464 changes: 428 additions & 36 deletions datafusion/common/src/scalar/mod.rs

Large diffs are not rendered by default.

29 changes: 28 additions & 1 deletion datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use arrow::array::{
cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
OffsetSizeTrait,
};
use arrow::buffer::OffsetBuffer;
use arrow::array::{LargeListViewArray, ListViewArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, SchemaRef};
#[cfg(feature = "sql")]
Expand Down Expand Up @@ -479,6 +480,32 @@ impl SingleRowListArrayBuilder {
ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
}

/// Build a single element [`ListViewArray`]
pub fn build_list_view_array(self) -> ListViewArray {
let (field, arr) = self.into_field_and_arr();
let offsets = ScalarBuffer::from(vec![0]);
let sizes = ScalarBuffer::from(vec![arr.len() as i32]);
ListViewArray::new(field, offsets, sizes, arr, None)
}

/// Build a single element [`ListViewArray`] and return it wrapped in a
/// [`ScalarValue::ListView`]
pub fn build_list_view_scalar(self) -> ScalarValue {
    let list_view = self.build_list_view_array();
    ScalarValue::ListView(Arc::new(list_view))
}

/// Build a single element [`LargeListViewArray`]
///
/// The one list view in the result starts at offset 0 and its size equals the
/// length of the values array taken from this builder.
pub fn build_large_list_view_array(self) -> LargeListViewArray {
    let (field, values) = self.into_field_and_arr();
    let view_len = values.len() as i64;
    let sizes = ScalarBuffer::<i64>::from(vec![view_len]);
    let offsets = ScalarBuffer::<i64>::from(vec![0]);
    LargeListViewArray::new(field, offsets, sizes, values, None)
}

/// Build a single element [`LargeListViewArray`] and return it wrapped in a
/// [`ScalarValue::LargeListView`]
pub fn build_large_list_view_scalar(self) -> ScalarValue {
    let large_list_view = self.build_large_list_view_array();
    ScalarValue::LargeListView(Arc::new(large_list_view))
}

/// Helper function: convert this builder into a tuple of field and array
fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
let Self {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/expr-common/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,14 @@ impl TypeSignatureClass {

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ArrayFunctionSignature {
/// A function takes at least one List/LargeList/FixedSizeList argument.
/// A function takes at least one List/LargeList/FixedSizeList/ListView/LargeListView argument.
Array {
/// A full list of the arguments accepted by this function.
arguments: Vec<ArrayFunctionArgument>,
/// Additional information about how array arguments should be coerced.
array_coercion: Option<ListCoercion>,
},
/// A function takes a single argument that must be a List/LargeList/FixedSizeList
/// A function takes a single argument that must be a List/LargeList/FixedSizeList/ListView/LargeListView
/// which gets coerced to List, with element type recursively coerced to List too if it is list-like.
RecursiveArray,
/// Specialized Signature for MapArray
Expand Down Expand Up @@ -500,8 +500,8 @@ pub enum ArrayFunctionArgument {
Element,
/// An Int64 index argument.
Index,
/// An argument of type List/LargeList/FixedSizeList. All Array arguments must be coercible
/// to the same type.
/// An argument of type List/LargeList/FixedSizeList/ListView/LargeListView.
/// All Array arguments must be coercible to the same type.
Array,
/// A Utf8 argument.
String,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ message Union{
repeated int32 type_ids = 3;
}

// Used for List/FixedSizeList/LargeList/Struct/Map
// Used for List/FixedSizeList/LargeList/ListView/LargeListView/Struct/Map
message ScalarNestedValue {
message Dictionary {
bytes ipc_message = 1;
Expand Down Expand Up @@ -295,6 +295,8 @@ message ScalarValue{
ScalarNestedValue large_list_value = 16;
ScalarNestedValue list_value = 17;
ScalarNestedValue fixed_size_list_value = 18;
ScalarNestedValue list_view_value = 45;
ScalarNestedValue large_list_view_value = 46;
ScalarNestedValue struct_value = 32;
ScalarNestedValue map_value = 41;

Expand Down Expand Up @@ -385,6 +387,8 @@ message ArrowType{
List LIST = 25;
List LARGE_LIST = 26;
FixedSizeList FIXED_SIZE_LIST = 27;
List LIST_VIEW = 42;
List LARGE_LIST_VIEW = 43;
Struct STRUCT = 28;
Union UNION = 29;
Dictionary DICTIONARY = 30;
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
let list_size = list.list_size;
DataType::FixedSizeList(Arc::new(list_type), list_size)
}
arrow_type::ArrowTypeEnum::ListView(list) => {
let list_type =
list.as_ref().field_type.as_deref().required("field_type")?;
DataType::ListView(Arc::new(list_type))
}
arrow_type::ArrowTypeEnum::LargeListView(list) => {
let list_type =
list.as_ref().field_type.as_deref().required("field_type")?;
DataType::LargeListView(Arc::new(list_type))
}
arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
),
Expand Down Expand Up @@ -385,6 +395,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::ListValue(v)
| Value::FixedSizeListValue(v)
| Value::LargeListValue(v)
| Value::ListViewValue(v)
| Value::LargeListViewValue(v)
| Value::StructValue(v)
| Value::MapValue(v) => {
let protobuf::ScalarNestedValue {
Expand Down Expand Up @@ -469,6 +481,12 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::FixedSizeListValue(_) => {
Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
}
Value::ListViewValue(_) => {
Self::ListView(arr.as_list_view::<i32>().to_owned().into())
}
Value::LargeListViewValue(_) => {
Self::LargeListView(arr.as_list_view::<i64>().to_owned().into())
}
Value::StructValue(_) => {
Self::Struct(arr.as_struct().to_owned().into())
}
Expand Down
56 changes: 56 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ impl serde::Serialize for ArrowType {
arrow_type::ArrowTypeEnum::FixedSizeList(v) => {
struct_ser.serialize_field("FIXEDSIZELIST", v)?;
}
arrow_type::ArrowTypeEnum::ListView(v) => {
struct_ser.serialize_field("LISTVIEW", v)?;
}
arrow_type::ArrowTypeEnum::LargeListView(v) => {
struct_ser.serialize_field("LARGELISTVIEW", v)?;
}
arrow_type::ArrowTypeEnum::Struct(v) => {
struct_ser.serialize_field("STRUCT", v)?;
}
Expand Down Expand Up @@ -329,6 +335,10 @@ impl<'de> serde::Deserialize<'de> for ArrowType {
"LARGELIST",
"FIXED_SIZE_LIST",
"FIXEDSIZELIST",
"LIST_VIEW",
"LISTVIEW",
"LARGE_LIST_VIEW",
"LARGELISTVIEW",
"STRUCT",
"UNION",
"DICTIONARY",
Expand Down Expand Up @@ -371,6 +381,8 @@ impl<'de> serde::Deserialize<'de> for ArrowType {
List,
LargeList,
FixedSizeList,
ListView,
LargeListView,
Struct,
Union,
Dictionary,
Expand Down Expand Up @@ -430,6 +442,8 @@ impl<'de> serde::Deserialize<'de> for ArrowType {
"LIST" => Ok(GeneratedField::List),
"LARGELIST" | "LARGE_LIST" => Ok(GeneratedField::LargeList),
"FIXEDSIZELIST" | "FIXED_SIZE_LIST" => Ok(GeneratedField::FixedSizeList),
"LISTVIEW" | "LIST_VIEW" => Ok(GeneratedField::ListView),
"LARGELISTVIEW" | "LARGE_LIST_VIEW" => Ok(GeneratedField::LargeListView),
"STRUCT" => Ok(GeneratedField::Struct),
"UNION" => Ok(GeneratedField::Union),
"DICTIONARY" => Ok(GeneratedField::Dictionary),
Expand Down Expand Up @@ -687,6 +701,20 @@ impl<'de> serde::Deserialize<'de> for ArrowType {
return Err(serde::de::Error::duplicate_field("FIXEDSIZELIST"));
}
arrow_type_enum__ = map_.next_value::<::std::option::Option<_>>()?.map(arrow_type::ArrowTypeEnum::FixedSizeList)
;
}
GeneratedField::ListView => {
if arrow_type_enum__.is_some() {
return Err(serde::de::Error::duplicate_field("LISTVIEW"));
}
arrow_type_enum__ = map_.next_value::<::std::option::Option<_>>()?.map(arrow_type::ArrowTypeEnum::ListView)
;
}
GeneratedField::LargeListView => {
if arrow_type_enum__.is_some() {
return Err(serde::de::Error::duplicate_field("LARGELISTVIEW"));
}
arrow_type_enum__ = map_.next_value::<::std::option::Option<_>>()?.map(arrow_type::ArrowTypeEnum::LargeListView)
;
}
GeneratedField::Struct => {
Expand Down Expand Up @@ -7477,6 +7505,12 @@ impl serde::Serialize for ScalarValue {
scalar_value::Value::FixedSizeListValue(v) => {
struct_ser.serialize_field("fixedSizeListValue", v)?;
}
scalar_value::Value::ListViewValue(v) => {
struct_ser.serialize_field("listViewValue", v)?;
}
scalar_value::Value::LargeListViewValue(v) => {
struct_ser.serialize_field("largeListViewValue", v)?;
}
scalar_value::Value::StructValue(v) => {
struct_ser.serialize_field("structValue", v)?;
}
Expand Down Expand Up @@ -7611,6 +7645,10 @@ impl<'de> serde::Deserialize<'de> for ScalarValue {
"listValue",
"fixed_size_list_value",
"fixedSizeListValue",
"list_view_value",
"listViewValue",
"large_list_view_value",
"largeListViewValue",
"struct_value",
"structValue",
"map_value",
Expand Down Expand Up @@ -7679,6 +7717,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue {
LargeListValue,
ListValue,
FixedSizeListValue,
ListViewValue,
LargeListViewValue,
StructValue,
MapValue,
Decimal32Value,
Expand Down Expand Up @@ -7742,6 +7782,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue {
"largeListValue" | "large_list_value" => Ok(GeneratedField::LargeListValue),
"listValue" | "list_value" => Ok(GeneratedField::ListValue),
"fixedSizeListValue" | "fixed_size_list_value" => Ok(GeneratedField::FixedSizeListValue),
"listViewValue" | "list_view_value" => Ok(GeneratedField::ListViewValue),
"largeListViewValue" | "large_list_view_value" => Ok(GeneratedField::LargeListViewValue),
"structValue" | "struct_value" => Ok(GeneratedField::StructValue),
"mapValue" | "map_value" => Ok(GeneratedField::MapValue),
"decimal32Value" | "decimal32_value" => Ok(GeneratedField::Decimal32Value),
Expand Down Expand Up @@ -7909,6 +7951,20 @@ impl<'de> serde::Deserialize<'de> for ScalarValue {
return Err(serde::de::Error::duplicate_field("fixedSizeListValue"));
}
value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::FixedSizeListValue)
;
}
GeneratedField::ListViewValue => {
if value__.is_some() {
return Err(serde::de::Error::duplicate_field("listViewValue"));
}
value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::ListViewValue)
;
}
GeneratedField::LargeListViewValue => {
if value__.is_some() {
return Err(serde::de::Error::duplicate_field("largeListViewValue"));
}
value__ = map_.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::LargeListViewValue)
;
}
GeneratedField::StructValue => {
Expand Down
14 changes: 11 additions & 3 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ pub struct Union {
#[prost(int32, repeated, tag = "3")]
pub type_ids: ::prost::alloc::vec::Vec<i32>,
}
/// Used for List/FixedSizeList/LargeList/Struct/Map
/// Used for List/FixedSizeList/LargeList/ListView/LargeListView/Struct/Map
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScalarNestedValue {
#[prost(bytes = "vec", tag = "1")]
Expand Down Expand Up @@ -311,7 +311,7 @@ pub struct ScalarFixedSizeBinary {
pub struct ScalarValue {
#[prost(
oneof = "scalar_value::Value",
tags = "33, 1, 2, 3, 23, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 41, 43, 44, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 22, 30, 25, 31, 34, 42"
tags = "33, 1, 2, 3, 23, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 45, 46, 32, 41, 43, 44, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 22, 30, 25, 31, 34, 42"
)]
pub value: ::core::option::Option<scalar_value::Value>,
}
Expand Down Expand Up @@ -362,6 +362,10 @@ pub mod scalar_value {
ListValue(super::ScalarNestedValue),
#[prost(message, tag = "18")]
FixedSizeListValue(super::ScalarNestedValue),
#[prost(message, tag = "45")]
ListViewValue(super::ScalarNestedValue),
#[prost(message, tag = "46")]
LargeListViewValue(super::ScalarNestedValue),
#[prost(message, tag = "32")]
StructValue(super::ScalarNestedValue),
#[prost(message, tag = "41")]
Expand Down Expand Up @@ -449,7 +453,7 @@ pub struct Decimal256 {
pub struct ArrowType {
#[prost(
oneof = "arrow_type::ArrowTypeEnum",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 35, 32, 15, 34, 16, 31, 17, 18, 19, 20, 21, 22, 23, 40, 41, 24, 36, 25, 26, 27, 28, 29, 30, 33"
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 35, 32, 15, 34, 16, 31, 17, 18, 19, 20, 21, 22, 23, 40, 41, 24, 36, 25, 26, 27, 42, 43, 28, 29, 30, 33"
)]
pub arrow_type_enum: ::core::option::Option<arrow_type::ArrowTypeEnum>,
}
Expand Down Expand Up @@ -530,6 +534,10 @@ pub mod arrow_type {
LargeList(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "27")]
FixedSizeList(::prost::alloc::boxed::Box<super::FixedSizeList>),
#[prost(message, tag = "42")]
ListView(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "43")]
LargeListView(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "28")]
Struct(super::Struct),
#[prost(message, tag = "29")]
Expand Down
18 changes: 17 additions & 1 deletion datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
ScalarValue::FixedSizeList(arr) => {
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
ScalarValue::ListView(arr) => {
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
ScalarValue::LargeListView(arr) => {
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
ScalarValue::Struct(arr) => {
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
Expand Down Expand Up @@ -1006,7 +1012,7 @@ fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
Ok(protobuf::ScalarValue { value: Some(value) })
}

// ScalarValue::List / FixedSizeList / LargeList / Struct / Map are serialized using
// ScalarValue::List / FixedSizeList / LargeList / ListView / LargeListView / Struct / Map are serialized using
// Arrow IPC messages as a single column RecordBatch
fn encode_scalar_nested_value(
arr: ArrayRef,
Expand Down Expand Up @@ -1062,6 +1068,16 @@ fn encode_scalar_nested_value(
scalar_list_value,
)),
}),
ScalarValue::ListView(_) => Ok(protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::ListViewValue(
scalar_list_value,
)),
}),
ScalarValue::LargeListView(_) => Ok(protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::LargeListViewValue(
scalar_list_value,
)),
}),
ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::StructValue(
scalar_list_value,
Expand Down
14 changes: 11 additions & 3 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ pub struct Union {
#[prost(int32, repeated, tag = "3")]
pub type_ids: ::prost::alloc::vec::Vec<i32>,
}
/// Used for List/FixedSizeList/LargeList/Struct/Map
/// Used for List/FixedSizeList/LargeList/ListView/LargeListView/Struct/Map
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScalarNestedValue {
#[prost(bytes = "vec", tag = "1")]
Expand Down Expand Up @@ -311,7 +311,7 @@ pub struct ScalarFixedSizeBinary {
pub struct ScalarValue {
#[prost(
oneof = "scalar_value::Value",
tags = "33, 1, 2, 3, 23, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 32, 41, 43, 44, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 22, 30, 25, 31, 34, 42"
tags = "33, 1, 2, 3, 23, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 45, 46, 32, 41, 43, 44, 20, 39, 21, 24, 35, 36, 37, 38, 26, 27, 28, 29, 22, 30, 25, 31, 34, 42"
)]
pub value: ::core::option::Option<scalar_value::Value>,
}
Expand Down Expand Up @@ -362,6 +362,10 @@ pub mod scalar_value {
ListValue(super::ScalarNestedValue),
#[prost(message, tag = "18")]
FixedSizeListValue(super::ScalarNestedValue),
#[prost(message, tag = "45")]
ListViewValue(super::ScalarNestedValue),
#[prost(message, tag = "46")]
LargeListViewValue(super::ScalarNestedValue),
#[prost(message, tag = "32")]
StructValue(super::ScalarNestedValue),
#[prost(message, tag = "41")]
Expand Down Expand Up @@ -449,7 +453,7 @@ pub struct Decimal256 {
pub struct ArrowType {
#[prost(
oneof = "arrow_type::ArrowTypeEnum",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 35, 32, 15, 34, 16, 31, 17, 18, 19, 20, 21, 22, 23, 40, 41, 24, 36, 25, 26, 27, 28, 29, 30, 33"
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 35, 32, 15, 34, 16, 31, 17, 18, 19, 20, 21, 22, 23, 40, 41, 24, 36, 25, 26, 27, 42, 43, 28, 29, 30, 33"
)]
pub arrow_type_enum: ::core::option::Option<arrow_type::ArrowTypeEnum>,
}
Expand Down Expand Up @@ -530,6 +534,10 @@ pub mod arrow_type {
LargeList(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "27")]
FixedSizeList(::prost::alloc::boxed::Box<super::FixedSizeList>),
#[prost(message, tag = "42")]
ListView(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "43")]
LargeListView(::prost::alloc::boxed::Box<super::List>),
#[prost(message, tag = "28")]
Struct(super::Struct),
#[prost(message, tag = "29")]
Expand Down
Loading