Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 138 additions & 31 deletions datafusion/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
use crate::streaming::StreamingTable;
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
use arrow::datatypes::{Fields, TimeUnit, UnionFields, UnionMode};
use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::types::NativeType;
use datafusion_common::{DataFusionError, internal_datafusion_err};
use datafusion_execution::TaskContext;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::function::WindowUDFFieldArgs;
Expand Down Expand Up @@ -411,26 +412,136 @@ impl InformationSchemaConfig {
}
}

/// Resolve a native type `NativeType` to `DataType` for use in the information schema
/// Since it is one-to-many, use the most representative type on tie
fn get_data_type_for_schema(native_type: &NativeType) -> Result<DataType> {
match native_type {
NativeType::Null => Ok(DataType::Null),
NativeType::Boolean => Ok(DataType::Boolean),
NativeType::Int8 => Ok(DataType::Int8),
NativeType::Int16 => Ok(DataType::Int16),
NativeType::Int32 => Ok(DataType::Int32),
NativeType::Int64 => Ok(DataType::Int64),
NativeType::UInt8 => Ok(DataType::UInt8),
NativeType::UInt16 => Ok(DataType::UInt16),
NativeType::UInt32 => Ok(DataType::UInt32),
NativeType::UInt64 => Ok(DataType::UInt64),
NativeType::Float16 => Ok(DataType::Float16),
NativeType::Float32 => Ok(DataType::Float32),
NativeType::Float64 => Ok(DataType::Float64),
NativeType::Date => Ok(DataType::Date32), // A tie
NativeType::Binary => Ok(DataType::Binary), // A tie
NativeType::String => Ok(DataType::Utf8), // A tie
NativeType::Decimal(precision, scale) => {
Ok(DataType::Decimal128(*precision, *scale)) // A tie
}
NativeType::Timestamp(time_unit, timezone) => {
Ok(DataType::Timestamp(*time_unit, timezone.to_owned()))
}
NativeType::Time(TimeUnit::Second) => Ok(DataType::Time32(TimeUnit::Second)),
NativeType::Time(TimeUnit::Millisecond) => {
Ok(DataType::Time32(TimeUnit::Millisecond))
}
NativeType::Time(TimeUnit::Microsecond) => {
Ok(DataType::Time64(TimeUnit::Microsecond))
}
NativeType::Time(TimeUnit::Nanosecond) => {
Ok(DataType::Time64(TimeUnit::Nanosecond))
}
NativeType::Duration(time_unit) => Ok(DataType::Duration(*time_unit)),
NativeType::Interval(interval_unit) => Ok(DataType::Interval(*interval_unit)),
NativeType::FixedSizeBinary(size) => Ok(DataType::FixedSizeBinary(*size)),
NativeType::FixedSizeList(logical_field, size) => {
let child_dt = get_data_type_for_schema(logical_field.logical_type.native())?;
Ok(DataType::FixedSizeList(
Arc::new(Field::new(
logical_field.name.clone(),
child_dt,
logical_field.nullable,
)),
*size,
))
}
NativeType::List(logical_field) => {
let child_dt = get_data_type_for_schema(logical_field.logical_type.native())?;
Ok(DataType::List(Arc::new(Field::new(
logical_field.name.clone(),
child_dt,
logical_field.nullable,
))))
}
NativeType::Struct(logical_fields) => {
let fields = logical_fields
.iter()
.map(|logical_field| {
let dt =
get_data_type_for_schema(logical_field.logical_type.native())?;
Ok(Arc::new(Field::new(
logical_field.name.clone(),
dt,
logical_field.nullable,
)))
})
.collect::<Result<Fields>>()?;
Ok(DataType::Struct(fields))
}
NativeType::Union(logical_fields) => {
let ids = logical_fields.iter().map(|(i, _)| *i).collect::<Vec<i8>>();
let fields: Vec<FieldRef> = logical_fields
.iter()
.map(|(_, logical_field)| {
let dt =
get_data_type_for_schema(logical_field.logical_type.native())?;
Ok(Arc::new(Field::new(
logical_field.name.clone(),
dt,
logical_field.nullable,
Comment thread
martin-g marked this conversation as resolved.
)))
})
.collect::<Result<Vec<FieldRef>>>()?;
Ok(DataType::Union(
UnionFields::try_new(ids, fields)
.map_err(|e| internal_datafusion_err!("UnionFields error: {e}"))?,
UnionMode::Dense,
))
}
NativeType::Map(logical_field) => {
let child_dt = get_data_type_for_schema(logical_field.logical_type.native())?;
Ok(DataType::Map(
Arc::new(Field::new(
logical_field.name.clone(),
child_dt,
logical_field.nullable,
)),
true,
))
}
}
}

