From 7dfadb0f9ff7eb98dbdb31d0b338f0449bf6e55d Mon Sep 17 00:00:00 2001
From: Michael Maletich
Date: Thu, 20 Jun 2024 22:31:29 -0500
Subject: [PATCH 1/3] feat(core): Allow schema evolution on read

By casting the read record batch to the Delta schema, DataFusion can
read tables where the underlying Parquet files can be cast to the
desired schema.
---
 .../src/delta_datafusion/find_files/mod.rs    |   2 +-
 crates/core/src/delta_datafusion/mod.rs       | 234 +++++++++++++++---
 .../src/delta_datafusion/schema_adapter.rs    |  68 +++++
 crates/core/src/operations/constraints.rs     |   2 +-
 crates/core/src/operations/delete.rs          |   2 +-
 crates/core/src/operations/update.rs          |   2 +-
 crates/core/src/operations/write.rs           |   2 +-
 7 files changed, 274 insertions(+), 38 deletions(-)
 create mode 100644 crates/core/src/delta_datafusion/schema_adapter.rs

diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 347925f31f..f237a4ac8e 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -148,7 +148,7 @@ async fn scan_table_by_files(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
 
-    let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 723c2bea03..132fea0750 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -39,7 +39,8 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
-use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
@@ -76,6 +77,7 @@ use serde::{Deserialize, Serialize};
 use url::Url;
 
 use crate::delta_datafusion::expr::parse_predicate_expression;
+use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
 use crate::logstore::LogStoreRef;
@@ -92,6 +94,7 @@ pub mod logical;
 pub mod physical;
 
 mod find_files;
+mod schema_adapter;
 
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
@@ -448,7 +451,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
     snapshot: &'a DeltaTableState,
     log_store: LogStoreRef,
     filter: Option<Expr>,
-    state: &'a SessionState,
     projection: Option<&'a Vec<usize>>,
     limit: Option<usize>,
     files: Option<&'a [Add]>,
@@ -457,16 +459,11 @@ impl<'a> DeltaScanBuilder<'a> {
-    pub fn new(
-        snapshot: &'a DeltaTableState,
-        log_store: LogStoreRef,
-        state: &'a SessionState,
-    ) -> Self {
+    pub fn new(snapshot: &'a DeltaTableState, log_store: LogStoreRef) -> Self {
         DeltaScanBuilder {
             snapshot,
             log_store,
             filter: None,
-            state,
             files: None,
             projection: None,
             limit: None,
             config: None,
             schema: None,
         }
@@ -618,35 +615,30 @@ impl<'a> DeltaScanBuilder<'a> {
             .datafusion_table_statistics()
             .unwrap_or(Statistics::new_unknown(&schema));
 
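+        // Build the scan through ParquetExecBuilder so a schema adapter
+        // factory can be attached; the factory casts each file's record
+        // batches to the table schema, letting older files that lack newly
+        // added columns still be read.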
+        let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
+            object_store_url: self.log_store.object_store_url(),
+            file_schema,
+            file_groups: file_groups.into_values().collect(),
+            statistics: stats,
+            projection: self.projection.cloned(),
+            limit: self.limit,
+            table_partition_cols,
+            output_ordering: vec![],
+        })
+        .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
+
         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
         // filter
-        let parquet_pushdown = if config.enable_parquet_pushdown {
-            logical_filter.clone()
-        } else {
-            None
+        if let Some(predicate) = logical_filter {
+            if config.enable_parquet_pushdown {
+                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
+            }
         };
 
-        let scan = ParquetFormat::new()
-            .create_physical_plan(
-                self.state,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema,
-                    file_groups: file_groups.into_values().collect(),
-                    statistics: stats,
-                    projection: self.projection.cloned(),
-                    limit: self.limit,
-                    table_partition_cols,
-                    output_ordering: vec![],
-                },
-                parquet_pushdown.as_ref(),
-            )
-            .await?;
-
         Ok(DeltaScan {
             table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: scan,
+            parquet_scan: exec_plan_builder.build_arc(),
             config,
             logical_schema,
         })
@@ -686,7 +678,7 @@ impl TableProvider for DeltaTable {
         register_store(self.log_store(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
+        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store())
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
@@ -767,7 +759,7 @@ impl TableProvider for DeltaTableProvider {
         register_store(self.log_store.clone(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone())
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
@@ -1479,7 +1471,7 @@ pub(crate) async fn find_files_scan<'a>(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
@@ -1714,6 +1706,7 @@ impl From for DeltaColumn {
 
 #[cfg(test)]
 mod tests {
+    use crate::operations::write::SchemaMode;
     use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -2114,6 +2107,181 @@ mod tests {
         */
     }
 
+    #[tokio::test]
+    async fn delta_scan_supports_missing_columns() {
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Utf8,
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(arrow::array::StringArray::from(vec![
+                Some("A"),
+                Some("B"),
+            ]))],
+        )
+        .unwrap();
+
+        let schema2 = Arc::new(ArrowSchema::new(vec![
+            Field::new("col_1", DataType::Utf8, true),
+            Field::new("col_2", DataType::Utf8, true),
+        ]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E"),
+                    Some("F"),
+                    Some("G"),
+                ])),
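+                // col_2 values; batch1 never writes this column, so a scan
+                // of the combined table must backfill nulls for batch1's rows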
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E2"),
+                    Some("F2"),
+                    Some("G2"),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch2])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch1])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+-------+-------+",
+            "| col_1 | col_2 |",
+            "+-------+-------+",
+            "| A     |       |",
+            "| B     |       |",
+            "| E     | E2    |",
+            "| F     | F2    |",
+            "| G     | G2    |",
+            "+-------+-------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn delta_scan_supports_nested_missing_columns() {
+        let column1_schema1: arrow::datatypes::Fields =
+            vec![Field::new("col_1a", DataType::Utf8, true)].into();
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema1.clone()),
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema1,
+                vec![Arc::new(arrow::array::StringArray::from(vec![
+                    Some("A"),
+                    Some("B"),
+                ]))],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let column1_schema2: arrow::datatypes::Fields = vec![
+            Field::new("col_1a", DataType::Utf8, true),
+            Field::new("col_1b", DataType::Utf8, true),
+        ]
+        .into();
+        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema2.clone()),
+            true,
+        )]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema2,
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E"),
+                        Some("F"),
+                        Some("G"),
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E2"),
+                        Some("F2"),
+                        Some("G2"),
+                    ])),
+                ],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch1])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch2])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx
+            .sql("select col_1.col_1a, col_1.col_1b from test")
+            .await
+            .unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+--------------------+--------------------+",
+            "| test.col_1[col_1a] | test.col_1[col_1b] |",
+            "+--------------------+--------------------+",
+            "| A                  |                    |",
+            "| B                  |                    |",
+            "| E                  | E2                 |",
+            "| F                  | F2                 |",
+            "| G                  | G2                 |",
+            "+--------------------+--------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
     #[tokio::test]
     async fn test_multiple_predicate_pushdown() {
         use crate::{datafusion::prelude::SessionContext, DeltaTableBuilder};
diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs
new file mode 100644
index 0000000000..ce331a7fea
--- /dev/null
+++ b/crates/core/src/delta_datafusion/schema_adapter.rs
@@ -0,0 +1,68 @@
+use crate::operations::cast::cast_record_batch;
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A schema adapter factory that casts record batches read from Parquet
+/// files to the Delta table schema.
+#[derive(Debug)]
+pub(crate) struct DeltaSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DeltaSchemaAdapter {
+            table_schema: schema,
+        })
+    }
+}
+
+pub(crate) struct DeltaSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DeltaSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+            }),
+            projection,
+        ))
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct SchemaMapping {
+    table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+
+    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+}
diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs
index e5d356f81c..246541ccc1 100644
--- a/crates/core/src/operations/constraints.rs
+++ b/crates/core/src/operations/constraints.rs
@@ -114,7 +114,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
             session.state()
         });
 
-        let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
+        let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone())
             .build()
             .await?;
 
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 56aa9ef98b..ac0b616ef3 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -139,7 +139,7 @@ async fn excute_non_empty_expr(
 
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 2f30f4aa8a..cd13698bed 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -236,7 +236,7 @@ async fn execute(
     // For each rewrite evaluate the predicate and then modify each expression
     // to either compute the new value or obtain the old one then write these batches
 
-    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone())
         .with_files(&candidates.candidates)
         .build()
         .await?;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 0606707c19..fc3dfe44c7 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -511,7 +511,7 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;

From d48425ecbc4e07b44a6af4e3334be51dc2919490 Mon Sep 17 00:00:00 2001
From: Michael Maletich
Date: Fri, 21 Jun 2024 07:22:38 -0500
Subject: [PATCH 2/3] Use the correct schema for the schema adapter.

---
 crates/core/src/delta_datafusion/mod.rs     |  6 +-----
 crates/core/tests/integration_datafusion.rs | 15 ++++++++-------
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 132fea0750..4f4a504d0d 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -507,11 +507,7 @@ impl<'a> DeltaScanBuilder<'a> {
         let config = self.config;
         let schema = match self.schema {
             Some(schema) => schema,
-            None => {
-                self.snapshot
-                    .physical_arrow_schema(self.log_store.object_store())
-                    .await?
-            }
+            None => self.snapshot.arrow_schema()?,
         };
 
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs
index cb3cc41edb..81cbe9ef72 100644
--- a/crates/core/tests/integration_datafusion.rs
+++ b/crates/core/tests/integration_datafusion.rs
@@ -900,13 +900,14 @@ mod local {
 
         let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
 
+        // Without a schema defined on the SELECT, timestamps default to microsecond precision in UTC
         let expected = vec![
-            "+-------------------------------+---------------------+------------+",
-            "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
-            "+-------------------------------+---------------------+------------+",
-            "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
-            "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
-            "+-------------------------------+---------------------+------------+",
+            "+-----------------------------+----------------------+------------+",
+            "| BIG_DATE                    | NORMAL_DATE          | SOME_VALUE |",
+            "+-----------------------------+----------------------+------------+",
+            "| 1816-03-28T05:56:08.066278Z | 2022-02-01T00:00:00Z | 2          |",
+            "| 1816-03-29T05:56:08.066278Z | 2022-01-01T00:00:00Z | 1          |",
+            "+-----------------------------+----------------------+------------+",
         ];
         assert_batches_sorted_eq!(&expected, &batches);
 
@@ -1113,7 +1114,7 @@ mod local {
             .unwrap();
         let batch = batches.pop().unwrap();
 
-        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]);
+        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int64, false)]);
         assert_eq!(batch.schema().as_ref(), &expected_schema);
         Ok(())
     }

From 215f73307e769fdea640e9f8024f9a89cc33e2c8 Mon Sep 17 00:00:00 2001
From: Alex Wilcoxson
Date: Mon, 15 Jul 2024 15:49:46 +0200
Subject: [PATCH 3/3] test: post-merge test fixes

---
 crates/core/src/delta_datafusion/mod.rs | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 7a5fbd0b01..fecc6f3f03 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -39,7 +39,6 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
-use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
@@ -66,7 +65,6 @@ use datafusion_common::{
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_sql::planner::ParserOptions;
@@ -1747,6 +1745,7 @@ mod tests {
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor};
     use datafusion_expr::lit;
+    use datafusion_physical_expr::PhysicalExpr;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
     use object_store::path::Path;
@@ -2319,7 +2318,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_multiple_predicate_pushdown() {
-        use crate::{datafusion::prelude::SessionContext, DeltaTableBuilder};
+        use crate::datafusion::prelude::SessionContext;
         let schema = Arc::new(ArrowSchema::new(vec![
             Field::new("moDified", DataType::Utf8, true),
             Field::new("id", DataType::Utf8, true),
@@ -2362,7 +2361,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_delta_scan_builder_no_scan_config() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
@@ -2371,13 +2369,11 @@ mod tests {
             .await
             .unwrap();
 
-        let ctx = SessionContext::new();
-        let scan =
-            DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &ctx.state())
-                .with_filter(Some(col("a").eq(lit("s"))))
-                .build()
-                .await
-                .unwrap();
+        let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store())
+            .with_filter(Some(col("a").eq(lit("s"))))
+            .build()
+            .await
+            .unwrap();
 
         let mut visitor = ParquetPredicateVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
@@ -2391,7 +2387,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_delta_scan_builder_scan_config_disable_pushdown() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
@@ -2400,9 +2395,8 @@ mod tests {
             .await
             .unwrap();
 
-        let ctx = SessionContext::new();
         let snapshot = table.snapshot().unwrap();
-        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
+        let scan = DeltaScanBuilder::new(snapshot, table.log_store())
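+            // per the test name, the scan config built below disables
+            // parquet predicate pushdown for this scan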
             .with_filter(Some(col("a").eq(lit("s"))))
             .with_scan_config(
                 DeltaScanConfigBuilder::new()