From 5a78fdc5b226af09fd3407120531a5636ad92de9 Mon Sep 17 00:00:00 2001 From: Andrei Dragomir Date: Tue, 18 Jun 2024 12:19:26 +0300 Subject: [PATCH 1/4] [HSTACK] Bugfix #10978: Inconsistent behavior in HashJoin Projections --- .../src/physical_optimizer/join_selection.rs | 23 ++++++- .../physical-plan/src/joins/hash_join.rs | 23 +------ datafusion/physical-plan/src/joins/utils.rs | 64 ++++++++++++++++++- 3 files changed, 88 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index b849df88e4aa..e495ca79c100 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -42,6 +42,7 @@ use datafusion_expr::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::joins::utils::{project_index_to_exprs, remap_join_projections_join_to_output}; /// The [`JoinSelection`] rule tries to modify a given plan so that it can /// accommodate infinite sources and optimize joins in the plan according to @@ -193,8 +194,28 @@ pub fn swap_hash_join( Ok(Arc::new(new_join)) } else { // TODO avoid adding ProjectionExec again and again, only adding Final Projection + // ADR: FIXME the projection inside the hash join functionality is not consistent + // see https://github.com/apache/datafusion/commit/afddb321e9a98ffc1947005c38b6b50a6ef2a401 + // Failing to do the below code will create a projection exec with a projection that is + // possibly outside the schema. + let actual_projection = if new_join.projection.is_some() { + let tmp = remap_join_projections_join_to_output( + new_join.left().clone(), + new_join.right().clone(), + new_join.join_type(), + new_join.projection.clone(), + )?.unwrap(); + project_index_to_exprs( + &tmp, + &new_join.schema() + ) + } else { + swap_reverting_projection(&left.schema(), &right.schema()) + }; + // let swap_proj = swap_reverting_projection(&left.schema(), &right.schema()); + let proj = ProjectionExec::try_new( - swap_reverting_projection(&left.schema(), &right.schema()), + actual_projection, Arc::new(new_join), )?; Ok(Arc::new(proj)) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 7fac23ad5557..8067e78f9b17 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -69,13 +69,14 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; use datafusion_physical_expr::expressions::UnKnownColumn; -use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; +use datafusion_physical_expr::PhysicalExprRef; use ahash::RandomState; use datafusion_expr::Operator; use datafusion_physical_expr_common::datum::compare_op_for_nested; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; +use crate::joins::utils::project_index_to_exprs; type SharedBitmapBuilder = Mutex; @@ -598,25 +599,6 @@ impl DisplayAs for HashJoinExec { } } -fn project_index_to_exprs( - projection_index: &[usize], - schema: &SchemaRef, -) -> Vec<(Arc, String)> { - projection_index - .iter() - .map(|index| { - let field = schema.field(*index); - ( - Arc::new(datafusion_physical_expr::expressions::Column::new( - field.name(), - *index, - )) as Arc, - field.name().to_owned(), - ) - }) - .collect::>() -} - impl ExecutionPlan for HashJoinExec { fn name(&self) -> &'static str { "HashJoinExec" @@ -1579,6 +1561,7 @@ mod tests { use hashbrown::raw::RawTable; use rstest::*; use rstest_reuse::*; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; fn div_ceil(a: usize, b: usize) -> usize { (a + b - 1) / b diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 8fdbf7041e2f..96f132ed0c84 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -39,6 +39,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use arrow_array::builder::UInt64Builder; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray}; use arrow_buffer::ArrowNativeType; +use arrow_schema::SchemaRef; use datafusion_common::cast::as_boolean_array; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -46,7 +47,7 @@ use datafusion_common::{ plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult, }; use datafusion_expr::interval_arithmetic::Interval; -use datafusion_physical_expr::equivalence::add_offset_to_expr; +use datafusion_physical_expr::equivalence::{add_offset_to_expr, ProjectionMapping}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{collect_columns, merge_vectors}; use datafusion_physical_expr::{ @@ -57,6 +58,7 @@ use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; use hashbrown::raw::RawTable; use parking_lot::Mutex; +use crate::common::can_project; /// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. /// @@ -703,6 +705,66 @@ pub fn build_join_schema( (fields.finish(), column_indices) } +/// This assumes that the projections are relative to the join schema. +/// We need to redo them to point to the actual hash join output schema +pub fn remap_join_projections_join_to_output( + left: Arc, + right: Arc, + join_type: &JoinType, + projection: Option>, +) -> datafusion_common::Result>> { + match projection { + Some(ref projection) => { + let (join_schema, indices) = build_join_schema( + left.schema().as_ref(), + right.schema().as_ref(), + join_type + ); + + let join_schema = Arc::new(join_schema); + can_project(&join_schema, Some(projection.clone()).as_ref())?; + + let projection_exprs = project_index_to_exprs( + &projection.clone(), + &join_schema + ); + let projection_mapping = + ProjectionMapping::try_new(&projection_exprs, &join_schema)?; + + // projection mapping contains from and to, get the second one + let dest_physical_exprs = projection_mapping.map.iter().map(|(f, t)| t.clone()).collect::>(); + let dest_columns = dest_physical_exprs.iter().map(|pe| pe.as_any().downcast_ref::()).collect::>(); + let output = dest_physical_exprs.iter().enumerate().map(|(idx, pe)| { + // :Vec<(Arc, String)> + // (pe.clone(), dest_column.name().to_owned()) + let dest_column = dest_columns.get(idx).unwrap().unwrap(); + dest_column.index() + }).collect::>(); + Ok(Some(output)) + }, + None => Ok(None) + } +} + +pub fn project_index_to_exprs( + projection_index: &[usize], + schema: &SchemaRef, +) -> Vec<(Arc, String)> { + projection_index + .iter() + .map(|index| { + let field = schema.field(*index); + ( + Arc::new(datafusion_physical_expr::expressions::Column::new( + field.name(), + *index, + )) as Arc, + field.name().to_owned(), + ) + }) + .collect::>() +} + /// A [`OnceAsync`] can be used to run an async closure once, with subsequent calls /// to [`OnceAsync::once`] returning a [`OnceFut`] to the same asynchronous computation /// From 6b071045705bf7aa89e930266475e16fa9301e6d Mon Sep 17 00:00:00 2001 From: Andrei Dragomir Date: Tue, 23 Jul 2024 16:14:18 +0300 Subject: [PATCH 2/4] [HSTACK] Add NestedSchemaMapping functionality - correct behavior for casting and converting batches between any schema --- Cargo.toml | 12 + datafusion/common/Cargo.toml | 1 + datafusion/common/src/deep.rs | 1008 +++++++++++++++++ datafusion/common/src/lib.rs | 1 + datafusion/core/src/datasource/mod.rs | 1 + .../src/datasource/schema_adapter_deep.rs | 658 +++++++++++ 6 files changed, 1681 insertions(+) create mode 100644 datafusion/common/src/deep.rs create mode 100644 datafusion/core/src/datasource/schema_adapter_deep.rs diff --git a/Cargo.toml b/Cargo.toml index 124747999041..c4cf0fb61c2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,18 @@ # specific language governing permissions and limitations # under the License. +[profile.idea] +inherits = "dev" +opt-level = 0 +debug = 2 +split-debuginfo = "packed" +strip = "none" +debug-assertions = true +overflow-checks = false +incremental = true +codegen-units = 256 +lto = "off" + [workspace] exclude = ["datafusion-cli", "dev/depcheck"] members = [ diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 79e20ba1215c..6997f6884d09 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -64,6 +64,7 @@ paste = "1.0.15" pyo3 = { version = "0.21.0", optional = true } sqlparser = { workspace = true } tokio = { workspace = true } +log = { workspace = true } [target.'cfg(target_family = "wasm")'.dependencies] instant = { version = "0.1", features = ["wasm-bindgen"] } diff --git a/datafusion/common/src/deep.rs b/datafusion/common/src/deep.rs new file mode 100644 index 000000000000..bc775cec5bca --- /dev/null +++ b/datafusion/common/src/deep.rs @@ -0,0 +1,1008 @@ +use crate::cast::as_map_array; +use crate::{project_schema, DataFusionError, DFSchemaRef}; +use arrow::compute::{can_cast_types, cast}; +use arrow_array::cast::{ + as_fixed_size_list_array, as_generic_list_array, as_struct_array, +}; +use arrow_array::{ + new_null_array, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray, + MapArray, RecordBatch, RecordBatchOptions, StructArray, +}; +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; +use log::{error, trace}; +use std::collections::HashMap; +use std::fs::OpenOptions; +use std::io::Write; +use std::sync::Arc; + +/// Check whether an Arrow [DataType] is recursive in the sense that we need to +/// look inside and continue unpacking the data +/// This is used when creating a schema based on a deep projection +pub fn data_type_recurs(dt: &DataType) -> bool { + match dt { + // scalars + DataType::Null + | DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Timestamp(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Binary + | DataType::FixedSizeBinary(_) + | DataType::LargeBinary + | DataType::BinaryView + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + | DataType::Dictionary(_, _) => false, + // containers + DataType::RunEndEncoded(_, val) => data_type_recurs(val.data_type()), + DataType::Union(_, _) => true, + DataType::List(f) => data_type_recurs(f.data_type()), + DataType::ListView(f) => data_type_recurs(f.data_type()), + DataType::FixedSizeList(f, _) => data_type_recurs(f.data_type()), + DataType::LargeList(f) => data_type_recurs(f.data_type()), + DataType::LargeListView(f) => data_type_recurs(f.data_type()), + // list of struct + DataType::Map(f, _) => true, + DataType::Struct(_) => true, + } +} + +/// Mutually recursive function with [rewrite_record_batch_field] +/// Given the source and destination fields, as well as the list of Arrow [Array]s +/// Returns the modified arrays as well as the modified fields. +pub fn rewrite_record_batch_fields( + dst_fields: &Fields, + src_fields: &Fields, + arrays: Vec, + num_rows: usize, + fill_missing_source_fields: bool, + error_on_missing_source_fields: bool, +) -> crate::Result<(Vec, Vec)> { + let mut out_arrays: Vec = vec![]; + let mut out_fields: Vec = vec![]; + for i in 0..dst_fields.len() { + let dst_field = dst_fields[i].clone(); + let dst_name = dst_field.name(); + + let src_field_opt = src_fields + .iter() + .enumerate() + .find(|(_idx, b)| b.name() == dst_name); + + // if the field exists in the source + if src_field_opt.is_some() { + let (src_idx, src_field) = src_field_opt.unwrap(); + let src_field = src_field.clone(); + let src_arr = arrays[src_idx].clone(); + let (tmp_array, tmp_field) = rewrite_record_batch_field( + dst_field, + src_field, + src_arr, + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + out_arrays.push(tmp_array); + out_fields.push(tmp_field); + } else { + if fill_missing_source_fields { + let tmp_array = new_null_array(dst_field.data_type(), num_rows); + out_arrays.push(tmp_array); + out_fields.push(dst_field); + } else if error_on_missing_source_fields { + return Err(crate::DataFusionError::Internal(format!( + "field {dst_name} not found in source" + ))); + } + } + } + Ok((out_arrays, out_fields)) +} + +/// Mutually recursive function with [rewrite_record_batch_fields] +/// Rewrites a single field, returns a single modified field and array +/// If the field can be trivially casted (does not recur, simple data types) - falls back to Arrow functionality +/// If we have a container like field or struct, the function recurs for the data type inside +pub fn rewrite_record_batch_field( + dst_field: FieldRef, + src_field: FieldRef, + src_array: ArrayRef, + num_rows: usize, + fill_missing_source_fields: bool, + error_on_missing_source_fields: bool, +) -> crate::Result<(ArrayRef, FieldRef)> { + let arrow_cast_available = + can_cast_types(src_field.data_type(), dst_field.data_type()); + if arrow_cast_available { + let casted_array = cast(src_array.as_ref(), dst_field.data_type())?; + return Ok((casted_array, dst_field.clone())); + } + match (src_field.data_type(), dst_field.data_type()) { + (DataType::List(src_inner), DataType::List(dst_inner)) => { + if data_type_recurs(src_field.data_type()) { + let src_array_clone = src_array.clone(); + + let src_inner_list_array = + as_generic_list_array::(src_array_clone.as_ref()).clone(); + let src_offset_buffer = src_inner_list_array.offsets().clone(); + let src_nulls = match src_inner_list_array.nulls() { + None => None, + Some(x) => Some(x.clone()), + }; + let (values, field) = rewrite_record_batch_field( + dst_inner.clone(), + src_inner.clone(), + src_inner_list_array.values().clone(), + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + let nlarr = ListArray::try_new( + field.clone(), + src_offset_buffer, + values, + src_nulls, + ); + let list_field = Arc::new(Field::new( + dst_field.name().clone(), + DataType::List(field.clone()), + dst_field.is_nullable(), + )); + + Ok((Arc::new(nlarr.unwrap()), list_field)) + } else { + let casted_array = + cast(src_array.as_ref(), dst_field.data_type())?; + return Ok((casted_array, dst_field.clone())); + } + } + ( + DataType::FixedSizeList(src_inner, src_sz), + DataType::FixedSizeList(dst_inner, dst_sz), + ) => { + if src_sz != dst_sz { + // Let Arrow do its thing, it's going to error + let casted_array = + cast(src_array.as_ref(), dst_field.data_type())?; + return Ok((casted_array, dst_field.clone())); + } + if data_type_recurs(src_field.data_type()) { + let tmp = src_array.clone(); + let src_inner_list_array = + as_fixed_size_list_array(tmp.as_ref()).clone(); + let src_nulls = match src_inner_list_array.nulls() { + None => None, + Some(x) => Some(x.clone()), + }; + let (values, field) = rewrite_record_batch_field( + dst_inner.clone(), + src_inner.clone(), + src_inner_list_array.values().clone(), + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + + let nlarr = FixedSizeListArray::try_new( + field.clone(), + *dst_sz, + values, + src_nulls, + ); + let list_field = Arc::new(Field::new( + dst_field.name().clone(), + DataType::FixedSizeList(field.clone(), *dst_sz), + dst_field.is_nullable(), + )); + + Ok((Arc::new(nlarr.unwrap()), list_field)) + } else { + let casted_array = + cast(src_array.as_ref(), dst_field.data_type())?; + return Ok((casted_array, dst_field.clone())); + } + } + (DataType::LargeList(src_inner), DataType::LargeList(dst_inner)) => { + if data_type_recurs(src_field.data_type()) { + let tmp = src_array.clone(); + let src_inner_list_array = + as_generic_list_array::(tmp.as_ref()).clone(); + let src_offset_buffer = src_inner_list_array.offsets().clone(); + let src_nulls = match src_inner_list_array.nulls() { + None => None, + Some(x) => Some(x.clone()), + }; + let (values, field) = rewrite_record_batch_field( + dst_inner.clone(), + src_inner.clone(), + src_inner_list_array.values().clone(), + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + + let nlarr = LargeListArray::try_new( + field.clone(), + src_offset_buffer, + values, + src_nulls, + ); + let list_field = Arc::new(Field::new( + dst_field.name().clone(), + DataType::LargeList(field.clone()), + dst_field.is_nullable(), + )); + + Ok((Arc::new(nlarr.unwrap()), list_field)) + } else { + let casted_array = + cast(src_array.as_ref(), dst_field.data_type())?; + return Ok((casted_array, dst_field.clone())); + } + } + + (DataType::Map(src_inner, _), DataType::Map(dst_inner, dst_ordered)) => { + match (src_inner.data_type(), dst_inner.data_type()) { + (DataType::Struct(src_inner_f), DataType::Struct(dst_inner_f)) => { + let src_map = as_map_array(src_array.as_ref())?; + let src_nulls = match src_map.nulls() { + None => None, + Some(x) => Some(x.clone()), + }; + let src_offset_buffer = src_map.offsets().clone(); + + let (tmp_values_array, tmp_values_field) = rewrite_record_batch_field( + dst_inner_f[1].clone(), + src_inner_f[1].clone(), + src_map.values().clone(), + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + // re-build map from keys and values after recursing only on the values + let entry_struct = StructArray::from(vec![ + (dst_inner_f[0].clone(), src_map.keys().clone()), + (tmp_values_field, tmp_values_array), + ]); + + let struct_field = Arc::new(Field::new( + dst_inner.name().clone(), + entry_struct.data_type().clone(), + false, + )); + + let out_map = MapArray::try_new( + struct_field.clone(), + src_offset_buffer, + entry_struct, + src_nulls, + *dst_ordered, + )?; + let map_field = Arc::new(Field::new( + dst_field.name().clone(), + DataType::Map(struct_field.clone(), *dst_ordered), + dst_field.is_nullable(), + )); + + Ok((Arc::new(out_map), map_field)) + } + _ => unreachable!(), // unreachable + } + } + + (DataType::Struct(src_inner), DataType::Struct(dst_inner)) => { + let src_struct_array = as_struct_array(src_array.as_ref()); + let src_nulls = match src_struct_array.nulls() { + None => None, + Some(x) => Some(x.clone()), + }; + let src_columns = src_struct_array + .columns() + .iter() + .map(|a| a.clone()) + .collect::>(); + let (dst_columns, dst_fields) = rewrite_record_batch_fields( + dst_inner, + src_inner, + src_columns, + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + let struct_array = + StructArray::try_new(dst_inner.clone(), dst_columns, src_nulls) + .map_err(|ae| DataFusionError::from(ae))?; + let struct_field = Field::new_struct( + dst_field.name(), + dst_fields, + dst_field.is_nullable(), + ); + Ok((Arc::new(struct_array), Arc::new(struct_field))) + } + _ => { + Err(DataFusionError::Internal(format!( + "Could not remap field src type={}, dst type={}", + src_field.data_type(), dst_field.data_type() + ))) + } + } +} + +/// Rewrites a record batch with a source schema to match a destination schema +pub fn try_rewrite_record_batch( + src: SchemaRef, + src_record_batch: RecordBatch, + dst: SchemaRef, + fill_missing_source_fields: bool, + error_on_missing_source_fields: bool, +) -> crate::Result { + + let num_rows = src_record_batch.num_rows(); + let (final_columns, final_fields) = rewrite_record_batch_fields( + dst.fields(), + src.fields(), + src_record_batch.columns().into(), + num_rows, + fill_missing_source_fields, + error_on_missing_source_fields, + )?; + + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); + let schema = Arc::new(Schema::new(final_fields)); + let record_batch = + RecordBatch::try_new_with_options(schema, final_columns, &options)?; + Ok(record_batch) +} + +/// Rewrites a record batch with a source schema to match a destination schema +/// The destination schema is further filtered using the mappings +pub fn try_rewrite_record_batch_with_mappings( + src: SchemaRef, + src_record_batch: RecordBatch, + dst: SchemaRef, + mappings: Vec>, +) -> crate::Result { + let src_record_batch_cols = src_record_batch.columns().to_vec(); + let num_rows = src_record_batch.num_rows(); + let field_vecs = dst.fields() + .iter() + .zip(mappings) + .map(|(field, src_idx)| match src_idx { + Some(batch_idx) => { + let arr = src_record_batch_cols[batch_idx].clone(); + let src_field = Arc::new(src.field(batch_idx).clone()); + rewrite_record_batch_field(field.clone(), src_field, arr, num_rows, true, false) + }, + None => Ok((new_null_array(field.data_type(), num_rows), field.clone())), + }) + .collect::, _>>()?; + + let mut final_columns: Vec = vec![]; + let mut final_fields: Vec = vec![]; + for (a, f) in field_vecs { + final_columns.push(a); + final_fields.push(f); + } + + let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); + let schema = Arc::new(Schema::new(final_fields)); + let record_batch = + RecordBatch::try_new_with_options(schema, final_columns, &options)?; + Ok(record_batch) +} + +/// Mutually recursive with [can_rewrite_field] +/// checks whether we can rewrite a source [Fields] object to a destination one +/// the missing fields in the source behavior can be changed with the [`fill_missing_source_field`] +/// parameter. +pub fn can_rewrite_fields( + dst_fields: &Fields, + src_fields: &Fields, + fill_missing_source_fields: bool, +) -> bool { + let mut out = true; + for i in 0..dst_fields.len() { + let dst_field = dst_fields[i].clone(); + let dst_name = dst_field.name(); + + let src_field_opt = src_fields + .iter() + .enumerate() + .find(|(_idx, b)| b.name() == dst_name); + + // if the field exists in the source + if src_field_opt.is_some() { + let (_src_idx, src_field) = src_field_opt.unwrap(); + let src_field = src_field.clone(); + let can_cast = + can_rewrite_field(dst_field, src_field, fill_missing_source_fields); + out = out && can_cast; + } else { + out = out && fill_missing_source_fields; + } + } + out +} + +/// Mutually recursive with [can_rewrite_fielda] +/// checks whether we can rewrite a source [FieldRef] object to a destination one +/// the missing fields in the source behavior can be changed with the [`fill_missing_source_field`] +/// parameter. +pub fn can_rewrite_field( + dst_field: FieldRef, + src_field: FieldRef, + fill_missing_source_fields: bool, +) -> bool { + let can_cast_by_arrow = !data_type_recurs(dst_field.data_type()) + && !data_type_recurs(src_field.data_type()); + if can_cast_by_arrow { + return can_cast_types(src_field.data_type(), dst_field.data_type()); + } + match (src_field.data_type(), dst_field.data_type()) { + (DataType::List(src_inner), DataType::List(dst_inner)) + | (DataType::List(src_inner), DataType::LargeList(dst_inner)) + | (DataType::LargeList(src_inner), DataType::LargeList(dst_inner)) => { + if data_type_recurs(src_inner.data_type()) + && data_type_recurs(dst_inner.data_type()) + { + return can_rewrite_field( + dst_inner.clone(), + src_inner.clone(), + fill_missing_source_fields, + ); + } else { + return can_cast_types(src_inner.data_type(), dst_inner.data_type()); + } + } + ( + DataType::FixedSizeList(src_inner, src_sz), + DataType::FixedSizeList(dst_inner, dst_sz), + ) => { + if src_sz != dst_sz { + return false; + } + if data_type_recurs(src_inner.data_type()) + && data_type_recurs(dst_inner.data_type()) + { + return can_rewrite_field( + dst_inner.clone(), + src_inner.clone(), + fill_missing_source_fields, + ); + } else { + return can_cast_types(src_inner.data_type(), dst_inner.data_type()); + } + } + (DataType::Map(src_inner, _), DataType::Map(dst_inner, _)) => { + return match (src_inner.data_type(), dst_inner.data_type()) { + (DataType::Struct(src_inner_f), DataType::Struct(dst_inner_f)) => { + can_rewrite_field( + dst_inner_f[1].clone(), + src_inner_f[1].clone(), + fill_missing_source_fields, + ) + } + _ => false + } + } + (DataType::Struct(src_inner), DataType::Struct(dst_inner)) => { + return can_rewrite_fields(dst_inner, src_inner, fill_missing_source_fields); + } + (_src, _dest) => { + error!( + target: "deep", + " can_rewrite_field: Unhandled src dest field: src {}={:?}, dst {}={:?}", + src_field.name(), + src_field.data_type(), + dst_field.name(), + dst_field.data_type() + ); + false + }, + } +} + +/// Deep projections are represented using a HashMap> +/// for backwards compatibility (current projections are represented using a [`Vec`] +/// Currently, deep projections are represented (even if there's some duplicated information as a +/// [`HashMap>`] +/// the key is the source field id of the top-level field +/// the value is a list of "paths" inside the top-level field +/// Examples: +/// Scalar fields - no representations of paths inside the field possible +/// List fields - same thing +/// List> - possible paths may be "*.id", "*.name", "*.address" +/// List>> +/// - possible paths may be "*.*", "*.*.*.id", "*.*.*.name", "*.*.*.address" +pub fn has_deep_projection(possible: Option<&HashMap>>) -> bool { + if possible.is_none() { + return false; + } + let tmp = possible.unwrap(); + !(tmp.is_empty() || tmp.iter().all(|(k, v)| v.len() == 0)) +} + +/// Combines the current projection (numeric indices of top-level columns) with +/// the deep projection - "paths" inside a top-level column +pub fn splat_columns( + src: SchemaRef, + projection: &Vec, + projection_deep: &HashMap>, +) -> Vec { + let mut out: Vec = vec![]; + for pi in projection.iter() { + let f = src.field(*pi); + match projection_deep.get(pi) { + None => { + out.push(f.name().to_owned()); + } + Some(rests) => { + if rests.len() > 0 { + for rest in rests.iter() { + match f.data_type() { + _ => out.push(format!("{}.{}", f.name(), rest)), + } + } + } else { + out.push(f.name().to_owned()); + } + } + } + } + out +} + +pub fn try_rewrite_schema_opt( + src: SchemaRef, + projection_opt: Option<&Vec>, + projection_deep_opt: Option<&HashMap>>, +) -> crate::Result { + match projection_opt { + None => Ok(src), + Some(projection) => match projection_deep_opt { + None => project_schema(&src, projection_opt), + Some(projection_deep) => Ok(rewrite_schema(src, projection, projection_deep)), + }, + } +} + +pub fn rewrite_field_projection( + src: SchemaRef, + projected_field_idx: usize, + projection_deep: &HashMap>, +) -> FieldRef { + let original_field = Arc::new(src.field(projected_field_idx).clone()); + let single_field_schema = Arc::new(Schema::new(vec![original_field])); + // rewrite projection, deep projection to use 0 + let projected_vec = vec![0]; + let mut projected_deep_vec = HashMap::new(); + let empty_vec: Vec = vec![]; + projected_deep_vec.insert( + 0 as usize, + projection_deep + .get(&projected_field_idx) + .unwrap_or(&empty_vec) + .clone(), + ); + + let rewritten_single_field_schema = + rewrite_schema(single_field_schema, &projected_vec, &projected_deep_vec); + Arc::new(rewritten_single_field_schema.field(0).clone()) +} + +fn make_path(parent: &String, name: &str) -> String { + if parent == "" { + return name.to_owned(); + } else { + return format!("{}.{}", parent, name); + } +} + +fn path_prefix_exists(filters: &Vec, path: &String) -> bool { + filters.iter().any(|f| { + let tmp = f.find(path); + tmp.is_some() && tmp.unwrap() == 0 + }) +} + +fn path_included(filters: &Vec, path: &String) -> bool { + filters.iter().any(|f| { + let tmp = path.find(f); + tmp.is_some() && tmp.unwrap() == 0 + }) +} + +pub fn fix_possible_field_accesses(schema: &DFSchemaRef, field_idx: usize, rest: &Vec) -> crate::Result> { + let mut field = Arc::new(schema.field(field_idx).clone()); + let mut rest_idx = 0 as usize; + let mut out = rest.clone(); + while rest_idx < out.len() { + let (fix_non_star_access, should_continue, new_field) = match field.data_type() { + DataType::Null | DataType::Boolean | DataType::Int8 | DataType::Int16 | DataType::Int32 | + DataType::Int64 | DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | + DataType::Float16 | DataType::Float32 | DataType::Float64 | DataType::Timestamp(_, _) | + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) | DataType::Duration(_) | + DataType::Interval(_) | DataType::Binary | DataType::FixedSizeBinary(_) | + DataType::LargeBinary | DataType::BinaryView | DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View | + DataType::Dictionary(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) | + DataType::RunEndEncoded(_, _)=> { + (false, false, None) + } + DataType::Union(_, _) => { + // FIXME @HStack + // don't know what to do here + (false, false, None) + } + DataType::List(inner) | DataType::ListView(inner) | + DataType::FixedSizeList(inner, _) | DataType::LargeList(inner) | + DataType::LargeListView(inner) => { + let new_field = inner.clone(); + (true, true, Some(new_field)) + } + DataType::Struct(inner_struct) => { + let mut new_field: Option = None; + for f in inner_struct.iter() { + if f.name() == &out[rest_idx] { + new_field = Some(f.clone()); + } + } + (false, true, new_field) + } + DataType::Map(inner_map, _) => { + let mut new_field: Option = None; + match inner_map.data_type() { + DataType::Struct(inner_map_struct) => { + new_field = Some(inner_map_struct[1].clone()); + } + _ => { + return Err(DataFusionError::Internal(String::from("Invalid inner map type"))); + } + } + (true, true, new_field) + } + }; + if fix_non_star_access && rest[rest_idx] != "*" { + out[rest_idx] = "*".to_string(); + } + if !should_continue { + break; + } + field = new_field.unwrap(); + rest_idx += 1; + } + Ok(out) +} + +pub fn rewrite_schema( + src: SchemaRef, + projection: &Vec, + projection_deep: &HashMap>, +) -> SchemaRef { + fn rewrite_schema_fields( + parent: String, + src_fields: &Fields, + filters: &Vec, + ) -> Vec { + let mut out_fields: Vec = vec![]; + for i in 0..src_fields.len() { + let src_field = src_fields[i].clone(); + let src_field_path = make_path(&parent, src_field.name()); + + let field_path_included = path_included(filters, &src_field_path); //filters.contains(&src_field_path); + if field_path_included { + out_fields.push(src_field.clone()); + } else { + if data_type_recurs(src_field.data_type()) + && path_prefix_exists(filters, &src_field_path) + { + match rewrite_schema_field(parent.clone(), src_field, filters) { + None => {} + Some(f) => out_fields.push(f), + } + } + } + } + out_fields + } + + fn rewrite_schema_field( + parent: String, + src_field: FieldRef, + filters: &Vec, + ) -> Option { + let src_field_name = src_field.name(); + // FIXME: @HStack + // if we already navigated to this field and the accessor is "*" + // that means we don't care about the field name + // RETEST THIS for lists + let src_field_path = if parent.len() > 0 && parent.chars().last().unwrap() == '*' as char { + parent.clone() + } else { + make_path(&parent, src_field_name) + }; + trace!(target:"deep", "rewrite field: {} = {} ({:?})", src_field_name, src_field_path, filters); + + let field_path_included = path_included(filters, &src_field_path); //filters.contains(&src_field_path); + if field_path_included { + trace!(target:"deep", " return {} directly ", src_field_path); + return Some(src_field.clone()); + } else { + if data_type_recurs(src_field.data_type()) + && path_prefix_exists(filters, &src_field_path) + { + let out = match src_field.data_type() { + DataType::List(src_inner) => { + rewrite_schema_field( + make_path(&src_field_path, "*"), + src_inner.clone(), + filters, + ) + .map(|inner| { + trace!(target:"deep", "return new list {} = {:#?}", src_field_name.clone(), inner.clone()); + Arc::new(Field::new_list( + src_field.name(), + inner, + src_field.is_nullable(), + )) + }) + } + DataType::FixedSizeList(src_inner, src_sz) => rewrite_schema_field( + make_path(&src_field_path, "*"), + src_inner.clone(), + filters, + ) + .map(|inner| { + Arc::new(Field::new_fixed_size_list( + src_field.name(), + inner, + *src_sz, + src_field.is_nullable(), + )) + }), + DataType::LargeList(src_inner) => rewrite_schema_field( + make_path(&src_field_path, "*"), + src_inner.clone(), + filters, + ) + .map(|inner| { + Arc::new(Field::new_large_list( + src_field.name(), + inner, + src_field.is_nullable(), + )) + }), + + DataType::Map(map_entry, map_sorted) => { + if let DataType::Struct(map_entry_fields) = map_entry.data_type() + { + let map_key_field = map_entry_fields.get(0).unwrap(); + let map_value_field = map_entry_fields.get(1).unwrap(); + rewrite_schema_field( + make_path(&src_field_path, "*"), + map_value_field.clone(), + filters, + ) + .map(|inner| { + Arc::new(Field::new_map( + src_field_name, + map_entry.name().clone(), + map_key_field.clone(), + inner, + *map_sorted, + src_field.is_nullable(), + )) + }) + } else { + panic!("Invalid internal field map: expected struct, but got {}", map_entry.data_type()); + } + } + + DataType::Struct(src_inner) => { + let dst_fields = + rewrite_schema_fields(src_field_path.clone(), src_inner, filters); + trace!(target:"deep", "for struct: {} {} = {:#?}", src_field_name, src_field_path.clone(), dst_fields); + if dst_fields.len() > 0 { + Some(Arc::new(Field::new_struct( + src_field.name(), + dst_fields, + src_field.is_nullable(), + ))) + } else { + None + } + } + x => { + panic!("Unhandled data type: {:#?}", x); + } + }; + out + } else { + None + } + } + } + + let actual_projection = if projection.len() == 0 { + (0..src.fields().len()).collect() + } else { + projection.clone() + }; + let splatted = splat_columns(src.clone(), &actual_projection, &projection_deep); + + // trace!(target:"deep", "rewrite_schema source: {:#?}", src); + trace!(target:"deep", "rewrite_schema splatted: {:?} {:?} = {:?}", &actual_projection, &projection_deep, splatted); + let mut dst_fields: Vec = vec![]; + for pi in actual_projection.iter() { + let src_field = src.field(*pi); + trace!(target:"deep", "rewrite_schema at field {}", src_field.name()); + let foutopt = + rewrite_schema_field("".to_string(), Arc::new(src_field.clone()), &splatted); + match foutopt { + None => {} + Some(fout) => { + dst_fields.push(fout.clone()); + } + } + } + + // let dst_fields = rewrite_fields("".to_string(), src.clone().fields(), &splatted); + // trace!(target:"deep", "rewrite_schema dst: {:#?}", dst_fields); + + let output = if dst_fields.len() > 0 { + Arc::new(Schema::new_with_metadata(dst_fields, src.metadata.clone())) + } else { + src.clone() + }; + + return output; +} + +pub fn debug_to_file(name: &str, contents: &str) { + let mut file = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(name) + .unwrap(); + file.write_all(contents.as_bytes()).unwrap(); +} + +#[cfg(test)] +mod tests { + use crate::deep::can_rewrite_fields; + use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; + use std::sync::Arc; + + #[test] + fn test_cast() -> crate::error::Result<()> { + // source, destination, is_fill_dependent + let cases = vec![ + ( + Arc::new(Schema::new(vec![Field::new("i1", DataType::Int32, true)])), + Arc::new(Schema::new(vec![Field::new("i1", DataType::Int8, true)])), + false, + true, + ), + ( + Arc::new(Schema::new(vec![Field::new("i1", DataType::Int32, true)])), + Arc::new(Schema::new(vec![Field::new( + "i1", + DataType::Struct(Fields::from(vec![Field::new( + "s1", + DataType::Utf8, + true, + )])), + true, + )])), + false, + false, + ), + ( + Arc::new(Schema::new(vec![Field::new( + "l1", + DataType::List(Arc::new(Field::new( + "s1", + DataType::Struct(Fields::from(vec![ + Field::new("s1extra1", DataType::Utf8, true), + Field::new("s1extra2", DataType::Utf8, true), + Field::new("s1i2", DataType::Int32, true), + Field::new("s1s1", DataType::Utf8, true), + Field::new( + "s1m1", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ])), + true, + )), + false, + ), + true, + ), + Field::new( + "s1l1", + DataType::List(Arc::new(Field::new( + "s1l1i1", + DataType::Int32, + true, + ))), + true, + ), + ])), + true, + ))), + true, + )])), + Arc::new(Schema::new(vec![Field::new( + "l1", + DataType::List(Arc::new(Field::new( + "s1", + DataType::Struct(Fields::from(vec![ + Field::new("s1s1", DataType::Utf8, true), + Field::new("s1i2", DataType::Int32, true), + Field::new( + "s1m1", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ])), + true, + )), + false, + ), + true, + ), + Field::new( + "s1l1", + DataType::List(Arc::new(Field::new( + "s1l1i1", + DataType::Date32, + true, + ))), + true, + ), + // extra field + Field::new("s1ts1", DataType::Time32(TimeUnit::Second), true), + ])), + true, + ))), + true, + )])), + true, + true, + ), + ]; + for (from, to, can_fill, res) in cases.iter() { + assert_eq!( + can_rewrite_fields(from.fields(), to.fields(), *can_fill), + *res, + "Wrong result" + ); + } + Ok(()) + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 10541e01914a..ada69941c5f2 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -44,6 +44,7 @@ pub mod stats; pub mod test_util; pub mod tree_node; pub mod utils; +pub mod deep; /// Reexport arrow crate pub use arrow; diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 1c9924735735..b0376f83f283 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -31,6 +31,7 @@ pub mod memory; pub mod physical_plan; pub mod provider; pub mod schema_adapter; +pub mod schema_adapter_deep; mod statistics; pub mod stream; pub mod streaming; diff --git a/datafusion/core/src/datasource/schema_adapter_deep.rs b/datafusion/core/src/datasource/schema_adapter_deep.rs new file mode 100644 index 000000000000..fda63fd6bf91 --- /dev/null +++ b/datafusion/core/src/datasource/schema_adapter_deep.rs @@ -0,0 +1,658 @@ +use crate::datasource::schema_adapter::{SchemaAdapter, SchemaMapper}; +use arrow_array::RecordBatch; +use arrow_schema::{Fields, Schema, SchemaRef}; +use datafusion_common::deep::{can_rewrite_field, try_rewrite_record_batch, try_rewrite_record_batch_with_mappings}; +use datafusion_common::plan_err; +use std::sync::Arc; +use log::trace; + +impl NestedSchemaAdapter { + fn map_schema_nested( + &self, + fields: &Fields, + ) -> datafusion_common::Result<(Arc, Vec)> { + let mut projection = Vec::with_capacity(fields.len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + // start from the destination fields + for (table_idx, table_field) in self.table_schema.fields.iter().enumerate() { + // if the file exists in the source, check if we can rewrite it to the destination, + // and add it to the projections + if let Some((file_idx, file_field)) = fields.find(table_field.name()) { + if can_rewrite_field(table_field.clone(), file_field.clone(), true) { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } else { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ); + } + } + } + Ok(( + Arc::new(NestedSchemaMapping { + table_schema: self.table_schema.clone(), + field_mappings + }), + projection, + )) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct NestedSchemaAdapter { + /// Schema for the table + pub table_schema: SchemaRef, +} + +impl SchemaAdapter for NestedSchemaAdapter { + fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { + let field = self.table_schema.field(index); + Some(file_schema.fields.find(field.name())?.0) + } + + fn map_schema( + &self, + file_schema: &Schema, + ) -> datafusion_common::Result<(Arc, Vec)> { + // self.map_schema_nested(file_schema.fields()) + // .map(|(s, v)| (s as Arc, v)) + trace!(target: "deep", "map_schema: file_schema: {:#?}", file_schema); + trace!(target: "deep", "map_schema: table_schema: {:#?}", self.table_schema); + let mut projection = Vec::with_capacity(file_schema.fields().len()); + let mut field_mappings = vec![None; self.table_schema.fields().len()]; + + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if let Some((table_idx, table_field)) = + self.table_schema.fields().find(file_field.name()) + { + match can_rewrite_field(table_field.clone(), file_field.clone(), false) { + true => { + field_mappings[table_idx] = Some(projection.len()); + projection.push(file_idx); + } + false => { + return plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } + } + } + } + + Ok(( + Arc::new(NestedSchemaMapping { + table_schema: self.table_schema.clone(), + field_mappings, + }), + projection, + )) + } +} + +#[derive(Debug)] +pub struct NestedSchemaMapping { + table_schema: SchemaRef, + field_mappings: Vec>, +} + +impl SchemaMapper for NestedSchemaMapping { + fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + let record_batch = try_rewrite_record_batch_with_mappings( + batch.schema(), + batch, + self.table_schema.clone(), + // FIXME: @HStack ADR: will this break delta tests ? + // There are some cases + self.field_mappings.clone(), + )?; + Ok(record_batch) + } + + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + try_rewrite_record_batch( + batch.schema().clone(), + batch, + self.table_schema.clone(), + false, + false, + ) + } +} + +#[cfg(test)] +mod tests { + use crate::dataframe::DataFrame; + use crate::datasource::MemTable; + use crate::prelude::SessionContext; + use arrow_array::builder::{ + ArrayBuilder, BooleanBuilder, GenericStringBuilder, Int32Builder, ListBuilder, + StringBuilder, StructBuilder, UInt32Builder, + }; + use arrow_array::{BooleanArray, RecordBatch, StringArray, StructArray, UInt32Array}; + use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; + use datafusion_common::deep::{ + rewrite_schema, try_rewrite_record_batch, + }; + use datafusion_optimizer::optimize_projections::OptimizeProjections; + use datafusion_optimizer::{Optimizer, OptimizerContext}; + use datafusion_physical_plan::get_plan_string; + use log::info; + use parquet::arrow::parquet_to_arrow_schema; + use parquet::schema::parser::parse_message_type; + use parquet::schema::types::SchemaDescriptor; + use std::collections::HashMap; + use std::sync::Arc; + use arrow::util::pretty::print_batches; + + #[tokio::test] + async fn test_rewrite_schema() -> crate::error::Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("i1", DataType::Int32, true), + Field::new( + "l1", + DataType::List(Arc::new(Field::new( + "s1", + DataType::Struct(Fields::from(vec![ + Field::new("s1s1", DataType::Utf8, true), + Field::new("s1i2", DataType::Int32, true), + Field::new( + "s1m1", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, false), + ])), + true, + )), + false, + ), + true, + ), + Field::new( + "s1l1", + DataType::List(Arc::new(Field::new( + "s1l1i1", + DataType::Date32, + true, + ))), + true, + ), + // extra field + Field::new("s1ts1", DataType::Time32(TimeUnit::Second), true), + ])), + true, + ))), + true, + ), + ])); + let out = rewrite_schema( + schema, + &vec![1], + &HashMap::from([ + (0, vec![]), + (1, vec!["*.s1s1".to_string(), "*.s1l1".to_string()]), + ]), + ); + // info!("out: {:#?}", out); + Ok(()) + } + + #[tokio::test] + async fn test_rewrite() -> crate::error::Result<()> { + let _ = env_logger::try_init(); + + let message_type = " + message schema { + REQUIRED INT32 int1; + OPTIONAL INT32 int2; + REQUIRED BYTE_ARRAY str1 (UTF8); + OPTIONAL GROUP stringlist1 (LIST) { + repeated group list { + optional BYTE_ARRAY element (UTF8); + } + } + OPTIONAL group map1 (MAP) { + REPEATED group map { + REQUIRED binary str (UTF8); + REQUIRED int32 num; + } + } + OPTIONAL GROUP array_of_arrays (LIST) { + REPEATED GROUP list { + REQUIRED GROUP element (LIST) { + REPEATED GROUP list { + REQUIRED INT32 element; + } + } + } + } + REQUIRED GROUP array_of_struct (LIST) { + REPEATED GROUP struct { + REQUIRED BOOLEAN bools; + REQUIRED INT32 uint32 (INTEGER(32,false)); + REQUIRED GROUP int32 (LIST) { + REPEATED GROUP list { + OPTIONAL INT32 element; + } + } + } + } + } + "; + let message_type = r#" + message schema { + REQUIRED GROUP struct { + REQUIRED BINARY name (UTF8); + REQUIRED BOOLEAN bools; + REQUIRED INT32 uint32 (INTEGER(32,false)); + REQUIRED GROUP tags (LIST) { + REPEATED GROUP tags { + OPTIONAL BINARY tag (UTF8); + } + } + } + } + "#; + let parquet_schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + + let arrow_schema = + Arc::new(parquet_to_arrow_schema(parquet_schema.as_ref(), None).unwrap()); + // println!("schema: {:#?}", arrow_schema); + let (_idx, ffield) = arrow_schema.fields().find("struct").unwrap(); + let struct_field = ffield.clone(); + let struct_fields = match struct_field.data_type() { + DataType::Struct(fields) => Some(fields), + _ => None, + } + .unwrap(); + println!("struct fields: {:#?}", struct_fields); + + let elem_builder: GenericStringBuilder = GenericStringBuilder::new(); + let mut expected_builder = ListBuilder::new(elem_builder).with_field(Field::new( + "tag", + DataType::Utf8, + true, + )); + expected_builder.values().append_value("foo"); + expected_builder.values().append_value("bar"); + expected_builder.append(true); + expected_builder.values().append_value("bar"); + expected_builder.values().append_value("foo"); + expected_builder.append(true); + let expected = expected_builder.finish(); + let struct_column = StructArray::new( + struct_fields.clone(), + vec![ + Arc::new(StringArray::from(vec!["name1", "name2"])), + Arc::new(BooleanArray::from(vec![true, false])), + Arc::new(UInt32Array::from(vec![1, 2])), + Arc::new(expected), + ], + None, + ); + let record_batch = + RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(struct_column)]) + .unwrap(); + // println!("rb: {:#?}", record_batch); + + let message_type = r#" + message schema { + REQUIRED GROUP struct { + REQUIRED GROUP tags (LIST) { + REPEATED GROUP tags { + OPTIONAL BINARY tag (UTF8); + } + } + } + } + "#; + let parquet_schema_2 = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + let arrow_schema_2 = + Arc::new(parquet_to_arrow_schema(parquet_schema_2.as_ref(), None).unwrap()); + println!("arrow_schema_2: {:#?}", arrow_schema_2); + let new_rb = try_rewrite_record_batch( + arrow_schema.clone(), + record_batch, + arrow_schema_2.clone(), + true, + false, + ) + .unwrap(); + println!("new_rb: {:#?}", new_rb); + + Ok(()) + } + + pub fn logical_plan_str(dataframe: &DataFrame) -> String { + let cl = dataframe.clone(); + let op = cl.into_optimized_plan().unwrap(); + format!("{}", op.display_indent()) + } + + pub async fn physical_plan_str(dataframe: &DataFrame) -> String { + let cl = dataframe.clone(); + let pp = cl.create_physical_plan().await.unwrap(); + get_plan_string(&pp).join("\n") + } + + #[tokio::test] + async fn test_deep_schema() -> crate::error::Result<()> { + let _ = env_logger::try_init(); + + let message_type = r#" + message schema { + REQUIRED INT32 id; + REQUIRED GROUP struct1 { + REQUIRED BINARY name (UTF8); + REQUIRED BOOLEAN bools; + REQUIRED INT32 uint32 (INTEGER(32,false)); + REQUIRED GROUP tags (LIST) { + REPEATED GROUP tags { + OPTIONAL BINARY tag (UTF8); + } + } + } + OPTIONAL GROUP list_struct (LIST) { + REPEATED GROUP struct { + REQUIRED BOOLEAN bools; + REQUIRED INT32 uint32 (INTEGER(32,false)); + REQUIRED GROUP int32 (LIST) { + REPEATED GROUP list { + OPTIONAL INT32 element; + } + } + } + } + OPTIONAL GROUP struct_list { + REQUIRED BOOLEAN bools; + REQUIRED INT32 uint32 (INTEGER(32,false)); + REQUIRED GROUP products (LIST) { + REPEATED GROUP product { + OPTIONAL INT32 qty; + OPTIONAL binary name(utf8); + } + } + } + } + "#; + let parquet_schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + {} + // return Ok(()); + + let complete_schema = + Arc::new(parquet_to_arrow_schema(parquet_schema.as_ref(), None).unwrap()); + // info!("schema: {:#?}", complete_schema.clone()); + // { + // let kk = generate_leaf_paths( + // complete_schema, + // parquet_schema.as_ref(), + // &vec![1, 2], + // &HashMap::from([ + // (1 as usize, vec!["name".to_string(), "tags".to_string()]) + // ]) + // ); + // info!("kk: {:#?}", kk); + // } + // return Ok(()); + + let ctx = SessionContext::new(); + + let schema_fields = complete_schema.fields().clone(); + let mut row_builder = StructBuilder::from_fields(schema_fields, 1); + + // field 0 + let f0_builder = row_builder.field_builder::(0).unwrap(); + f0_builder.append_value(1); + let f0_arr = f0_builder.finish(); + + // field 1 + let f1_builder = row_builder.field_builder::(1).unwrap(); + + // tbl.struct.name + { + let f1_name_builder = f1_builder.field_builder::(0).unwrap(); + f1_name_builder.append_value("n1"); + } + // tbl.struct.bools + { + let f1_bools_builder = f1_builder.field_builder::(1).unwrap(); + f1_bools_builder.append_value(true); + } + // tbl.struct.uint32 + let f1_uint32_builder = f1_builder.field_builder::(2).unwrap(); + f1_uint32_builder.append_value(1); + // tbl.struct.tags + let f1_tags_list_builder = f1_builder + .field_builder::>>(3) + .unwrap(); + let f1_tags_item_builder = f1_tags_list_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap(); + f1_tags_item_builder.append_value("t1"); + f1_tags_item_builder.append_value("t2"); + f1_tags_list_builder.append(true); + + f1_builder.append(true); + + let f1_arr = f1_builder.finish(); + // field 2 + // make_array( + // named_struct( + // 'bools', false, + // 'uint32', 5, + // 'int32', make_array(10, 20) + // ) + // ), + let f2_builder = row_builder + .field_builder::>>(2) + .unwrap(); + let f2_item_builder = f2_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap(); + + //tbl.list_struct[].bools + let f2_item_bools_builder = + f2_item_builder.field_builder::(0).unwrap(); + f2_item_bools_builder.append_value(true); + // tbl.list_struct[].uint32 + let f2_item_uint32_builder = + f2_item_builder.field_builder::(1).unwrap(); + f2_item_uint32_builder.append_value(5); + // tbl.list_struct[].uint32 + let f2_item_int32_list_builder = f2_item_builder + .field_builder::>>(2) + .unwrap(); + let f2_item_int32_item_builder = f2_item_int32_list_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap(); + f2_item_int32_item_builder.append_values(&[10, 20], &[true, true]); + f2_item_int32_list_builder.append(true); + + f2_item_builder.append(true); + + f2_builder.append(true); + + let f2_arr = f2_builder.finish(); + + // field 3 + // named_struct( + // 'bools', true, + // 'uint32', 5, + // 'products', make_array( + // named_struct( + // 'qty', 1, + // 'name', 'product1' + // ), + // named_struct( + // 'qty', 2, + // 'name', 'product2' + // ) + // ) + // ) + let f3_builder = row_builder.field_builder::(3).unwrap(); + // tbl.named_struct.bools + let f3_bools_builder = f3_builder.field_builder::(0).unwrap(); + f3_bools_builder.append_value(true); + // tbl.named_struct.uint32 + let f3_uint32_builder = f3_builder.field_builder::(1).unwrap(); + f3_uint32_builder.append_value(5); + // tbl.named_struct.uint32 + let f3_products_builder = f3_builder + .field_builder::>>(2) + .unwrap(); + { + let f3_field_products_item_builder = f3_products_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap(); + let qty_builder = f3_field_products_item_builder + .field_builder::(0) + .unwrap(); + qty_builder.append_value(1); + let name_builder = f3_field_products_item_builder + .field_builder::(1) + .unwrap(); + name_builder.append_value("product1"); + + f3_field_products_item_builder.append(true); + + let f3_field_products_item_builder = f3_products_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap(); + let qty_builder = f3_field_products_item_builder + .field_builder::(0) + .unwrap(); + qty_builder.append_value(1); + let name_builder = f3_field_products_item_builder + .field_builder::(1) + .unwrap(); + name_builder.append_value("product1"); + f3_field_products_item_builder.append(true); + } + f3_products_builder.append(true); + f3_builder.append(true); + + let f3_arr = f3_builder.finish(); + + let row = StructArray::new( + complete_schema.fields.clone(), + vec![ + // 1 + Arc::new(f0_arr), + Arc::new(f1_arr), + Arc::new(f2_arr), + Arc::new(f3_arr), + ], + None, + ); + let initial_table = Arc::new(MemTable::try_new( + complete_schema.clone(), + vec![vec![RecordBatch::from(row)]], + )?); + + ctx.register_table("tbl", initial_table.clone()).unwrap(); + let df = ctx + .sql( + r#" + select + get_field(struct1, 'tags') as tags, + get_field(array_element(list_struct, 0), 'int32') as f2 + from + tbl; + "#, + ) + .await + .unwrap(); + + let df_plan = df.clone().logical_plan().clone(); + // info!("df_plan: {:?}", df_plan); + + let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]); + let optimized_plan = + optimizer.optimize(df_plan, &OptimizerContext::new(), |_, _| {})?; + info!("df_plan: {:?}", optimized_plan); + + info!("logical = {}", logical_plan_str(&df)); + info!("physical = {}", physical_plan_str(&df).await); + df.show().await?; + Ok(()) + } + + #[tokio::test] + async fn test_deep_schema_2() -> crate::error::Result<()> { + let _ = env_logger::try_init(); + let ctx = SessionContext::new(); + + let dfr = ctx + .sql( + r#" + create external table + test + stored as parquet + location '/Users/adragomi/work/arrow/benchmark/profile_export_prod_delta/part-00001-1b493913-ef97-4da6-9f8c-da1506b378f1-c000.snappy.parquet' + "#, + ) + .await + .unwrap(); + + let df = ctx + .sql( + r#" + select + _change_type + from test + limit 10 + "#, + ) + .await + .unwrap(); + + let df_plan = df.clone().logical_plan().clone(); + // info!("df_plan: {:?}", df_plan); + + let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]); + let optimized_plan = + optimizer.optimize(df_plan, &OptimizerContext::new(), |_, _| {})?; + info!("df_plan: {:?}", optimized_plan); + + // info!("logical = {}", logical_plan_str(&df)); + // info!("physical = {}", physical_plan_str(&df).await); + // df.show().await?; + let results = df + .collect() + .await?; + print_batches(results.as_slice()); + info!("results: {}", results.len()); + + Ok(()) + } + + +} From d262b5f4d6b37d8e42275027099db6a7cfde95d0 Mon Sep 17 00:00:00 2001 From: Andrei Dragomir Date: Wed, 31 Jul 2024 15:11:42 +0300 Subject: [PATCH 3/4] [HSTACK] Deep column projection --- datafusion/catalog/src/table.rs | 23 +++ .../core/src/datasource/schema_adapter.rs | 5 +- .../src/datasource/schema_adapter_deep.rs | 6 +- datafusion/expr/Cargo.toml | 1 + datafusion/expr/src/logical_plan/plan.rs | 104 +++++++++++ datafusion/expr/src/logical_plan/tree_node.rs | 3 + datafusion/optimizer/Cargo.toml | 3 + .../optimizer/src/optimize_projections/mod.rs | 162 ++++++++++++++++-- .../optimize_projections/required_indices.rs | 102 ++++++++++- .../required_indices_deep.rs | 144 ++++++++++++++++ datafusion/optimizer/src/push_down_filter.rs | 3 + 11 files changed, 532 insertions(+), 24 deletions(-) create mode 100644 datafusion/optimizer/src/optimize_projections/required_indices_deep.rs diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 792315642a00..711806ea29bc 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; use crate::session::Session; @@ -153,6 +154,28 @@ pub trait TableProvider: Sync + Send { limit: Option, ) -> Result>; + /// Create an [`ExecutionPlan`] with an extra parameter + /// specifying the deep column projections + /// # Deep column projection + /// + /// If specified, a datasource such as Parquet can do deep projection pushdown. + /// In the case of deeply nested schemas (lists in structs etc), the + /// implementation can return a smaller schema that rewrites the entire file + /// schema to return only the necessary fields, no matter where they are (top-level + /// or deep) + /// + async fn scan_deep( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _projection_deep: Option<&HashMap>>, + filters: &[Expr], + limit: Option, + ) -> Result> { + self.scan(state, projection, filters, limit).await + } + + /// Specify if DataFusion should provide filter expressions to the /// TableProvider to apply *during* the scan. /// diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index 40cb40a83af2..e2b5353f0704 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -27,6 +27,7 @@ use arrow_schema::{Schema, SchemaRef}; use datafusion_common::plan_err; use std::fmt::Debug; use std::sync::Arc; +use crate::datasource::schema_adapter_deep::NestedSchemaAdapter; /// Factory for creating [`SchemaAdapter`] /// @@ -99,7 +100,9 @@ pub struct DefaultSchemaAdapterFactory {} impl SchemaAdapterFactory for DefaultSchemaAdapterFactory { fn create(&self, table_schema: SchemaRef) -> Box { - Box::new(DefaultSchemaAdapter { table_schema }) + // FIX Deep schema mapping + Box::new(NestedSchemaAdapter { table_schema }) + // Box::new(DefaultSchemaAdapter { table_schema }) } } diff --git a/datafusion/core/src/datasource/schema_adapter_deep.rs b/datafusion/core/src/datasource/schema_adapter_deep.rs index fda63fd6bf91..68530129a664 100644 --- a/datafusion/core/src/datasource/schema_adapter_deep.rs +++ b/datafusion/core/src/datasource/schema_adapter_deep.rs @@ -69,7 +69,7 @@ impl SchemaAdapter for NestedSchemaAdapter { if let Some((table_idx, table_field)) = self.table_schema.fields().find(file_field.name()) { - match can_rewrite_field(table_field.clone(), file_field.clone(), false) { + match can_rewrite_field(table_field.clone(), file_field.clone(), true) { true => { field_mappings[table_idx] = Some(projection.len()); projection.push(file_idx); @@ -582,8 +582,8 @@ mod tests { .sql( r#" select - get_field(struct1, 'tags') as tags, - get_field(array_element(list_struct, 0), 'int32') as f2 + struct1['tags'] as tags, + list_struct[0]['int32'] as f2 from tbl; "#, diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index b5d34d9a3834..6c3ebfc4d37b 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -52,6 +52,7 @@ serde_json = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } strum_macros = "0.26.0" +log = "0.4.21" [dev-dependencies] ctor = { workspace = true } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ca7d04b9b03e..b3a7f98d6a86 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -42,6 +42,7 @@ use crate::{ }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use log::trace; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, @@ -54,6 +55,7 @@ use crate::display::PgJsonVisitor; use crate::logical_plan::tree_node::unwrap_arc; pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; pub use datafusion_common::{JoinConstraint, JoinType}; +use datafusion_common::deep::rewrite_schema; /// A `LogicalPlan` is a node in a tree of relational operators (such as /// Projection or Filter). @@ -2301,6 +2303,8 @@ pub struct TableScan { pub source: Arc, /// Optional column indices to use as a projection pub projection: Option>, + /// Optional column indices to use as a projection + pub projection_deep: Option>>, /// The schema description of the output pub projected_schema: DFSchemaRef, /// Optional expressions to be used as filters by the table provider @@ -2391,11 +2395,109 @@ impl TableScan { table_name, source: table_source, projection, + projection_deep: None, projected_schema, filters, fetch, }) } + + /// Initialize TableScan with appropriate schema from the given + /// arguments. + pub fn try_new_with_deep_projection( + table_name: impl Into, + table_source: Arc, + projection: Option>, + projection_deep: Option>>, + filters: Vec, + fetch: Option, + ) -> Result { + trace!(target: "deep", "TableScan::try_new_with_deep_projection: {:#?}, {:#?}", projection, projection_deep); + let table_name = table_name.into(); + + if table_name.table().is_empty() { + return plan_err!("table_name cannot be empty"); + } + let schema = table_source.schema(); + let func_dependencies = FunctionalDependencies::new_from_constraints( + table_source.constraints(), + schema.fields.len(), + ); + let projected_schema = projection + .as_ref() + .map(|p| { + let projected_func_dependencies = + func_dependencies.project_functional_dependencies(p, p.len()); + + let df_schema = DFSchema::new_with_metadata( + p.iter() + .map(|i| { + (Some(table_name.clone()), Arc::new(schema.field(*i).clone())) + }) + .collect(), + schema.metadata.clone(), + )?; + df_schema.with_functional_dependencies(projected_func_dependencies) + }) + .unwrap_or_else(|| { + let df_schema = + DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?; + df_schema.with_functional_dependencies(func_dependencies) + })?; + let mut projected_schema = Arc::new(projected_schema); + + // reproject for deep schema + if projection.is_some() && projection_deep.is_some() { + let projection_clone = projection.unwrap().clone(); + let projection_deep_clone = projection_deep.unwrap().clone(); + let mut new_projection_deep: HashMap> = HashMap::new(); + projection_clone + .iter().enumerate().for_each(|(ip, elp)| { + let empty_vec: Vec = vec![]; + let deep = projection_deep_clone.get(elp).or(Some(&empty_vec)).unwrap(); + new_projection_deep.insert(ip, deep.clone()); + }); + let new_projection = (0..projection_clone.len()).collect::>(); + let inner_projected_schema = projected_schema.inner().clone(); + let new_inner_projected_schema = rewrite_schema(inner_projected_schema, &new_projection, &new_projection_deep); + let mut new_projected_schema_df = DFSchema::new_with_metadata( + new_inner_projected_schema.fields().iter() + .map(|fi| { + (Some(table_name.clone()), fi.clone()) + }) + .collect(), + schema.metadata.clone(), + )?; + new_projected_schema_df = new_projected_schema_df + .with_functional_dependencies(projected_schema.functional_dependencies().clone())?; + projected_schema = Arc::new(new_projected_schema_df); + + Ok(Self { + table_name, + source: table_source, + projection: Some(projection_clone), + projection_deep: Some(projection_deep_clone), + projected_schema, + filters, + fetch, + }) + + } else { + Ok(Self { + table_name, + source: table_source, + projection, + projection_deep, + projected_schema, + filters, + fetch, + }) + + } + + } + + } /// Apply Cross Join to two logical plans @@ -3494,6 +3596,7 @@ digraph { table_name: TableReference::bare("tab"), source: Arc::clone(&source) as Arc, projection: None, + projection_deep: None, projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, @@ -3524,6 +3627,7 @@ digraph { table_name: TableReference::bare("tab"), source, projection: None, + projection_deep: None, projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 539cb1cf5fb2..88eeeb8982f2 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -691,9 +691,11 @@ impl LogicalPlan { table_name, source, projection, + projection_deep, projected_schema, filters, fetch, + .. }) => filters .into_iter() .map_until_stop_and_collect(f)? @@ -702,6 +704,7 @@ impl LogicalPlan { table_name, source, projection, + projection_deep, projected_schema, filters, fetch, diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 1a9e9630c076..a8868540aee0 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -46,6 +46,9 @@ chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true } +datafusion-functions = { workspace = true } +datafusion-functions-nested = { workspace = true } +datafusion-functions-aggregate = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index ac4ed87a4a1a..cb464d3b120b 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -18,8 +18,9 @@ //! [`OptimizeProjections`] identifies and eliminates unused columns mod required_indices; +mod required_indices_deep; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use crate::optimizer::ApplyOrder; @@ -42,6 +43,7 @@ use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion, }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; +use log::trace; /// Optimizer rule to prune unnecessary columns from intermediate schemas /// inside the [`LogicalPlan`]. This rule: @@ -249,21 +251,46 @@ fn optimize_projections( table_name, source, projection, + projection_deep, filters, fetch, - projected_schema: _, + projected_schema, } = table_scan; - // Get indices referred to in the original (schema with all fields) - // given projected indices. - let projection = match &projection { - Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), - None => indices.into_inner(), + trace!(target: "deep", "optimize_projections TableScan: {:?} {:?} = {:#?}", projection, projection_deep, projected_schema.clone()); + + if false { + // Get indices referred to in the original (schema with all fields) + // given projected indices. + let projection = match &projection { + Some(projection) => indices.into_mapped_indices(|idx| projection[idx]), + None => indices.into_inner(), + }; + return TableScan::try_new( + table_name, + source, + Some(projection), + filters, + fetch, + ) + .map(LogicalPlan::TableScan) + .map(Transformed::yes); + } + + let (new_projection, new_projection_deep) = if projection.is_some() { + let projection_clone = projection.unwrap().clone(); + let projection_deep_clone = projection_deep.clone(); + indices.into_mapped_indices_deep(|idx| projection_clone[idx]) + } else { + indices.into_inner_deep() }; - return TableScan::try_new( + trace!(target: "deep", "optimize_projections new projection: {:#?}, {:#?}", new_projection, new_projection_deep); + + return TableScan::try_new_with_deep_projection( table_name, source, - Some(projection), + Some(new_projection), + Some(new_projection_deep), filters, fetch, ) @@ -605,6 +632,8 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result> { }) } + + /// Accumulates outer-referenced columns by the /// given expression, `expr`. /// @@ -613,12 +642,12 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result> { /// * `expr` - The expression to analyze for outer-referenced columns. /// * `columns` - A mutable reference to a `HashSet` where detected /// columns are collected. -fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) { +fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashMap>) { // inspect_expr_pre doesn't handle subquery references, so find them explicitly expr.apply(|expr| { match expr { Expr::OuterReferenceColumn(_, col) => { - columns.insert(col); + required_indices_deep::append_column(columns, &col, vec![]); } Expr::ScalarSubquery(subquery) => { outer_columns_helper_multi(&subquery.outer_ref_columns, columns); @@ -650,7 +679,7 @@ fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) { /// columns are collected. fn outer_columns_helper_multi<'a, 'b>( exprs: impl IntoIterator, - columns: &'b mut HashSet<&'a Column>, + columns: &'b mut HashMap>, ) { exprs.into_iter().for_each(|e| outer_columns(e, columns)); } @@ -762,11 +791,15 @@ fn rewrite_projection_given_requirements( let exprs_used = indices.get_at_indices(&expr); let required_indices = - RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?; + indices.with_exprs_and_old_indices(input.schema(), &expr)?; + + trace!(target: "deep", "rewrite_projection_given_requirements: {:#?}", required_indices); + trace!(target: "deep", "rewrite_projection_given_requirements: {:#?}", input); // rewrite the children projection, and if they are changed rewrite the // projection down - optimize_projections(unwrap_arc(input), config, required_indices)?.transform_data( + let opt1 = optimize_projections(unwrap_arc(input), config, required_indices)?; + opt1.transform_data( |input| { if is_projection_unnecessary(&input, &exprs_used)? { Ok(Transformed::yes(input)) @@ -803,6 +836,7 @@ mod tests { }; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; + use log::info; use datafusion_common::{ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, }; @@ -816,7 +850,9 @@ mod tests { logical_plan::{builder::LogicalPlanBuilder, table_scan}, not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator, Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition, + Literal, }; + use crate::optimize_projections::required_indices_deep::expr_to_deep_columns; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::expr_fn::{count, max, min}; @@ -1954,4 +1990,102 @@ mod tests { optimizer.optimize(plan, &OptimizerContext::new(), observe)?; Ok(optimized_plan) } + + #[test] + fn test_adr() -> Result<()> { + let tmp = datafusion_functions::expr_fn::get_field( + datafusion_functions::expr_fn::get_field( + col("aa"), + "bb" + ), + "cc" + ); + let kk = expr_to_deep_columns(&tmp); + info!("kk: {:#?}", kk); + + let tmp = + datafusion_functions::expr_fn::get_field( + datafusion_functions_nested::expr_fn::array_element( + col("list_struct"), + 0_i32.lit() + ), + "cc" + ) + ; + let kk = expr_to_deep_columns(&tmp); + info!("kk: {:#?}", kk); + + let tmp = datafusion_functions::expr_fn::nullif( + datafusion_functions::expr_fn::get_field( + datafusion_functions_nested::expr_fn::array_element( + col("list_struct"), + 0_i32.lit() + ), + "cc" + ), + datafusion_functions::expr_fn::get_field( + datafusion_functions::expr_fn::get_field( + col("othercol"), + "bb" + ), + "cc" + ) + ); + let kk = expr_to_deep_columns(&tmp); + info!("kk: {:#?}", kk); + + Ok(()) + } + + #[test] + fn test_deep_schema() -> Result<()> { + // shared join columns from using join should be pushed to both sides + + let table_scan = test_table_scan()?; + + let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]); + let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .join_using(table2_scan, JoinType::Left, vec!["a"])? + .project(vec![col("a"), col("b")])? + .build()?; + + // make sure projections are pushed down to table scan + let expected = "Projection: test.a, test.b\ + \n Left Join: Using test.a = test2.a\ + \n TableScan: test projection=[a, b]\ + \n TableScan: test2 projection=[a]"; + + let optimized_plan = optimize(plan)?; + let formatted_plan = format!("{optimized_plan:?}"); + assert_eq!(formatted_plan, expected); + + // make sure schema for join node include both join columns + let optimized_join = optimized_plan.inputs()[0]; + assert_eq!( + **optimized_join.schema(), + DFSchema::new_with_metadata( + vec![ + ( + Some("test".into()), + Arc::new(Field::new("a", DataType::UInt32, false)) + ), + ( + Some("test".into()), + Arc::new(Field::new("b", DataType::UInt32, false)) + ), + ( + Some("test2".into()), + Arc::new(Field::new("a", DataType::UInt32, true)) + ), + ], + HashMap::new() + )?, + ); + + Ok(()) + } + + } diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index a9a18898c82e..225d83394f19 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -17,10 +17,14 @@ //! [`RequiredIndicies`] helper for OptimizeProjection +use std::collections::HashMap; +use log::trace; use crate::optimize_projections::outer_columns; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Column, DFSchemaRef, Result}; +use datafusion_common::deep::fix_possible_field_accesses; use datafusion_expr::{Expr, LogicalPlan}; +use crate::optimize_projections::required_indices_deep::{append_column, expr_to_deep_columns}; /// Represents columns in a schema which are required (used) by a plan node /// @@ -38,6 +42,8 @@ use datafusion_expr::{Expr, LogicalPlan}; pub(super) struct RequiredIndicies { /// The indices of the required columns in the indices: Vec, + /// The path to leaf fields in the specified columns + deep_indices: HashMap>, /// If putting a projection above children is beneficial for the parent. /// Defaults to false. projection_beneficial: bool, @@ -53,14 +59,17 @@ impl RequiredIndicies { pub fn new_for_all_exprs(plan: &LogicalPlan) -> Self { Self { indices: (0..plan.schema().fields().len()).collect(), + deep_indices: HashMap::new(), projection_beneficial: false, } } /// Create a new instance with the specified indices as required pub fn new_from_indices(indices: Vec) -> Self { + let indices_len = indices.len(); Self { indices, + deep_indices: HashMap::new(), projection_beneficial: false, } .compact() @@ -71,6 +80,11 @@ impl RequiredIndicies { self.indices } + /// Convert the instance to its inner indices + pub fn into_inner_deep(self) -> (Vec, HashMap>) { + (self.indices, self.deep_indices) + } + /// Set the projection beneficial flag pub fn with_projection_beneficial(mut self) -> Self { self.projection_beneficial = true; @@ -96,7 +110,7 @@ impl RequiredIndicies { // Add indices of the child fields referred to by the expressions in the // parent plan.apply_expressions(|e| { - self.add_expr(schema, e)?; + self.add_expr(schema, e, None)?; Ok(TreeNodeRecursion::Continue) })?; Ok(self.compact()) @@ -111,15 +125,37 @@ impl RequiredIndicies { /// /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. - fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> Result<()> { + fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr, deep_indices: Option<&Vec>) -> Result<()> { // TODO could remove these clones (and visit the expression directly) - let mut cols = expr.column_refs(); + let mut cols = expr_to_deep_columns(expr); + trace!(target:"deep", "add_expr: {:#?}", cols); // Get outer-referenced (subquery) columns: outer_columns(expr, &mut cols); self.indices.reserve(cols.len()); - for col in cols { - if let Some(idx) = input_schema.maybe_index_of_column(col) { + for (col, rest) in cols { + if let Some(idx) = input_schema.maybe_index_of_column(&col) { + // get the rest and see whether the column type + // we iterate through the rest specifiers and we fix them + // that is, if we see something that looks like a get field, but we know the field in the schema + // is a map, that means that we need to replace it with * + // map_field['val'], projection_rest = ["val"] => projection_rest=["*"] + let mut new_rest: Vec = vec![]; + for tmp in &rest { + let tmp_pieces = tmp.split(".").map(|x|x.to_string()).collect::>(); + let fixed = fix_possible_field_accesses(&input_schema.clone(), idx, &tmp_pieces)?; + new_rest.push(fixed.join(".").to_string()); + } + trace!(target: "deep", "fix_possible_field_accesses {}: {:?} {:?}", &col.name, &rest, &new_rest); self.indices.push(idx); + if let Some(parent_deep_indices) = deep_indices { + for parent_index in parent_deep_indices { + let mut child_rest = new_rest.clone(); + child_rest.push(parent_index.clone()); + append_column::(&mut self.deep_indices, &idx, child_rest); + } + } else { + append_column::(&mut self.deep_indices, &idx, new_rest); + } } } Ok(()) @@ -140,7 +176,24 @@ impl RequiredIndicies { exprs .into_iter() .try_fold(self, |mut acc, expr| { - acc.add_expr(schema, expr)?; + acc.add_expr(schema, expr, None)?; + Ok(acc) + }) + .map(|acc| acc.compact()) + } + + pub fn with_exprs_and_old_indices( + self, + schema: &DFSchemaRef, + exprs: &[Expr], + ) -> Result { + let new_indices = RequiredIndicies::new(); + self + .indices + .iter() + .map(|&idx| (exprs[idx].clone(), self.deep_indices.get(&idx))) + .try_fold(new_indices, |mut acc, (expr, deep_indices)| { + acc.add_expr(schema, &expr, deep_indices)?; Ok(acc) }) .map(|acc| acc.compact()) @@ -168,15 +221,39 @@ impl RequiredIndicies { { let (l, r): (Vec, Vec) = self.indices.iter().partition(|&&idx| f(idx)); + + let mut ld: HashMap> = HashMap::new(); + let mut rd: HashMap> = HashMap::new(); + + + for idx in l.iter() { + match self.deep_indices.get(idx) { + None => {} + Some(xx) => { + ld.insert(*idx, xx.clone()); + } + }; + } + + for idx in r.iter() { + match self.deep_indices.get(idx) { + None => {} + Some(xx) => { + rd.insert(*idx, xx.clone()); + } + }; + } let projection_beneficial = self.projection_beneficial; ( Self { indices: l, + deep_indices: ld, projection_beneficial, }, Self { indices: r, + deep_indices: rd, projection_beneficial, }, ) @@ -191,6 +268,10 @@ impl RequiredIndicies { F: Fn(usize) -> usize, { self.indices.iter_mut().for_each(|idx| *idx = f(*idx)); + let reindex_deep: HashMap<_, _> = self.deep_indices.into_iter() + .map(|(k, v)| (f(k), v)) + .collect(); + self.deep_indices = reindex_deep; self } @@ -203,6 +284,15 @@ impl RequiredIndicies { self.map_indices(f).into_inner() } + /// Apply the given function `f` to each index in this instance, returning + /// the mapped indices + pub fn into_mapped_indices_deep(self, f: F) -> (Vec, HashMap>) + where + F: Fn(usize) -> usize, + { + self.map_indices(f).into_inner_deep() + } + /// Returns the `Expr`s from `exprs` that are at the indices in this instance pub fn get_at_indices(&self, exprs: &[Expr]) -> Vec { self.indices.iter().map(|&idx| exprs[idx].clone()).collect() diff --git a/datafusion/optimizer/src/optimize_projections/required_indices_deep.rs b/datafusion/optimizer/src/optimize_projections/required_indices_deep.rs new file mode 100644 index 000000000000..09242f843af6 --- /dev/null +++ b/datafusion/optimizer/src/optimize_projections/required_indices_deep.rs @@ -0,0 +1,144 @@ +use std::collections::{HashMap, VecDeque}; +use std::fmt::Debug; +use std::hash::Hash; +use datafusion_common::{Column, ScalarValue}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_expr::Expr; + +// pub fn append_column(acc: &mut HashMap>, column: &Column, rest: Vec) { +// info!("APPEND: {:?} = {:?}", column, rest); +// match acc.get_mut(column) { +// None => { +// let column_clone = column.clone(); +// if rest.len() > 0 { +// acc.insert(column_clone, vec![rest.join(".")]); +// } else { +// acc.insert(column_clone, vec![]); +// } +// } +// Some(cc) => { +// if rest.len() > 0 { +// cc.push(rest.join(".")); +// } +// } +// } +// } + +pub fn append_column(acc: &mut HashMap>, column: &T, rest: Vec) +where T: Debug + Clone + Eq + Hash { + match acc.get_mut(column) { + None => { + let column_clone = column.clone(); + if rest.len() > 0 { + acc.insert(column_clone, vec![rest.join(".")]); + } else { + acc.insert(column_clone, vec![]); + } + } + Some(cc) => { + if cc.len() == 0 { + // we already had this column in full + } else { + if rest.len() > 0 { + cc.push(rest.join(".")); + } else { + // we are getting the entire column, and we already had something + // we should delete everything + cc.clear(); + } + } + } + } +} + +pub fn expr_to_deep_columns(expr: &Expr) -> HashMap> { + let mut accum: HashMap> = HashMap::new(); + let mut field_accum: VecDeque = VecDeque::new(); + let mut in_make_struct_call: bool = false; + let _ = expr.apply(|expr| { + match expr { + Expr::Column(qc) => { + // @HStack FIXME: ADR: we should have a test case + // ignore deep columns if we have a in_make_struct_call + // case: struct(a, b, c)['col'] - we were getting 'col' in the accum stack + // FIXME Will this work for struct(get_field(a, 'substruct'))['col'] ????? + if in_make_struct_call { + field_accum.clear() + } + // at the end, unwind the field_accum and push all to accum + let mut tmp: Vec = vec![]; + // if we did't just save a "*" - which means the entire column + if !(field_accum.len() == 1 && field_accum.get(0).unwrap() == "*") { + for f in field_accum.iter().rev() { + tmp.push(f.to_owned()); + } + } + field_accum.clear(); + append_column::(&mut accum, qc, tmp); + } + Expr::ScalarFunction(sf) => { + // TODO what about maps ? what's the operator + match sf.name() { + "get_field" => { + // get field, append the second argument to the stack and continue + let literal_expr: String = match sf.args[1].clone() { + Expr::Literal(lit_expr) => match lit_expr { + ScalarValue::Utf8(str) => str.unwrap(), + _ => panic!() + } + _ => {panic!()} + }; + field_accum.push_back(literal_expr); + } + "array_element" => { + // We don't have the schema, but when splatting the column, we need to actually push the list inner field name here + field_accum.push_back("*".to_owned()); + } + "struct" => { + in_make_struct_call = true; + } + _ => {} + } + + } + // Use explicit pattern match instead of a default + // implementation, so that in the future if someone adds + // new Expr types, they will check here as well + Expr::Unnest(_) + | Expr::ScalarVariable(_, _) + | Expr::Alias(_) + | Expr::Literal(_) + | Expr::BinaryExpr { .. } + | Expr::Like { .. } + | Expr::SimilarTo { .. } + | Expr::Not(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsUnknown(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::IsNotUnknown(_) + | Expr::Negative(_) + | Expr::Between { .. } + | Expr::Case { .. } + | Expr::Cast { .. } + | Expr::TryCast { .. } + | Expr::Sort { .. } + | Expr::WindowFunction { .. } + | Expr::AggregateFunction { .. } + | Expr::GroupingSet(_) + | Expr::InList { .. } + | Expr::Exists { .. } + | Expr::InSubquery(_) + | Expr::ScalarSubquery(_) + | Expr::Wildcard { .. } + | Expr::Placeholder(_) + | Expr::OuterReferenceColumn { .. } => {} + } + Ok(TreeNodeRecursion::Continue) + }) + .map(|_| ()); + accum +} diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 8455919c35a8..0900c8902208 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -2442,6 +2442,7 @@ mod tests { (*test_provider.schema()).clone(), )?), projection: None, + projection_deep: None, source: Arc::new(test_provider), fetch: None, }); @@ -2514,6 +2515,7 @@ mod tests { (*test_provider.schema()).clone(), )?), projection: Some(vec![0]), + projection_deep: Some(HashMap::new()), source: Arc::new(test_provider), fetch: None, }); @@ -2543,6 +2545,7 @@ mod tests { (*test_provider.schema()).clone(), )?), projection: Some(vec![0]), + projection_deep: Some(HashMap::new()), source: Arc::new(test_provider), fetch: None, }); From 0a278aea53b2a0e7b27089b82d2dac7877f437a2 Mon Sep 17 00:00:00 2001 From: Andrei Dragomir Date: Tue, 23 Jul 2024 18:04:45 +0300 Subject: [PATCH 4/4] [HSTACK] Push projection_deep down the physical nodes path --- .../core/src/datasource/listing/table.rs | 94 +++++++++++ .../physical_plan/file_scan_config.rs | 31 +++- .../datasource/physical_plan/parquet/mod.rs | 25 ++- .../physical_plan/parquet/opener.rs | 153 +++++++++++++++++- datafusion/core/src/physical_planner.rs | 3 +- .../optimizer/src/optimize_projections/mod.rs | 2 +- datafusion/proto/proto/datafusion.proto | 2 + datafusion/proto/src/generated/pbjson.rs | 21 +++ datafusion/proto/src/generated/prost.rs | 3 + .../proto/src/physical_plan/from_proto.rs | 12 ++ .../proto/src/physical_plan/to_proto.rs | 10 +- .../tests/cases/roundtrip_physical_plan.rs | 3 + 12 files changed, 342 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 89066d8234ac..e2d1b6dbc6d9 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -54,6 +54,7 @@ use datafusion_catalog::Session; use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; +use datafusion_common::deep::try_rewrite_schema_opt; /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] @@ -822,6 +823,99 @@ impl TableProvider for ListingTable { .await } + async fn scan_deep( + &self, + state: &dyn Session, + projection: Option<&Vec>, + projection_deep: Option<&HashMap>>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let session_state = state.as_any().downcast_ref::().unwrap(); + let (mut partitioned_file_lists, statistics) = + self.list_files_for_scan(session_state, filters, limit).await?; + + // let projected_schema = project_schema(&self.schema(), projection)?; + let projected_schema = try_rewrite_schema_opt( + self.schema(), + projection, + projection_deep + )?; + + // if no files need to be read, return an `EmptyExec` + if partitioned_file_lists.is_empty() { + return Ok(Arc::new(EmptyExec::new(projected_schema))); + } + + let output_ordering = self.try_create_output_ordering()?; + match state + .config_options() + .execution + .split_file_groups_by_statistics + .then(|| { + output_ordering.first().map(|output_ordering| { + FileScanConfig::split_groups_by_statistics( + &self.table_schema, + &partitioned_file_lists, + output_ordering, + ) + }) + }) + .flatten() + { + Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), + Some(Ok(new_groups)) => { + if new_groups.len() <= self.options.target_partitions { + partitioned_file_lists = new_groups; + } else { + log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") + } + } + None => {} // no ordering required + }; + + // extract types of partition columns + let table_partition_cols = self + .options + .table_partition_cols + .iter() + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) + .collect::>>()?; + + let filters = if let Some(expr) = conjunction(filters.to_vec()) { + // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. + let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; + let filters = + create_physical_expr(&expr, &table_df_schema, state.execution_props())?; + Some(filters) + } else { + None + }; + + let object_store_url = if let Some(url) = self.table_paths.first() { + url.object_store() + } else { + return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); + }; + + // create the execution plan + self.options + .format + .create_physical_plan( + session_state, + FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema)) + .with_file_groups(partitioned_file_lists) + .with_statistics(statistics) + .with_projection(projection.cloned()) + .with_projection_deep(projection_deep.cloned()) + .with_limit(limit) + .with_output_ordering(output_ordering) + .with_table_partition_cols(table_partition_cols), + filters.as_ref(), + ) + .await + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 34fb6226c1a2..04efa3e401c6 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -37,7 +37,8 @@ use datafusion_common::stats::Precision; use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; -use log::warn; +use log::{trace, warn}; +use datafusion_common::deep::rewrite_field_projection; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -121,6 +122,9 @@ pub struct FileScanConfig { /// Columns on which to project the data. Indexes that are higher than the /// number of columns of `file_schema` refer to `table_partition_cols`. pub projection: Option>, + /// Columns on which to project the data. Indexes that are higher than the + /// number of columns of `file_schema` refer to `table_partition_cols`. + pub projection_deep: Option>>, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, @@ -149,6 +153,7 @@ impl FileScanConfig { file_groups: vec![], statistics, projection: None, + projection_deep: None, limit: None, table_partition_cols: vec![], output_ordering: vec![], @@ -167,6 +172,13 @@ impl FileScanConfig { self } + /// Set the projection of the files + pub fn with_projection_deep(mut self, projection_deep: Option>>) -> Self { + self.projection_deep = projection_deep; + self + } + + /// Set the limit of the files pub fn with_limit(mut self, limit: Option) -> Self { self.limit = limit; @@ -232,8 +244,21 @@ impl FileScanConfig { let mut table_cols_stats = vec![]; for idx in proj_iter { if idx < self.file_schema.fields().len() { - let field = self.file_schema.field(idx); - table_fields.push(field.clone()); + + let output_field = match &self.projection_deep { + None => { + self.file_schema.field(idx).clone() + } + Some(projection_deep) => { + trace!("FileScanConfig::project DEEP PROJECT"); + let field_arc = Arc::new(self.file_schema.field(idx).clone()); + let rewritten_field_arc = rewrite_field_projection(self.file_schema.clone(), idx, &projection_deep); + trace!("FileScanConfig::project DEEP PROJECT {:#?}", rewritten_field_arc); + rewritten_field_arc.as_ref().clone() + } + }; + + table_fields.push(output_field); table_cols_stats.push(self.statistics.column_statistics[idx].clone()) } else { let partition_idx = idx - self.file_schema.fields().len(); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 85d6f8db2373..4d428013084f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -18,6 +18,7 @@ //! [`ParquetExec`] Execution plan for reading Parquet files use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -43,7 +44,7 @@ use arrow::datatypes::SchemaRef; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; use itertools::Itertools; -use log::debug; +use log::{debug, info, trace}; mod access_plan; mod metrics; @@ -374,8 +375,8 @@ impl ParquetExecBuilder { } = self; let base_config = file_scan_config; - debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}", - base_config.file_groups, base_config.projection, predicate, base_config.limit); + info!("Creating ParquetExec, files: {:?}, projection {:?}, projection deep {:?}, predicate: {:?}, limit: {:?}", + base_config.file_groups, base_config.projection, base_config.projection_deep, predicate, base_config.limit); let metrics = ExecutionPlanMetricsSet::new(); let predicate_creation_errors = @@ -689,6 +690,23 @@ impl ExecutionPlan for ParquetExec { Some(proj) => proj, None => (0..self.base_config.file_schema.fields().len()).collect(), }; + let projection_deep = match &self.base_config.projection_deep { + None => {HashMap::new()} + Some(pd) => { + let mut out: HashMap> = HashMap::new(); + for npi in &projection { + match pd.get(npi) { + None => {} + Some(v) => { + out.insert(*npi, v.clone()); + } + } + + } + out + } + }; + trace!("ParquetExec::execute projection={:#?}, projection_deep={:#?}", &projection, &projection_deep); let parquet_file_reader_factory = self .parquet_file_reader_factory @@ -711,6 +729,7 @@ impl ExecutionPlan for ParquetExec { let opener = ParquetOpener { partition_index, projection: Arc::from(projection), + projection_deep: Arc::new(projection_deep), batch_size: ctx.session_config().batch_size(), limit: self.base_config.limit, predicate: self.predicate.clone(), diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 4edc0ac525de..2dd4965b31d2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -17,7 +17,8 @@ //! [`ParquetOpener`] for opening Parquet files -use crate::datasource::file_format::transform_schema_to_view; +use std::cmp::min; +use std::collections::HashMap; use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter; use crate::datasource::physical_plan::parquet::{ @@ -33,16 +34,21 @@ use datafusion_common::{exec_err, Result}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; -use log::debug; +use log::{debug, info, trace}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use std::sync::Arc; +use parquet::schema::types::SchemaDescriptor; +// use datafusion_common::DataFusionError; +use datafusion_common::deep::{has_deep_projection, rewrite_schema, splat_columns}; +use crate::datasource::file_format::transform_schema_to_view; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { pub partition_index: usize, pub projection: Arc<[usize]>, + pub projection_deep: Arc>>, pub batch_size: usize, pub limit: Option, pub predicate: Option>, @@ -78,7 +84,20 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; let projection = self.projection.clone(); - let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + let projection_vec = projection.as_ref().iter().map(|i| *i).collect::>(); + debug!("ParquetOpener::open projection={:?}", projection); + // FIXME @HStack: ADR: why do we need to do this ? our function needs another param maybe ? + // In the case when the projections requested are empty, we should return an empty schema + let projected_schema = if projection_vec.len() == 0 { + SchemaRef::from(self.table_schema.project(&projection)?) + } else { + rewrite_schema( + self.table_schema.clone(), + &projection_vec, + self.projection_deep.as_ref() + ) + }; + let projection_deep = self.projection_deep.clone(); let schema_adapter = self.schema_adapter_factory.create(projected_schema); let predicate = self.predicate.clone(); let pruning_predicate = self.pruning_predicate.clone(); @@ -94,6 +113,7 @@ impl FileOpener for ParquetOpener { let limit = self.limit; let schema_force_string_view = self.schema_force_string_view; + Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -119,11 +139,32 @@ impl FileOpener for ParquetOpener { let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&file_schema)?; - let mask = ProjectionMask::roots( - builder.parquet_schema(), - adapted_projections.iter().cloned(), - ); - + // let mask = ProjectionMask::roots( + // builder.parquet_schema(), + // adapted_projections.iter().cloned(), + // ); + let mask = if has_deep_projection(Some(projection_deep.clone().as_ref())) { + let leaves = generate_leaf_paths( + table_schema.clone(), + builder.parquet_schema(), + &projection_vec, + projection_deep.clone().as_ref() + ); + info!("ParquetOpener::open, using deep projection parquet leaves: {:?}", leaves.clone()); + // let tmp = builder.parquet_schema(); + // for (i, col) in tmp.columns().iter().enumerate() { + // info!(" {} {}= {:?}", i, col.path(), col); + // } + ProjectionMask::leaves( + builder.parquet_schema(), + leaves, + ) + } else { + ProjectionMask::roots( + builder.parquet_schema(), + adapted_projections.iter().cloned(), + ) + }; // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { let row_filter = row_filter::build_row_filter( @@ -263,3 +304,99 @@ fn create_initial_plan( // default to scanning all row groups Ok(ParquetAccessPlan::new_all(row_group_count)) } + +// FIXME: @HStack ACTUALLY look at the arrow schema and handle map types correctly +// Right now, we are matching "map-like" parquet leaves like "key_value.key" etc +// But, we neeed to walk through both the arrow schema (which KNOWS about the map type) +// and the parquet leaves to do this correctly. +fn equivalent_projection_paths_from_parquet_schema( + arrow_schema: SchemaRef, + parquet_schema: &SchemaDescriptor, +) -> Vec<(usize, (String, String))> { + let mut output: Vec<(usize, (String, String))> = vec![]; + for (i, col) in parquet_schema.columns().iter().enumerate() { + let original_path = col.path().string(); + let converted_path = convert_parquet_path_to_deep_projection_path(&original_path.as_str()); + output.push((i, (original_path.clone(), converted_path))); + } + output +} + +fn convert_parquet_path_to_deep_projection_path(parquet_path: &str) -> String { + if parquet_path.contains(".key_value.key") || + parquet_path.contains(".key_value.value") || + parquet_path.contains(".entries.keys") || + parquet_path.contains(".entries.values") || + parquet_path.contains(".list.element") { + let tmp = parquet_path + .replace("key_value.key", "*") + .replace("key_value.value", "*") + .replace("entries.keys", "*") + .replace("entries.values", "*") + .replace("list.element", "*"); + tmp + } else { + parquet_path.to_string() + } +} + +fn generate_leaf_paths( + arrow_schema: SchemaRef, + parquet_schema: &SchemaDescriptor, + projection: &Vec, + projection_deep: &HashMap>, +) -> Vec { + let actual_projection = if projection.len() == 0 { + (0..arrow_schema.fields().len()).collect() + } else { + projection.clone() + }; + let splatted = + splat_columns(arrow_schema.clone(), &actual_projection, &projection_deep); + trace!(target: "deep", "generate_leaf_paths: splatted: {:?}", &splatted); + + let mut out: Vec = vec![]; + for (i, (original, converted)) in equivalent_projection_paths_from_parquet_schema(arrow_schema, parquet_schema) { + // FIXME: @HStack + // for map fields, the actual parquet paths look like x.y.z.key_value.key, x.y.z.key_value.value + // since we are ignoring these names in the paths, we need to actually collapse this access to a * + // so we can filter for them + // also, we need BOTH the key and the value for maps otherwise we run into an arrow-rs error + // "partial projection of MapArray is not supported" + + trace!(target: "deep", " generate_leaf_paths looking at index {} {} = {}", i, &original, &converted); + + let mut found = false; + for filter in splatted.iter() { + // check if this filter matches this leaf path + let filter_pieces = filter.split(".").collect::>(); + // let col_pieces = col_path.parts(); + let col_pieces = converted.split(".").collect::>(); + // let's check + let mut filter_found = true; + for i in 0..min(filter_pieces.len(), col_pieces.len()) { + if i >= filter_pieces.len() { + // we are at the end of the filter, and we matched until now, so we break, we match ! + break; + } + if i >= col_pieces.len() { + // we have a longer filter, we matched until now, we match ! + break; + } + // we can actually check + if !(col_pieces[i] == filter_pieces[i] || filter_pieces[i] == "*") { + filter_found = false; + break; + } + } + if filter_found { + found = true; + break; + } + } + if found { + out.push(i); + } + } + out +} diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8d6c5089fa34..14017174578f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -441,6 +441,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::TableScan(TableScan { source, projection, + projection_deep, filters, fetch, .. @@ -451,7 +452,7 @@ impl DefaultPhysicalPlanner { // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); source - .scan(session_state, projection.as_ref(), &filters, *fetch) + .scan_deep(session_state, projection.as_ref(), projection_deep.as_ref(), &filters, *fetch) .await? } LogicalPlan::Values(Values { values, schema }) => { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index cb464d3b120b..e2ceca009865 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -850,7 +850,7 @@ mod tests { logical_plan::{builder::LogicalPlanBuilder, table_scan}, not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator, Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition, - Literal, + Literal, }; use crate::optimize_projections::required_indices_deep::expr_to_deep_columns; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 826992e132ba..dadf65d62f70 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -932,6 +932,8 @@ message FileScanExecConf { repeated FileGroup file_groups = 1; datafusion_common.Schema schema = 2; repeated uint32 projection = 4; + // FIXME somewhat abusively using ProjectionColumns to serialize map> + map projection_deep = 20; ScanLimit limit = 5; datafusion_common.Statistics statistics = 6; repeated string table_partition_cols = 7; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b4d63798f080..da494f75386c 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5151,6 +5151,9 @@ impl serde::Serialize for FileScanExecConf { if !self.projection.is_empty() { len += 1; } + if !self.projection_deep.is_empty() { + len += 1; + } if self.limit.is_some() { len += 1; } @@ -5176,6 +5179,9 @@ impl serde::Serialize for FileScanExecConf { if !self.projection.is_empty() { struct_ser.serialize_field("projection", &self.projection)?; } + if !self.projection_deep.is_empty() { + struct_ser.serialize_field("projectionDeep", &self.projection_deep)?; + } if let Some(v) = self.limit.as_ref() { struct_ser.serialize_field("limit", v)?; } @@ -5205,6 +5211,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "fileGroups", "schema", "projection", + "projection_deep", + "projectionDeep", "limit", "statistics", "table_partition_cols", @@ -5220,6 +5228,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { FileGroups, Schema, Projection, + ProjectionDeep, Limit, Statistics, TablePartitionCols, @@ -5249,6 +5258,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "fileGroups" | "file_groups" => Ok(GeneratedField::FileGroups), "schema" => Ok(GeneratedField::Schema), "projection" => Ok(GeneratedField::Projection), + "projectionDeep" | "projection_deep" => Ok(GeneratedField::ProjectionDeep), "limit" => Ok(GeneratedField::Limit), "statistics" => Ok(GeneratedField::Statistics), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), @@ -5276,6 +5286,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut file_groups__ = None; let mut schema__ = None; let mut projection__ = None; + let mut projection_deep__ = None; let mut limit__ = None; let mut statistics__ = None; let mut table_partition_cols__ = None; @@ -5304,6 +5315,15 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { .into_iter().map(|x| x.0).collect()) ; } + GeneratedField::ProjectionDeep => { + if projection_deep__.is_some() { + return Err(serde::de::Error::duplicate_field("projectionDeep")); + } + projection_deep__ = Some( + map_.next_value::, _>>()? + .into_iter().map(|(k,v)| (k.0, v)).collect() + ); + } GeneratedField::Limit => { if limit__.is_some() { return Err(serde::de::Error::duplicate_field("limit")); @@ -5340,6 +5360,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { file_groups: file_groups__.unwrap_or_default(), schema: schema__, projection: projection__.unwrap_or_default(), + projection_deep: projection_deep__.unwrap_or_default(), limit: limit__, statistics: statistics__, table_partition_cols: table_partition_cols__.unwrap_or_default(), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 875d2af75dd7..2f808e47e451 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1491,6 +1491,9 @@ pub struct FileScanExecConf { pub schema: ::core::option::Option, #[prost(uint32, repeated, tag = "4")] pub projection: ::prost::alloc::vec::Vec, + /// FIXME abusively using projection columns to serialize map> + #[prost(map = "uint32, message", tag = "20")] + pub projection_deep: ::std::collections::HashMap, #[prost(message, optional, tag = "5")] pub limit: ::core::option::Option, #[prost(message, optional, tag = "6")] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index b2f92f4b2ee4..b56dd1cf0f2c 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -17,6 +17,7 @@ //! Serde code to convert from protocol buffers to Rust data structures. +use std::collections::HashMap; use std::sync::Arc; use arrow::compute::SortOptions; @@ -483,6 +484,16 @@ pub fn parse_protobuf_file_scan_config( } else { Some(projection) }; + let projection_deep = proto + .projection_deep + .iter() + .map(|(i, cols)| (*i as usize, cols.columns.clone())) + .collect::>(); + let projection_deep = if projection_deep.is_empty() { + None + } else { + Some(projection_deep) + }; let statistics = convert_required!(proto.statistics)?; let file_groups: Vec> = proto @@ -532,6 +543,7 @@ pub fn parse_protobuf_file_scan_config( file_groups, statistics, projection, + projection_deep, limit: proto.limit.as_ref().map(|sl| sl.limit as usize), table_partition_cols, output_ordering, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7949a457f40f..3c916dfb01f6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -16,6 +16,7 @@ // under the License.language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; #[cfg(feature = "parquet")] @@ -43,7 +44,7 @@ use datafusion_expr::WindowFrame; use crate::protobuf::{ self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode, - PhysicalSortExprNodeCollection, + PhysicalSortExprNodeCollection, ProjectionColumns }; use super::PhysicalExtensionCodec; @@ -593,6 +594,13 @@ pub fn serialize_file_scan_config( .iter() .map(|n| *n as u32) .collect(), + projection_deep: conf + .projection_deep + .as_ref() + .unwrap_or(&HashMap::new()) + .iter() + .map(|(n, v)| (*n as u32, ProjectionColumns { columns: v.clone() })) + .collect(), schema: Some(schema.as_ref().try_into()?), table_partition_cols: conf .table_partition_cols diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0ffc494321fb..caea591cad11 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -668,6 +668,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { ]))), }, projection: None, + projection_deep: None, limit: None, table_partition_cols: vec![], output_ordering: vec![], @@ -699,6 +700,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { statistics: Statistics::new_unknown(&schema), file_schema: schema, projection: Some(vec![0, 1]), + projection_deep: None, limit: None, table_partition_cols: vec![Field::new( "part".to_string(), @@ -732,6 +734,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { ]))), }, projection: None, + projection_deep: None, limit: None, table_partition_cols: vec![], output_ordering: vec![],