/// Builds the placeholder argument field used when describing a function
/// signature in the information schema.
///
/// The field is named `arg_{idx}`, is always nullable, and carries the Arrow
/// data type that `get_data_type_for_schema` resolves for `t`.
///
/// # Errors
///
/// Propagates the error from `get_data_type_for_schema` when `t` cannot be
/// resolved to an Arrow data type.
fn resolve_informational_field(idx: usize, t: &NativeType) -> Result<FieldRef> {
    get_data_type_for_schema(t).map(|resolved| {
        let field_name = format!("arg_{idx}");
        Arc::new(Field::new(field_name, resolved, true))
    })
}

/// get the arguments and return types of a UDF
/// returns a tuple of (arg_types, return_type)
fn get_udf_args_and_return_types(
udf: &Arc<ScalarUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
let signature = udf.signature();
let arg_types = signature.type_signature.get_example_types();
let arg_types = signature.type_signature.get_representative_types();
if arg_types.is_empty() {
Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
} else {
Ok(arg_types
arg_types
.into_iter()
.map(|arg_types| {
let arg_fields: Vec<FieldRef> = arg_types
let arg_fields = arg_types
.iter()
.enumerate()
.map(|(i, t)| {
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
})
.collect();
.map(|(i, t)| resolve_informational_field(i, t))
.collect::<Result<Vec<FieldRef>>>()?;
let scalar_arguments = vec![None; arg_fields.len()];
let return_type = udf
.return_field_from_args(ReturnFieldArgs {
Expand All @@ -445,32 +556,30 @@ fn get_udf_args_and_return_types(
.ok();
let arg_types = arg_types
.into_iter()
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
.map(|t| remove_native_type_prefix(&t))
.collect::<Vec<_>>();
(arg_types, return_type)
Ok((arg_types, return_type))
})
.collect::<BTreeSet<_>>())
.collect::<Result<BTreeSet<_>>>()
}
}

fn get_udaf_args_and_return_types(
udaf: &Arc<AggregateUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
let signature = udaf.signature();
let arg_types = signature.type_signature.get_example_types();
let arg_types = signature.type_signature.get_representative_types();
if arg_types.is_empty() {
Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
} else {
Ok(arg_types
arg_types
.into_iter()
.map(|arg_types| {
let arg_fields: Vec<FieldRef> = arg_types
let arg_fields = arg_types
.iter()
.enumerate()
.map(|(i, t)| {
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
})
.collect();
.map(|(i, t)| resolve_informational_field(i, t))
.collect::<Result<Vec<FieldRef>>>()?;
let return_type = udaf
.return_field(&arg_fields)
.map(|f| {
Expand All @@ -481,32 +590,30 @@ fn get_udaf_args_and_return_types(
.ok();
let arg_types = arg_types
.into_iter()
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
.map(|t| remove_native_type_prefix(&t))
.collect::<Vec<_>>();
(arg_types, return_type)
Ok((arg_types, return_type))
})
.collect::<BTreeSet<_>>())
.collect::<Result<BTreeSet<_>>>()
}
}

fn get_udwf_args_and_return_types(
udwf: &Arc<WindowUDF>,
) -> Result<BTreeSet<(Vec<String>, Option<String>)>> {
let signature = udwf.signature();
let arg_types = signature.type_signature.get_example_types();
let arg_types = signature.type_signature.get_representative_types();
if arg_types.is_empty() {
Ok(vec![(vec![], None)].into_iter().collect::<BTreeSet<_>>())
} else {
Ok(arg_types
arg_types
.into_iter()
.map(|arg_types| {
let arg_fields: Vec<FieldRef> = arg_types
let arg_fields = arg_types
.iter()
.enumerate()
.map(|(i, t)| {
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
})
.collect();
.map(|(i, t)| resolve_informational_field(i, t))
.collect::<Result<Vec<FieldRef>>>()?;
let return_type = udwf
.field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
.map(|f| {
Expand All @@ -517,11 +624,11 @@ fn get_udwf_args_and_return_types(
.ok();
let arg_types = arg_types
.into_iter()
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
.map(|t| remove_native_type_prefix(&t))
.collect::<Vec<_>>();
(arg_types, return_type)
Ok((arg_types, return_type))
})
.collect::<BTreeSet<_>>())
.collect::<Result<BTreeSet<_>>>()
}
}

Expand Down
Loading
Loading