From 4fb0814a04fad68e3dce2e3d66f9be53249d34ab Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 11 Oct 2024 17:53:40 +0800 Subject: [PATCH 01/13] first draft Signed-off-by: jayzhan211 --- datafusion/common/src/dfschema.rs | 1 - datafusion/expr-common/Cargo.toml | 1 + .../expr-common/src/type_coercion/binary.rs | 134 ++++++++++++------ datafusion/expr/src/logical_plan/builder.rs | 82 +++++++++-- datafusion/expr/src/udf.rs | 2 +- datafusion/functions-nested/src/make_array.rs | 64 ++------- datafusion/functions/src/core/coalesce.rs | 7 +- .../simplify_expressions/simplify_exprs.rs | 2 +- datafusion/proto/src/logical_plan/mod.rs | 3 +- datafusion/sql/src/planner.rs | 15 ++ datafusion/sql/src/statement.rs | 16 ++- datafusion/sql/src/values.rs | 11 +- datafusion/sqllogictest/test_files/array.slt | 1 - .../test_files/create_external_table.slt | 1 - .../sqllogictest/test_files/group_by.slt | 12 +- datafusion/sqllogictest/test_files/joins.slt | 35 ++--- datafusion/sqllogictest/test_files/struct.slt | 73 ++++++++-- .../sqllogictest/test_files/subquery.slt | 40 +++--- datafusion/sqllogictest/test_files/unnest.slt | 2 +- datafusion/sqllogictest/test_files/window.slt | 2 +- 20 files changed, 334 insertions(+), 170 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 9a1fe9bba267..aa2d93989da1 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -315,7 +315,6 @@ impl DFSchema { None => self_unqualified_names.contains(field.name().as_str()), }; if !duplicated_field { - // self.inner.fields.push(field.clone()); schema_builder.push(Arc::clone(field)); qualifiers.push(qualifier.cloned()); } diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index 7e477efc4ebc..de11b19c3b06 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -40,4 +40,5 @@ path = "src/lib.rs" [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } +itertools = { workspace = true } paste = "^1.0" diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index e042dd5d3ac6..d05634d79d90 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -28,7 +28,10 @@ use arrow::datatypes::{ DataType, Field, FieldRef, Fields, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; -use datafusion_common::{exec_datafusion_err, plan_datafusion_err, plan_err, Result}; +use datafusion_common::{ + exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result, +}; +use itertools::Itertools; /// The type signature of an instantiation of binary operator expression such as /// `lhs + rhs` @@ -478,46 +481,6 @@ fn type_union_resolution_coercion( type_union_resolution_coercion(lhs.data_type(), rhs.data_type()); new_item_type.map(|t| DataType::List(Arc::new(Field::new("item", t, true)))) } - (DataType::Struct(lhs), DataType::Struct(rhs)) => { - if lhs.len() != rhs.len() { - return None; - } - - // Search the field in the right hand side with the SAME field name - fn search_corresponding_coerced_type( - lhs_field: &FieldRef, - rhs: &Fields, - ) -> Option { - for rhs_field in rhs.iter() { - if lhs_field.name() == rhs_field.name() { - if let Some(t) = type_union_resolution_coercion( - lhs_field.data_type(), - rhs_field.data_type(), - ) { - return Some(t); - } else { - return None; - } - } 
- } - - None - } - - let types = lhs - .iter() - .map(|lhs_field| search_corresponding_coerced_type(lhs_field, rhs)) - .collect::>>()?; - - let fields = types - .into_iter() - .enumerate() - .map(|(i, datatype)| { - Arc::new(Field::new(format!("c{i}"), datatype, true)) - }) - .collect::>(); - Some(DataType::Struct(fields.into())) - } _ => { // numeric coercion is the same as comparison coercion, both find the narrowest type // that can accommodate both types @@ -529,6 +492,95 @@ fn type_union_resolution_coercion( } } +pub fn try_type_union_resolution(data_types: &[DataType]) -> Result> { + let mut errors = vec![]; + match try_type_union_resolution_with_struct(data_types) { + Ok(struct_types) => return Ok(struct_types), + Err(e) => { + errors.push(e); + } + } + + if let Some(new_type) = type_union_resolution(data_types) { + Ok(vec![new_type; data_types.len()]) + } else { + exec_err!("Fail to find the coerced type, errors: {:?}", errors) + } +} + +// Handle struct where we only change the data type but preserve the field name and nullability. +// Since field name is the key of the struct, so it shouldn't be updated to the common column name like "c0" or "c1" +pub fn try_type_union_resolution_with_struct( + data_types: &[DataType], +) -> Result> { + let mut keys_string: Option = None; + for data_type in data_types { + if let DataType::Struct(fields) = data_type { + let keys = fields.iter().map(|f| f.name().to_owned()).join(","); + if let Some(ref k) = keys_string { + if *k != keys { + return exec_err!("Expect same keys for struct type but got mismatched pair {} and {}", *k, keys); + } + } else { + keys_string = Some(keys); + } + } else { + return exec_err!("Expect to get struct but got {}", data_type); + } + } + + let mut struct_types: Vec = if let DataType::Struct(fields) = &data_types[0] + { + fields.iter().map(|f| f.data_type().to_owned()).collect() + } else { + return internal_err!("Struct type is checked is the previous function, so this should be unreachable"); + }; + + println!("struct_types 1: {:?}", struct_types); + + for data_type in data_types.iter().skip(1) { + if let DataType::Struct(fields) = data_type { + let incoming_struct_types: Vec = + fields.iter().map(|f| f.data_type().to_owned()).collect(); + // The order of field is verified above + for (lhs_type, rhs_type) in + struct_types.iter_mut().zip(incoming_struct_types.iter()) + { + if let Some(coerced_type) = + type_union_resolution_coercion(lhs_type, rhs_type) + { + *lhs_type = coerced_type; + } else { + return exec_err!( + "Fail to find the coerced type for {} and {}", + lhs_type, + rhs_type + ); + } + } + } else { + return exec_err!("Expect to get struct but got {}", data_type); + } + } + + println!("struct_types: {:?}", struct_types); + + let mut final_struct_types = vec![]; + for s in data_types { + let mut new_fields = vec![]; + if let DataType::Struct(fields) = s { + for (i, f) in fields.iter().enumerate() { + let field = Arc::unwrap_or_clone(Arc::clone(f)) + .with_data_type(struct_types[i].to_owned()); + new_fields.push(Arc::new(field)); + } + } + final_struct_types.push(DataType::Struct(new_fields.into())) + } + + Ok(final_struct_types) +} + /// Coerce `lhs_type` and `rhs_type` to a common type for the purposes of a /// comparison operation /// diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index da2a96327ce5..17dfef4601b8 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -44,13 +44,14 @@ use crate::{ 
TableProviderFilterPushDown, TableSource, WriteOp, }; +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, - plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, - TableReference, ToDFSchema, UnnestOptions, + exec_err, get_target_functional_dependencies, internal_err, not_impl_err, + plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, + Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; @@ -178,7 +179,10 @@ impl LogicalPlanBuilder { /// so it's usually better to override the default names with a table alias list. /// /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided. - pub fn values(mut values: Vec>) -> Result { + pub fn values( + mut values: Vec>, + schema: Option<&DFSchemaRef>, + ) -> Result { if values.is_empty() { return plan_err!("Values list cannot be empty"); } @@ -197,16 +201,75 @@ impl LogicalPlanBuilder { } } - let empty_schema = DFSchema::empty(); + // Check the type of value against the schema + if let Some(schema) = schema { + let mut field_types: Vec = Vec::with_capacity(n_cols); + assert_eq!(schema.fields().len(), n_cols); + for j in 0..n_cols { + let field_type = schema.field(j).data_type(); + for row in values.iter() { + let value = &row[j]; + let data_type = value.get_type(schema)?; + + if !data_type.equals_datatype(field_type) { + if can_cast_types(&data_type, field_type) { + } else { + return exec_err!( + "type mistmatch and can't cast to got {} and {}", + data_type, + field_type + ); + } + } + } + field_types.push(field_type.to_owned()); + } + // wrap cast if data type is not same as common type. + for row in &mut values { + for (j, field_type) in field_types.iter().enumerate() { + if let Expr::Literal(ScalarValue::Null) = row[j] { + row[j] = Expr::Literal(ScalarValue::try_from(field_type)?); + } else { + row[j] = + std::mem::take(&mut row[j]).cast_to(field_type, schema)?; + } + } + } + let fields = field_types + .iter() + .enumerate() + .map(|(j, data_type)| { + // naming is following convention https://www.postgresql.org/docs/current/queries-values.html + let name = &format!("column{}", j + 1); + Field::new(name, data_type.clone(), true) + }) + .collect::>(); + let dfschema = + DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?; + let schema = DFSchemaRef::new(dfschema); + + Ok(Self::new(LogicalPlan::Values(Values { schema, values }))) + } else { + // Infer from value itself + Self::infer_value(values) + } + } + + fn infer_value(mut values: Vec>) -> Result { + let n_cols = values[0].len(); + + let schema = DFSchema::empty(); + let mut field_types: Vec = Vec::with_capacity(n_cols); for j in 0..n_cols { let mut common_type: Option = None; for (i, row) in values.iter().enumerate() { let value = &row[j]; - let data_type = value.get_type(&empty_schema)?; + let data_type = value.get_type(&schema)?; if data_type == DataType::Null { continue; } + if let Some(prev_type) = common_type { // get common type of each column values. 
let data_types = vec![prev_type.clone(), data_type.clone()]; @@ -228,8 +291,7 @@ impl LogicalPlanBuilder { if let Expr::Literal(ScalarValue::Null) = row[j] { row[j] = Expr::Literal(ScalarValue::try_from(field_type)?); } else { - row[j] = - std::mem::take(&mut row[j]).cast_to(field_type, &empty_schema)?; + row[j] = std::mem::take(&mut row[j]).cast_to(field_type, &schema)?; } } } @@ -2350,10 +2412,10 @@ mod tests { fn test_union_after_join() -> Result<()> { let values = vec![vec![lit(1)]]; - let left = LogicalPlanBuilder::values(values.clone())? + let left = LogicalPlanBuilder::values(values.clone(), None)? .alias("left")? .build()?; - let right = LogicalPlanBuilder::values(values)? + let right = LogicalPlanBuilder::values(values, None)? .alias("right")? .build()?; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 3759fb18f56d..beaf1d8e8a5e 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ ColumnarValue, Documentation, Expr, ScalarFunctionImplementation, Signature, }; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, Field}; use datafusion_common::{not_impl_err, ExprSchema, Result}; use datafusion_expr_common::interval_arithmetic::Interval; use std::any::Any; diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs index cafa073f9191..1de10f16bd63 100644 --- a/datafusion/functions-nested/src/make_array.rs +++ b/datafusion/functions-nested/src/make_array.rs @@ -27,12 +27,12 @@ use arrow_array::{ use arrow_buffer::OffsetBuffer; use arrow_schema::DataType::{LargeList, List, Null}; use arrow_schema::{DataType, Field}; -use datafusion_common::{exec_err, internal_err}; use datafusion_common::{plan_err, utils::array_into_list_array_nullable, Result}; -use datafusion_expr::binary::type_union_resolution; +use datafusion_expr::binary::{ + try_type_union_resolution_with_struct, type_union_resolution, +}; use datafusion_expr::TypeSignature; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; -use itertools::Itertools; use crate::utils::make_scalar_function; @@ -107,33 +107,16 @@ impl ScalarUDFImpl for MakeArray { } fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - if let Some(new_type) = type_union_resolution(arg_types) { - // TODO: Move the logic to type_union_resolution if this applies to other functions as well - // Handle struct where we only change the data type but preserve the field name and nullability. 
- // Since field name is the key of the struct, so it shouldn't be updated to the common column name like "c0" or "c1" - let is_struct_and_has_same_key = are_all_struct_and_have_same_key(arg_types)?; - if is_struct_and_has_same_key { - let data_types: Vec<_> = if let DataType::Struct(fields) = &arg_types[0] { - fields.iter().map(|f| f.data_type().to_owned()).collect() - } else { - return internal_err!("Struct type is checked is the previous function, so this should be unreachable"); - }; - - let mut final_struct_types = vec![]; - for s in arg_types { - let mut new_fields = vec![]; - if let DataType::Struct(fields) = s { - for (i, f) in fields.iter().enumerate() { - let field = Arc::unwrap_or_clone(Arc::clone(f)) - .with_data_type(data_types[i].to_owned()); - new_fields.push(Arc::new(field)); - } - } - final_struct_types.push(DataType::Struct(new_fields.into())) - } - return Ok(final_struct_types); + let mut errors = vec![]; + match try_type_union_resolution_with_struct(arg_types) { + Ok(r) => return Ok(r), + Err(e) => { + errors.push(e); } + } + if let Some(new_type) = type_union_resolution(arg_types) { + // TODO: Move FixedSizeList to List in type_union_resolution if let DataType::FixedSizeList(field, _) = new_type { Ok(vec![DataType::List(field); arg_types.len()]) } else if new_type.is_null() { @@ -143,34 +126,15 @@ impl ScalarUDFImpl for MakeArray { } } else { plan_err!( - "Fail to find the valid type between {:?} for {}", + "Fail to find the valid type between {:?} for {}, errors are {:?}", arg_types, - self.name() + self.name(), + errors ) } } } -fn are_all_struct_and_have_same_key(data_types: &[DataType]) -> Result { - let mut keys_string: Option = None; - for data_type in data_types { - if let DataType::Struct(fields) = data_type { - let keys = fields.iter().map(|f| f.name().to_owned()).join(","); - if let Some(ref k) = keys_string { - if *k != keys { - return exec_err!("Expect same keys for struct type but got mismatched pair {} and {}", *k, keys); - } - } else { - keys_string = Some(keys); - } - } else { - return Ok(false); - } - } - - Ok(true) -} - // Empty array is a special case that is useful for many other array functions pub(super) fn empty_array_type() -> DataType { DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) diff --git a/datafusion/functions/src/core/coalesce.rs b/datafusion/functions/src/core/coalesce.rs index d8ff44798f8a..d1225616b5bb 100644 --- a/datafusion/functions/src/core/coalesce.rs +++ b/datafusion/functions/src/core/coalesce.rs @@ -20,8 +20,8 @@ use arrow::compute::kernels::zip::zip; use arrow::compute::{and, is_not_null, is_null}; use arrow::datatypes::DataType; use datafusion_common::{exec_err, ExprSchema, Result}; +use datafusion_expr::binary::try_type_union_resolution; use datafusion_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; -use datafusion_expr::type_coercion::binary::type_union_resolution; use datafusion_expr::{ColumnarValue, Documentation, Expr, ExprSchemable}; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use itertools::Itertools; @@ -154,9 +154,8 @@ impl ScalarUDFImpl for CoalesceFunc { if arg_types.is_empty() { return exec_err!("coalesce must have at least one argument"); } - let new_type = type_union_resolution(arg_types) - .unwrap_or(arg_types.first().unwrap().clone()); - Ok(vec![new_type; arg_types.len()]) + + try_type_union_resolution(arg_types) } fn documentation(&self) -> Option<&Documentation> { diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs 
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index c0142ae0fc5a..303b736b705a 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -417,7 +417,7 @@ mod tests { Box::new(lit(1)), )); let values = vec![vec![expr1, expr2]]; - let plan = LogicalPlanBuilder::values(values)?.build()?; + let plan = LogicalPlanBuilder::values(values, None)?.build()?; let expected = "\ Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"; diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 7156cee66aff..74943a607efc 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -283,7 +283,8 @@ impl AsLogicalPlan for LogicalPlanNode { .collect::, _>>() .map_err(|e| e.into()) }?; - LogicalPlanBuilder::values(values)?.build() + + LogicalPlanBuilder::values(values, None)?.build() } LogicalPlanType::Projection(projection) => { let input: LogicalPlan = diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 66e360a9ade9..b1d6c3cb0ab2 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -138,6 +138,8 @@ pub struct PlannerContext { /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, + /// The query schema defined by the table + table_schema: Option, } impl Default for PlannerContext { @@ -154,6 +156,7 @@ impl PlannerContext { ctes: HashMap::new(), outer_query_schema: None, outer_from_schema: None, + table_schema: None, } } @@ -181,6 +184,18 @@ impl PlannerContext { schema } + pub fn set_table_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.table_schema, &mut schema); + schema + } + + pub fn table_schema(&self) -> Option { + self.table_schema.clone() + } + // Return a clone of the outer FROM schema pub fn outer_from_schema(&self) -> Option> { self.outer_from_schema.clone() diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 3111fab9a2ff..5c3d935be3ea 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -393,13 +393,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Build column default values let column_defaults = self.build_column_defaults(&columns, planner_context)?; + + // println!("query: {:?}", query); + // println!("column_defaults: {:?}", column_defaults); + // println!("columns: {:?}", columns); + let has_columns = !columns.is_empty(); + let schema = self.build_schema(columns)?.to_dfschema_ref()?; + if has_columns { + planner_context.set_table_schema(Some(Arc::clone(&schema))); + } + match query { Some(query) => { let plan = self.query_to_plan(*query, planner_context)?; let input_schema = plan.schema(); - let plan = if !columns.is_empty() { - let schema = self.build_schema(columns)?.to_dfschema_ref()?; + let plan = if has_columns { + // let schema = self.build_schema(columns)?.to_dfschema_ref()?; if schema.fields().len() != input_schema.fields().len() { return plan_err!( "Mismatch: {} columns specified, but result has {} columns", @@ -445,7 +455,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } None => { - let schema = self.build_schema(columns)?.to_dfschema_ref()?; + // let schema = self.build_schema(columns)?.to_dfschema_ref()?; let plan = EmptyRelation { produce_one_row: false, schema, diff 
--git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs index cd33ddb3cfe7..521602523372 100644 --- a/datafusion/sql/src/values.rs +++ b/datafusion/sql/src/values.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; @@ -32,7 +34,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } = values; // Values should not be based on any other schema - let schema = DFSchema::empty(); + let empty_schema = DFSchema::empty(); + let schema = planner_context.table_schema(); + let value_schema = schema.as_ref().map(|s| Arc::clone(s)); + let schema = schema.unwrap_or(Arc::new(empty_schema)); + let values = rows .into_iter() .map(|row| { @@ -41,6 +47,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>>() }) .collect::>>()?; - LogicalPlanBuilder::values(values)?.build() + + LogicalPlanBuilder::values(values, value_schema.as_ref())?.build() } } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 69f62057c761..bfdbfb1bcc5e 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -7288,4 +7288,3 @@ drop table values_all_empty; statement ok drop table fixed_size_col_table; - diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 9ac2ecdce7cc..df35fd656aff 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -286,4 +286,3 @@ CREATE EXTERNAL TABLE staging.foo STORED AS parquet LOCATION '../../parquet-test # Create external table with qualified name, but no schema should error statement error DataFusion error: Error during planning: failed to resolve schema: release CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; - diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index a80a0891e977..5dd4e711a5c6 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -3360,7 +3360,8 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=4 06)----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8 07)------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[] -08)--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +09)----------------MemoryExec: partitions=1, partition_sizes=[1] query IRI SELECT s.sn, s.amount, 2*s.sn @@ -3430,9 +3431,9 @@ physical_plan 07)------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[sum(l.amount)] 08)--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount] 09)----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1 -10)------------------CoalescePartitionsExec -11)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] -12)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +10)------------------MemoryExec: partitions=1, partition_sizes=[1] 
+11)------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +12)--------------------MemoryExec: partitions=1, partition_sizes=[1] query IRR SELECT r.sn, SUM(l.amount), r.amount @@ -3579,8 +3580,7 @@ physical_plan 08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 09)----------------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@6 as sum_amount] 10)------------------BoundedWindowAggExec: wdw=[sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "sum(l.amount) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] -11)--------------------CoalescePartitionsExec -12)----------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0] +11)--------------------MemoryExec: partitions=1, partition_sizes=[1] query ITIPTRR diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index a7a252cc20d7..8dc05e7265e7 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3901,8 +3901,8 @@ SELECT * FROM ( ) AS rhs ON lhs.b=rhs.b ---- 11 1 21 1 -14 2 22 2 12 3 23 3 +14 2 22 2 15 4 24 4 query TT @@ -3922,11 +3922,12 @@ logical_plan 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 06)------TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] -05)------MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------MemoryExec: partitions=1, partition_sizes=[1] @@ -3979,10 +3980,11 @@ logical_plan 04)--SubqueryAlias: rhs 05)----TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------MemoryExec: partitions=1, partition_sizes=[1] +05)------MemoryExec: partitions=1, partition_sizes=[1] # Null build indices: @@ -4038,11 +4040,12 @@ logical_plan 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 06)------TableScan: right_table_no_nulls projection=[a, b] physical_plan -01)CoalesceBatchesExec: target_batch_size=3 -02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@1, b@1)] -03)----MemoryExec: partitions=1, partition_sizes=[1] -04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] 
-05)------MemoryExec: partitions=1, partition_sizes=[1] +01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] +02)--CoalesceBatchesExec: target_batch_size=3 +03)----HashJoinExec: mode=CollectLeft, join_type=Left, on=[(b@1, b@1)] +04)------SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false] +05)--------MemoryExec: partitions=1, partition_sizes=[1] +06)------MemoryExec: partitions=1, partition_sizes=[1] # Test CROSS JOIN LATERAL syntax (planning) @@ -4187,4 +4190,4 @@ physical_plan 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, y@1)], filter=a@0 < x@1 03)----MemoryExec: partitions=1, partition_sizes=[0] 04)----SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] -05)------MemoryExec: partitions=1, partition_sizes=[0] \ No newline at end of file +05)------MemoryExec: partitions=1, partition_sizes=[0] diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt index b76c78396aed..a0080bbf13a9 100644 --- a/datafusion/sqllogictest/test_files/struct.slt +++ b/datafusion/sqllogictest/test_files/struct.slt @@ -392,12 +392,12 @@ create table t(a struct, b struct) as valu query T select arrow_typeof([a, b]) from t; ---- -List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) +List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) query ? 
select [a, b] from t; ---- -[{r: red, c: 1}, {r: blue, c: 2}] +[{r: red, c: 1.0}, {r: blue, c: 2.3}] statement ok drop table t; @@ -474,13 +474,12 @@ select coalesce(s1) from t; {a: 2, b: blue} {a: 3, b: green} -# TODO: a's type should be float query T -select arrow_typeof(coalesce(s1)) from t; +select arrow_typeof(coalesce(s1, s2)) from t; ---- -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) -Struct([Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) statement ok drop table t; @@ -495,26 +494,32 @@ CREATE TABLE t ( (row(3, 'green'), row(33.2, 'string3')) ; -# TODO: second column should not be null query ? 
-select coalesce(s1) from t; +select coalesce(s1, s2) from t; ---- -{a: 1, b: red} -NULL -{a: 3, b: green} +{a: 1.0, b: red} +{a: 2.2, b: string2} +{a: 3.0, b: green} + +query T +select arrow_typeof(coalesce(s1, s2)) from t; +---- +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) +Struct([Field { name: "a", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) statement ok drop table t; # row() with incorrect order -statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'blue' to value of Float64 type +statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'blue' to value of Float32 type create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as values (row('red', 1), row(2.3, 'blue')), (row('purple', 1), row('green', 2.3)); # out of order struct literal # TODO: This query should not fail -statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'a' to value of Int64 type +statement error DataFusion error: Arrow error: Cast error: Cannot cast string 'b' to value of Int32 type create table t(a struct(r varchar, c int)) as values ({r: 'a', c: 1}), ({c: 2, r: 'b'}); ################################## @@ -529,3 +534,43 @@ select [{r: 'a', c: 1}, {r: 'b', c: 2}]; # Can't create a list of struct with different field types query error select [{r: 'a', c: 1}, {c: 2, r: 'b'}]; + +statement ok +create table t(a struct(r varchar, c int), b struct(r varchar, c float)) as values (row('a', 1), row('b', 2.3)); + +query T +select arrow_typeof([a, b]) from t; +---- +List(Field { name: "item", data_type: Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) + +statement ok +drop table t; + +# create table with different struct type is fine +statement ok +create table t(a struct(r varchar, c int), b struct(c float, r varchar)) as values (row('a', 1), row(2.3, 'b')); + +# create array with different struct type is not valid +query error +select arrow_typeof([a, b]) from t; + +statement ok +drop table t; + +statement ok +create table t(a struct(r varchar, c int, g float), b struct(r varchar, c float, g int)) as values (row('a', 1, 2.3), row('b', 2.3, 2)); + +# type of each column should not coerced but perserve as it is +query T +select arrow_typeof(a) from t; +---- +Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "g", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) + +# type of each column should not coerced but perserve as it is +query T +select arrow_typeof(b) from t; +---- +Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "g", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) + +statement ok +drop table t; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 30b3631681e7..70d1e048c079 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -208,10 +208,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1 @@ -242,10 +244,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int * Float64(1))] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query IR rowsort SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1 @@ -276,10 +280,12 @@ physical_plan 06)----------CoalesceBatchesExec: target_batch_size=2 07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -09)----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -10)------CoalesceBatchesExec: target_batch_size=2 -11)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -12)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------MemoryExec: partitions=1, partition_sizes=[1] +11)------CoalesceBatchesExec: target_batch_size=2 +12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), 
input_partitions=4 +13)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +14)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1 @@ -313,10 +319,12 @@ physical_plan 08)--------------CoalesceBatchesExec: target_batch_size=2 09)----------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 10)------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int)] -11)--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] -12)------CoalesceBatchesExec: target_batch_size=2 -13)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 -14)----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0] +11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +12)----------------------MemoryExec: partitions=1, partition_sizes=[1] +13)------CoalesceBatchesExec: target_batch_size=2 +14)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4 +15)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +16)------------MemoryExec: partitions=1, partition_sizes=[1] query II rowsort SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1 diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index b923e94fc819..947eb8630b52 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -643,7 +643,7 @@ NULL [4] [{c0: [2], c1: [[3], [4]]}] 4 [3] [{c0: [2], c1: [[3], [4]]}] NULL [4] [{c0: [2], c1: [[3], [4]]}] -## demonstrate where recursive unnest is impossible +## demonstrate where recursive unnest is impossible ## and need multiple unnesting logical plans ## e.g unnest -> field_access -> unnest query TT diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 40309a1f2de9..1e478945035e 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4932,4 +4932,4 @@ SELECT v1, NTH_VALUE(v2, 0) OVER (PARTITION BY v1 ORDER BY v2) FROM t; statement ok DROP TABLE t; -## end test handle NULL and 0 of NTH_VALUE \ No newline at end of file +## end test handle NULL and 0 of NTH_VALUE From 062f1187433ebfedbafe436118dbd74affd8d260 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 11 Oct 2024 18:08:41 +0800 Subject: [PATCH 02/13] cleanup Signed-off-by: jayzhan211 --- .../expr-common/src/type_coercion/binary.rs | 8 +- datafusion/expr/src/logical_plan/builder.rs | 93 ++++++++----------- datafusion/expr/src/udf.rs | 2 +- datafusion/sql/src/values.rs | 15 +-- 4 files changed, 51 insertions(+), 67 deletions(-) diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index d05634d79d90..1c74f03b4987 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -25,8 +25,8 @@ use crate::operator::Operator; use arrow::array::{new_empty_array, Array}; use arrow::compute::can_cast_types; use arrow::datatypes::{ - DataType, Field, FieldRef, Fields, TimeUnit, DECIMAL128_MAX_PRECISION, - DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, + DataType, Field, FieldRef, TimeUnit, DECIMAL128_MAX_PRECISION, 
DECIMAL128_MAX_SCALE, + DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result, @@ -536,8 +536,6 @@ pub fn try_type_union_resolution_with_struct( return internal_err!("Struct type is checked is the previous function, so this should be unreachable"); }; - println!("struct_types 1: {:?}", struct_types); - for data_type in data_types.iter().skip(1) { if let DataType::Struct(fields) = data_type { let incoming_struct_types: Vec = @@ -563,8 +561,6 @@ pub fn try_type_union_resolution_with_struct( } } - println!("struct_types: {:?}", struct_types); - let mut final_struct_types = vec![]; for s in data_types { let mut new_fields = vec![]; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 17dfef4601b8..d2db2470206d 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -170,7 +170,7 @@ impl LogicalPlanBuilder { }))) } - /// Create a values list based relation, and the schema is inferred from data, consuming + /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. /// @@ -179,10 +179,7 @@ impl LogicalPlanBuilder { /// so it's usually better to override the default names with a table alias list. /// /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided. - pub fn values( - mut values: Vec>, - schema: Option<&DFSchemaRef>, - ) -> Result { + pub fn values(values: Vec>, schema: Option<&DFSchemaRef>) -> Result { if values.is_empty() { return plan_err!("Values list cannot be empty"); } @@ -203,61 +200,41 @@ impl LogicalPlanBuilder { // Check the type of value against the schema if let Some(schema) = schema { - let mut field_types: Vec = Vec::with_capacity(n_cols); - assert_eq!(schema.fields().len(), n_cols); - for j in 0..n_cols { - let field_type = schema.field(j).data_type(); - for row in values.iter() { - let value = &row[j]; - let data_type = value.get_type(schema)?; - - if !data_type.equals_datatype(field_type) { - if can_cast_types(&data_type, field_type) { - } else { - return exec_err!( - "type mistmatch and can't cast to got {} and {}", - data_type, - field_type - ); - } - } - } - field_types.push(field_type.to_owned()); - } - // wrap cast if data type is not same as common type. 
- for row in &mut values { - for (j, field_type) in field_types.iter().enumerate() { - if let Expr::Literal(ScalarValue::Null) = row[j] { - row[j] = Expr::Literal(ScalarValue::try_from(field_type)?); + Self::infer_from_schema(values, schema) + } else { + // Infer from data itself + Self::infer_data(values) + } + } + + fn infer_from_schema(values: Vec>, schema: &DFSchema) -> Result { + let n_cols = values[0].len(); + let mut field_types: Vec = Vec::with_capacity(n_cols); + for j in 0..n_cols { + let field_type = schema.field(j).data_type(); + for row in values.iter() { + let value = &row[j]; + let data_type = value.get_type(schema)?; + + if !data_type.equals_datatype(field_type) { + if can_cast_types(&data_type, field_type) { } else { - row[j] = - std::mem::take(&mut row[j]).cast_to(field_type, schema)?; + return exec_err!( + "type mistmatch and can't cast to got {} and {}", + data_type, + field_type + ); } } } - let fields = field_types - .iter() - .enumerate() - .map(|(j, data_type)| { - // naming is following convention https://www.postgresql.org/docs/current/queries-values.html - let name = &format!("column{}", j + 1); - Field::new(name, data_type.clone(), true) - }) - .collect::>(); - let dfschema = - DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?; - let schema = DFSchemaRef::new(dfschema); - - Ok(Self::new(LogicalPlan::Values(Values { schema, values }))) - } else { - // Infer from value itself - Self::infer_value(values) + field_types.push(field_type.to_owned()); } + + Self::infer_inner(values, &field_types, schema) } - fn infer_value(mut values: Vec>) -> Result { + fn infer_data(values: Vec>) -> Result { let n_cols = values[0].len(); - let schema = DFSchema::empty(); let mut field_types: Vec = Vec::with_capacity(n_cols); @@ -285,13 +262,22 @@ impl LogicalPlanBuilder { // since the code loop skips NULL field_types.push(common_type.unwrap_or(DataType::Null)); } + + Self::infer_inner(values, &field_types, &schema) + } + + fn infer_inner( + mut values: Vec>, + field_types: &[DataType], + schema: &DFSchema, + ) -> Result { // wrap cast if data type is not same as common type. 
for row in &mut values { for (j, field_type) in field_types.iter().enumerate() { if let Expr::Literal(ScalarValue::Null) = row[j] { row[j] = Expr::Literal(ScalarValue::try_from(field_type)?); } else { - row[j] = std::mem::take(&mut row[j]).cast_to(field_type, &schema)?; + row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?; } } } @@ -306,6 +292,7 @@ impl LogicalPlanBuilder { .collect::>(); let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?; let schema = DFSchemaRef::new(dfschema); + Ok(Self::new(LogicalPlan::Values(Values { schema, values }))) } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index beaf1d8e8a5e..3759fb18f56d 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ ColumnarValue, Documentation, Expr, ScalarFunctionImplementation, Signature, }; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::DataType; use datafusion_common::{not_impl_err, ExprSchema, Result}; use datafusion_expr_common::interval_arithmetic::Interval; use std::any::Any; diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs index 521602523372..f0fc2aed53dd 100644 --- a/datafusion/sql/src/values.rs +++ b/datafusion/sql/src/values.rs @@ -33,12 +33,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { rows, } = values; - // Values should not be based on any other schema - let empty_schema = DFSchema::empty(); - let schema = planner_context.table_schema(); - let value_schema = schema.as_ref().map(|s| Arc::clone(s)); - let schema = schema.unwrap_or(Arc::new(empty_schema)); - + let schema = planner_context + .table_schema() + .unwrap_or(Arc::new(DFSchema::empty())); let values = rows .into_iter() .map(|row| { @@ -48,6 +45,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }) .collect::>>()?; - LogicalPlanBuilder::values(values, value_schema.as_ref())?.build() + if schema.fields().is_empty() { + LogicalPlanBuilder::values(values, None)?.build() + } else { + LogicalPlanBuilder::values(values, Some(&schema))?.build() + } } } From 08e6bf37e13e40b78524af7e73ce350d895f2a0c Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 11 Oct 2024 19:54:15 +0800 Subject: [PATCH 03/13] add values table without schema Signed-off-by: jayzhan211 --- datafusion-cli/Cargo.lock | 65 ++++++++++--------- .../expr-common/src/type_coercion/binary.rs | 44 ++++++++++++- datafusion/sqllogictest/test_files/struct.slt | 21 ++++++ 3 files changed, 96 insertions(+), 34 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index aa64e14fca8e..29c07f45ca44 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -406,9 +406,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" +checksum = "998282f8f49ccd6116b0ed8a4de0fbd3151697920e7c7533416d6e25e76434a7" dependencies = [ "bzip2", "flate2", @@ -523,9 +523,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33ae899566f3d395cbf42858e433930682cc9c1889fa89318896082fef45efb" +checksum = "0dc2faec3205d496c7e57eff685dd944203df7ce16a4116d0281c44021788a7b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -545,9 +545,9 @@ dependencies = [ 
[[package]] name = "aws-sdk-ssooidc" -version = "1.46.0" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39c09e199ebd96b9f860b0fce4b6625f211e064ad7c8693b72ecf7ef03881e0" +checksum = "c93c241f52bc5e0476e259c953234dab7e2a35ee207ee202e86c0095ec4951dc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d95f93a98130389eb6233b9d615249e543f6c24a68ca1f109af9ca5164a8765" +checksum = "b259429be94a3459fa1b00c5684faee118d74f9577cc50aebadc36e507c63b5f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -663,9 +663,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" +checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -917,9 +917,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.28" +version = "1.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "58e804ac3194a48bb129643eb1d62fcc20d18c6b8c181704489353d13120bcd1" dependencies = [ "jobserver", "libc", @@ -974,9 +974,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.19" +version = "4.5.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7be5744db7978a28d9df86a214130d106a89ce49644cbc4e3f0c22c3fba30615" +checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" dependencies = [ "clap_builder", "clap_derive", @@ -984,9 +984,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.19" +version = "4.5.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5fbc17d3ef8278f55b282b2a2e75ae6f6c7d4bb70ed3d0382375104bfafdb4b" +checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" dependencies = [ "anstream", "anstyle", @@ -1358,6 +1358,7 @@ version = "42.0.0" dependencies = [ "arrow", "datafusion-common", + "itertools", "paste", ] @@ -2258,9 +2259,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -4004,9 +4005,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -4015,9 +4016,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -4030,9 
+4031,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.43" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -4042,9 +4043,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4052,9 +4053,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -4065,9 +4066,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "wasm-streams" @@ -4084,9 +4085,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 1c74f03b4987..f5b705334798 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -25,8 +25,8 @@ use crate::operator::Operator; use arrow::array::{new_empty_array, Array}; use arrow::compute::can_cast_types; use arrow::datatypes::{ - DataType, Field, FieldRef, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, - DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, + DataType, Field, FieldRef, Fields, TimeUnit, DECIMAL128_MAX_PRECISION, + DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; use datafusion_common::{ exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result, @@ -481,6 +481,46 @@ fn type_union_resolution_coercion( type_union_resolution_coercion(lhs.data_type(), rhs.data_type()); new_item_type.map(|t| DataType::List(Arc::new(Field::new("item", t, true)))) } + (DataType::Struct(lhs), DataType::Struct(rhs)) => { + if lhs.len() != rhs.len() { + return None; + } + + // Search the field in the right hand side with the SAME field name + fn search_corresponding_coerced_type( + lhs_field: &FieldRef, + rhs: &Fields, + ) -> Option { + for rhs_field in rhs.iter() { + if lhs_field.name() == rhs_field.name() { + if let Some(t) = type_union_resolution_coercion( + lhs_field.data_type(), + rhs_field.data_type(), + ) { + return Some(t); + } else { + return None; + } + } + } + + None + } + + let types = lhs + .iter() + .map(|lhs_field| search_corresponding_coerced_type(lhs_field, rhs)) + 
diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt
index a0080bbf13a9..7596b820c688 100644
--- a/datafusion/sqllogictest/test_files/struct.slt
+++ b/datafusion/sqllogictest/test_files/struct.slt
@@ -453,6 +453,27 @@ Struct([Field { name: "r", data_type: Utf8, nullable: true, dict_id: 0, dict_is_
 statement ok
 drop table t;

+statement ok
+create table t as values({r: 'a', c: 1}), ({r: 'b', c: 2.3});
+
+query ?
+select * from t;
+----
+{c0: a, c1: 1.0}
+{c0: b, c1: 2.3}
+
+query T
+select arrow_typeof(column1) from t;
+----
+Struct([Field { name: "c0", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])
+Struct([Field { name: "c0", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])
+
+statement ok
+drop table t;
+
+query error DataFusion error: Arrow error: Cast error: Cannot cast string 'a' to value of Float64 type
+create table t as values({r: 'a', c: 1}), ({c: 2.3, r: 'b'});
+
 ##################################
 ## Test Coalesce with Struct
 ##################################
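These slt cases pin down two consequences of the new coercion: structs whose fields share names unify (with positional `c0`/`c1` names in the result), while rows that list the same fields in a different order still fail, because the unified type is computed by name but the values themselves are cast field-by-field positionally, which is how `'a'` ends up cast to `Float64` in the error case. A hypothetical end-to-end check of the passing case through the public API (assumes the `datafusion` crate plus `tokio`, not part of this patch):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Same field names in the same order: Int64/Float64 unify to Float64
        // and the rows print as {c0: a, c1: 1.0} and {c0: b, c1: 2.3}.
        ctx.sql("VALUES ({r: 'a', c: 1}), ({r: 'b', c: 2.3})")
            .await?
            .show()
            .await?;
        Ok(())
    }
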
From ee7880e3a08a76399720121fbe5f7cebb6d50635 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Fri, 11 Oct 2024 19:58:10 +0800
Subject: [PATCH 04/13] cleanup

Signed-off-by: jayzhan211
---
 datafusion/sql/src/statement.rs | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 5c3d935be3ea..9c71f145988b 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -394,9 +394,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 let column_defaults =
                     self.build_column_defaults(&columns, planner_context)?;

-                // println!("query: {:?}", query);
-                // println!("column_defaults: {:?}", column_defaults);
-                // println!("columns: {:?}", columns);
                 let has_columns = !columns.is_empty();
                 let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                 if has_columns {
@@ -409,7 +406,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         let input_schema = plan.schema();

                         let plan = if has_columns {
-                            // let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                             if schema.fields().len() != input_schema.fields().len() {
                                 return plan_err!(
                                     "Mismatch: {} columns specified, but result has {} columns",
@@ -455,7 +451,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         }

                         None => {
-                            // let schema = self.build_schema(columns)?.to_dfschema_ref()?;
                             let plan = EmptyRelation {
                                 produce_one_row: false,
                                 schema,

From 66d29e16567d6a29ee20580184aa41a1ac44bc70 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Fri, 18 Oct 2024 14:25:34 +0800
Subject: [PATCH 05/13] fmt

Signed-off-by: jayzhan211
---
 datafusion/functions-nested/src/make_array.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs
index 4877219a5d0b..84d51851ba08 100644
--- a/datafusion/functions-nested/src/make_array.rs
+++ b/datafusion/functions-nested/src/make_array.rs
@@ -32,8 +32,8 @@ use datafusion_common::{plan_err, utils::array_into_list_array_nullable, Result};
 use datafusion_expr::binary::{
     try_type_union_resolution_with_struct, type_union_resolution,
 };
-use datafusion_expr::TypeSignature;
 use datafusion_expr::scalar_doc_sections::DOC_SECTION_ARRAY;
+use datafusion_expr::TypeSignature;
 use datafusion_expr::{
     ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
 };

From 17ffd2bddb3d210a1f777e578315633388d8bfc3 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Sat, 19 Oct 2024 09:12:53 +0800
Subject: [PATCH 06/13] rm unused import

Signed-off-by: jayzhan211
---
 datafusion/functions-nested/src/make_array.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs
index 84d51851ba08..abd7649e9ec7 100644
--- a/datafusion/functions-nested/src/make_array.rs
+++ b/datafusion/functions-nested/src/make_array.rs
@@ -37,7 +37,6 @@ use datafusion_expr::TypeSignature;
 use datafusion_expr::{
     ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
 };
-use itertools::Itertools;

 use crate::utils::make_scalar_function;

From afee3b7069540cd29e325ce5f9658811f0bf278f Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Mon, 21 Oct 2024 16:34:45 +0800
Subject: [PATCH 07/13] fmt

Signed-off-by: jayzhan211
---
 datafusion/expr/src/logical_plan/builder.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 29b5d0d3944b..2df63972dd22 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -44,16 +44,17 @@ use crate::{
     TableProviderFilterPushDown, TableSource, WriteOp,
 };

-use arrow::compute::can_cast_types;
 use super::dml::InsertOp;
 use super::plan::ColumnUnnestList;
+use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
     plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
-    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
+    FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
+    UnnestOptions,
 };
 use datafusion_expr_common::type_coercion::binary::type_union_resolution;
From fdb15f6dfbebe7354461c608847382e75417ac24 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Tue, 22 Oct 2024 21:43:26 +0800
Subject: [PATCH 08/13] use option instead of vec

Signed-off-by: jayzhan211
---
 datafusion/expr-common/src/type_coercion/binary.rs | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs
index 6e1a134b578a..2bbb5fdd7479 100644
--- a/datafusion/expr-common/src/type_coercion/binary.rs
+++ b/datafusion/expr-common/src/type_coercion/binary.rs
@@ -29,7 +29,8 @@ use arrow::datatypes::{
     DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
 };
 use datafusion_common::{
-    exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result,
+    exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err,
+    DataFusionError, Result,
 };
 use itertools::Itertools;

@@ -375,6 +376,8 @@ impl From<&DataType> for TypeCategory {
 /// decimal precision and scale when coercing decimal types.
 ///
 /// This function doesn't preserve correct field name and nullability for the struct type, we only care about data type.
+///
+/// Returns Option because we might want to continue on the code even if the data types are not coercible to the common type
 pub fn type_union_resolution(data_types: &[DataType]) -> Option<DataType> {
     if data_types.is_empty() {
         return None;
@@ -533,18 +536,16 @@ fn type_union_resolution_coercion(
 }

 pub fn try_type_union_resolution(data_types: &[DataType]) -> Result<Vec<DataType>> {
-    let mut errors = vec![];
+    let mut err = None;
     match try_type_union_resolution_with_struct(data_types) {
         Ok(struct_types) => return Ok(struct_types),
-        Err(e) => {
-            errors.push(e);
-        }
+        Err(e) => err = Some(e),
     }

     if let Some(new_type) = type_union_resolution(data_types) {
         Ok(vec![new_type; data_types.len()])
     } else {
-        exec_err!("Fail to find the coerced type, errors: {:?}", errors)
+        exec_err!("Fail to find the coerced type, errors: {:?}", err)
     }
 }

From 47fa7822f06487a73e9eacd80cf48f5d33c1c58e Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 22 Oct 2024 14:41:56 -0400
Subject: [PATCH 09/13] Fix clippy

---
 datafusion/expr-common/src/type_coercion/binary.rs | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs
index 2bbb5fdd7479..3071a904c11a 100644
--- a/datafusion/expr-common/src/type_coercion/binary.rs
+++ b/datafusion/expr-common/src/type_coercion/binary.rs
@@ -29,8 +29,7 @@ use arrow::datatypes::{
     DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE,
 };
 use datafusion_common::{
-    exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err,
-    DataFusionError, Result,
+    exec_datafusion_err, exec_err, internal_err, plan_datafusion_err, plan_err, Result,
 };
 use itertools::Itertools;

@@ -535,11 +535,10 @@ fn type_union_resolution_coercion(
 }

 pub fn try_type_union_resolution(data_types: &[DataType]) -> Result<Vec<DataType>> {
-    let mut err = None;
-    match try_type_union_resolution_with_struct(data_types) {
+    let err = match try_type_union_resolution_with_struct(data_types) {
         Ok(struct_types) => return Ok(struct_types),
-        Err(e) => err = Some(e),
-    }
+        Err(e) => Some(e),
+    };

     if let Some(new_type) = type_union_resolution(data_types) {
         Ok(vec![new_type; data_types.len()])
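Patches 08 and 09 together simplify the fallback chain in `try_type_union_resolution`: the struct-aware path runs first, at most one error is retained (an `Option` instead of a `Vec`, bound directly from the `match` once clippy has its say), and the generic resolution is the fallback whose failure surfaces that saved error. A hedged sketch of a caller, assuming `try_type_union_resolution` is re-exported under `datafusion_expr::binary` like the two functions this series already imports from there:

    use arrow::datatypes::DataType;
    use datafusion_common::Result;
    use datafusion_expr::binary::try_type_union_resolution;

    fn main() -> Result<()> {
        // Non-struct inputs skip straight to the generic path: Int64 and
        // Float64 should resolve to a common Float64 for every position.
        let coerced = try_type_union_resolution(&[DataType::Int64, DataType::Float64])?;
        assert_eq!(coerced, vec![DataType::Float64, DataType::Float64]);
        Ok(())
    }
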
From a0eb9cabd0c4b31fe5bdd8219e0a886e85210097 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Wed, 23 Oct 2024 07:56:07 +0800
Subject: [PATCH 10/13] add values back and rename

Signed-off-by: jayzhan211
---
 datafusion/expr-common/src/type_coercion/binary.rs |  1 +
 datafusion/expr/src/logical_plan/builder.rs        | 39 ++++++++++++++++---
 .../simplify_expressions/simplify_exprs.rs         |  2 +-
 datafusion/proto/src/logical_plan/mod.rs           |  2 +-
 datafusion/sql/src/planner.rs                      |  8 ++--
 datafusion/sql/src/values.rs                       |  4 +-
 6 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs
index 3071a904c11a..2f806bf76d16 100644
--- a/datafusion/expr-common/src/type_coercion/binary.rs
+++ b/datafusion/expr-common/src/type_coercion/binary.rs
@@ -534,6 +534,7 @@ fn type_union_resolution_coercion(
     }
 }

+/// Handle type union resolution including struct type and others.
 pub fn try_type_union_resolution(data_types: &[DataType]) -> Result<Vec<DataType>> {
     let err = match try_type_union_resolution_with_struct(data_types) {
         Ok(struct_types) => return Ok(struct_types),
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 2df63972dd22..3db5ea40404d 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -170,6 +170,29 @@ impl LogicalPlanBuilder {
         })))
     }

+    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
+        if values.is_empty() {
+            return plan_err!("Values list cannot be empty");
+        }
+        let n_cols = values[0].len();
+        if n_cols == 0 {
+            return plan_err!("Values list cannot be zero length");
+        }
+        for (i, row) in values.iter().enumerate() {
+            if row.len() != n_cols {
+                return plan_err!(
+                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
+                    row.len(),
+                    i,
+                    n_cols
+                );
+            }
+        }
+
+        // Infer from data itself
+        Self::infer_data(values)
+    }
+
     /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
     /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
     /// documentation for more details.
@@ -179,7 +202,10 @@ impl LogicalPlanBuilder {
     /// so it's usually better to override the default names with a table alias list.
     ///
     /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
-    pub fn values(values: Vec<Vec<Expr>>, schema: Option<&DFSchemaRef>) -> Result<Self> {
+    pub fn values_with_schema(
+        values: Vec<Vec<Expr>>,
+        schema: Option<&DFSchemaRef>,
+    ) -> Result<Self> {
         if values.is_empty() {
             return plan_err!("Values list cannot be empty");
         }
@@ -200,14 +226,17 @@ impl LogicalPlanBuilder {

         // Check the type of value against the schema
         if let Some(schema) = schema {
-            Self::infer_from_schema(values, schema)
+            Self::infer_values_from_schema(values, schema)
         } else {
             // Infer from data itself
             Self::infer_data(values)
         }
     }

-    fn infer_from_schema(values: Vec<Vec<Expr>>, schema: &DFSchema) -> Result<Self> {
+    fn infer_values_from_schema(
+        values: Vec<Vec<Expr>>,
+        schema: &DFSchema,
+    ) -> Result<Self> {
         let n_cols = values[0].len();
         let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
         for j in 0..n_cols {
@@ -2364,10 +2393,10 @@ mod tests {
     fn test_union_after_join() -> Result<()> {
         let values = vec![vec![lit(1)]];

-        let left = LogicalPlanBuilder::values(values.clone(), None)?
+        let left = LogicalPlanBuilder::values_with_schema(values.clone(), None)?
             .alias("left")?
             .build()?;
-        let right = LogicalPlanBuilder::values(values, None)?
+        let right = LogicalPlanBuilder::values_with_schema(values, None)?
             .alias("right")?
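Patch 10 splits the old two-in-one constructor: `values` infers column types from the literals themselves, while `values_with_schema` validates and coerces them against a caller-supplied schema, with `PlannerContext` renaming its field to `create_table_schema` to reflect the only producer. A minimal sketch of the inference path (assuming `datafusion_expr` as a dependency, as in the builder tests above):

    use datafusion_common::Result;
    use datafusion_expr::{lit, LogicalPlanBuilder};

    fn main() -> Result<()> {
        // Two rows, one column; the type is inferred from the Int32 literals
        // and the column gets the default name column1.
        let plan = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
            .build()?;
        println!("{}", plan.display_indent());
        Ok(())
    }

Keeping both constructors means callers that genuinely have no schema (the proto round-trip, the simplifier tests) no longer have to thread a `None` through.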
            .build()?;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 251ef1acce64..f61db667aef5 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -417,7 +417,7 @@ mod tests {
             Box::new(lit(1)),
         ));
         let values = vec![vec![expr1, expr2]];
-        let plan = LogicalPlanBuilder::values(values, None)?.build()?;
+        let plan = LogicalPlanBuilder::values_with_schema(values, None)?.build()?;

         let expected = "\
         Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))";
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index c329c3b5bb41..052862b2afdf 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -282,7 +282,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .map_err(|e| e.into())
                 }?;

-                LogicalPlanBuilder::values(values, None)?.build()
+                LogicalPlanBuilder::values_with_schema(values, None)?.build()
             }
             LogicalPlanType::Projection(projection) => {
                 let input: LogicalPlan =
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index b1d6c3cb0ab2..072d2320fccf 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -139,7 +139,7 @@ pub struct PlannerContext {
     /// FROM clauses, this should become a suffix of the `outer_query_schema`.
     outer_from_schema: Option<DFSchemaRef>,
     /// The query schema defined by the table
-    table_schema: Option<DFSchemaRef>,
+    create_table_schema: Option<DFSchemaRef>,
 }

 impl Default for PlannerContext {
@@ -156,7 +156,7 @@ impl PlannerContext {
             ctes: HashMap::new(),
             outer_query_schema: None,
             outer_from_schema: None,
-            table_schema: None,
+            create_table_schema: None,
         }
     }

@@ -188,12 +188,12 @@ impl PlannerContext {
         &mut self,
         mut schema: Option<DFSchemaRef>,
     ) -> Option<DFSchemaRef> {
-        std::mem::swap(&mut self.table_schema, &mut schema);
+        std::mem::swap(&mut self.create_table_schema, &mut schema);
         schema
     }

     pub fn table_schema(&self) -> Option<DFSchemaRef> {
-        self.table_schema.clone()
+        self.create_table_schema.clone()
     }

     // Return a clone of the outer FROM schema
diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs
index f0fc2aed53dd..d64bb8d3f02b 100644
--- a/datafusion/sql/src/values.rs
+++ b/datafusion/sql/src/values.rs
@@ -46,9 +46,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .collect::<Result<Vec<_>>>()?;

         if schema.fields().is_empty() {
-            LogicalPlanBuilder::values(values, None)?.build()
+            LogicalPlanBuilder::values_with_schema(values, None)?.build()
         } else {
-            LogicalPlanBuilder::values(values, Some(&schema))?.build()
+            LogicalPlanBuilder::values_with_schema(values, Some(&schema))?.build()
         }
     }
 }

From 3310cbf7d72a4f5d2f72e5c362a08453d10c62b3 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Wed, 23 Oct 2024 16:18:05 +0800
Subject: [PATCH 11/13] invalid query

Signed-off-by: jayzhan211
---
 datafusion/sql/src/values.rs               | 7 +++----
 datafusion/sqllogictest/test_files/ddl.slt | 6 ++++++
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs
index d64bb8d3f02b..35b4c5b99937 100644
--- a/datafusion/sql/src/values.rs
+++ b/datafusion/sql/src/values.rs
@@ -33,18 +33,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             rows,
         } = values;

-        let schema = planner_context
-            .table_schema()
-            .unwrap_or(Arc::new(DFSchema::empty()));
+        let empty_schema = Arc::new(DFSchema::empty());
         let values = rows
             .into_iter()
             .map(|row| {
                 row.into_iter()
-                    .map(|v| self.sql_to_expr(v, &schema, planner_context))
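Patch 11 (continued below) reorders VALUES planning: expressions are resolved against an empty schema first, so a bare column reference such as `a` can no longer silently bind to the CREATE TABLE target column, and the target schema is consulted only afterwards, for coercion. The new ddl.slt cases below encode exactly that; a hypothetical reproduction through the public API (assumes the `datafusion` crate plus `tokio`, not part of this patch):

    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() {
        let ctx = SessionContext::new();
        let res = ctx.sql("CREATE TABLE t(a int) AS VALUES (a + a)").await;
        // Planning should now fail with: Schema error: No field named a.
        assert!(res.is_err());
        if let Err(e) = res {
            println!("{e}");
        }
    }
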
+                    .map(|v| self.sql_to_expr(v, &empty_schema, planner_context))
                     .collect::<Result<Vec<_>>>()
             })
             .collect::<Result<Vec<_>>>()?;

+        let schema = planner_context.table_schema().unwrap_or(empty_schema);
         if schema.fields().is_empty() {
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt
index 813f7e95adf0..3205920d7110 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -799,3 +799,9 @@ CREATE EXTERNAL TEMPORARY TABLE tty STORED as ARROW LOCATION '../core/tests/data

 statement error DataFusion error: This feature is not implemented: Temporary views not supported
 CREATE TEMPORARY VIEW y AS VALUES (1,2,3);
+
+query error DataFusion error: Schema error: No field named a\.
+EXPLAIN CREATE TABLE t(a int) AS VALUES (a + a);
+
+statement error DataFusion error: Schema error: No field named a\.
+CREATE TABLE t(a int) AS SELECT x FROM (VALUES (a)) t(x) WHERE false;
\ No newline at end of file

From b9b85248c7af2a419629081d2b4ab5bdff3c9a1c Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Wed, 23 Oct 2024 21:35:36 +0800
Subject: [PATCH 12/13] use values if no schema

Signed-off-by: jayzhan211
---
 datafusion/expr/src/logical_plan/builder.rs        | 13 ++++---------
 .../src/simplify_expressions/simplify_exprs.rs     |  2 +-
 datafusion/proto/src/logical_plan/mod.rs           |  2 +-
 datafusion/sql/src/values.rs                       |  4 ++--
 4 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 3db5ea40404d..ce68012c6cf4 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -204,7 +204,7 @@ impl LogicalPlanBuilder {
     /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
     pub fn values_with_schema(
         values: Vec<Vec<Expr>>,
-        schema: Option<&DFSchemaRef>,
+        schema: &DFSchemaRef,
     ) -> Result<Self> {
         if values.is_empty() {
             return plan_err!("Values list cannot be empty");
@@ -225,12 +225,7 @@ impl LogicalPlanBuilder {
         }

         // Check the type of value against the schema
-        if let Some(schema) = schema {
-            Self::infer_values_from_schema(values, schema)
-        } else {
-            // Infer from data itself
-            Self::infer_data(values)
-        }
+        Self::infer_values_from_schema(values, schema)
     }

     fn infer_values_from_schema(
@@ -2393,10 +2388,10 @@ mod tests {
     fn test_union_after_join() -> Result<()> {
         let values = vec![vec![lit(1)]];

-        let left = LogicalPlanBuilder::values_with_schema(values.clone(), None)?
+        let left = LogicalPlanBuilder::values(values.clone())?
             .alias("left")?
             .build()?;
-        let right = LogicalPlanBuilder::values_with_schema(values, None)?
+        let right = LogicalPlanBuilder::values(values)?
            .alias("right")?
            .build()?;
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index f61db667aef5..200f1f159d81 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -417,7 +417,7 @@ mod tests {
             Box::new(lit(1)),
         ));
         let values = vec![vec![expr1, expr2]];
-        let plan = LogicalPlanBuilder::values_with_schema(values, None)?.build()?;
+        let plan = LogicalPlanBuilder::values(values)?.build()?;

         let expected = "\
         Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))";
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 052862b2afdf..4adbb9318d51 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -282,7 +282,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .map_err(|e| e.into())
                 }?;

-                LogicalPlanBuilder::values_with_schema(values, None)?.build()
+                LogicalPlanBuilder::values(values)?.build()
             }
             LogicalPlanType::Projection(projection) => {
                 let input: LogicalPlan =
diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs
index 35b4c5b99937..a4001bea7dea 100644
--- a/datafusion/sql/src/values.rs
+++ b/datafusion/sql/src/values.rs
@@ -45,9 +45,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let schema = planner_context.table_schema().unwrap_or(empty_schema);
         if schema.fields().is_empty() {
-            LogicalPlanBuilder::values_with_schema(values, None)?.build()
+            LogicalPlanBuilder::values(values)?.build()
         } else {
-            LogicalPlanBuilder::values_with_schema(values, Some(&schema))?.build()
+            LogicalPlanBuilder::values_with_schema(values, &schema)?.build()
         }
     }
 }
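After patch 12 the split is strict: `values_with_schema` takes `&DFSchemaRef` rather than `Option<&DFSchemaRef>`, and every schema-less call site moves to `values`. A sketch of the schema-checked path; the `DFSchema::try_from` conversion is standard plumbing, not taken from this diff:

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, Result};
    use datafusion_expr::{lit, LogicalPlanBuilder};

    fn main() -> Result<()> {
        // Build a target schema with a single nullable Int64 column "a".
        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let df_schema = Arc::new(DFSchema::try_from(schema)?);
        // The literal is checked against the Int64 target column; the schema
        // argument is now mandatory.
        let plan =
            LogicalPlanBuilder::values_with_schema(vec![vec![lit(1i64)]], &df_schema)?
                .build()?;
        println!("{}", plan.display_indent());
        Ok(())
    }
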
From 4e0056cd465858fabf71c7f335b15197891bdd92 Mon Sep 17 00:00:00 2001
From: jayzhan211
Date: Wed, 23 Oct 2024 21:37:58 +0800
Subject: [PATCH 13/13] add doc

Signed-off-by: jayzhan211
---
 datafusion/expr/src/logical_plan/builder.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index ce68012c6cf4..d2ecd56cdc23 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -170,6 +170,13 @@ impl LogicalPlanBuilder {
         })))
     }

+    /// Create a values list based relation, and the schema is inferred from data, consuming
+    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+    /// documentation for more details.
+    ///
+    /// so it's usually better to override the default names with a table alias list.
+    ///
+    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
     pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
         if values.is_empty() {
             return plan_err!("Values list cannot be empty");