From 3fad049973f887713732d0ef38c46019021748e0 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Wed, 24 Apr 2024 06:19:48 +0000
Subject: [PATCH 1/5] feat: introduce CDC write-side support for Update
 operations

This change introduces a `CDCTracker` which helps collect changes during
merge and update operations. This is admittedly rather inefficient, but my
hope is that it provides a place to start iterating on and improving the
writer code.

There is still additional work needed to handle table features properly for
other code paths (see the middleware discussion we have had in Slack), but
this change produces CDC files for Update operations.

Fixes #604
Fixes #2095
---
 crates/core/Cargo.toml                        |   3 +-
 crates/core/src/delta_datafusion/mod.rs       |   6 +
 crates/core/src/operations/cdc.rs             | 454 ++++++++++++++++++
 crates/core/src/operations/delete.rs          |   1 +
 crates/core/src/operations/merge/mod.rs       |   1 +
 crates/core/src/operations/mod.rs             |   2 +
 .../src/operations/transaction/protocol.rs    |   5 +
 crates/core/src/operations/update.rs          | 340 ++++++++++++-
 crates/core/src/operations/write.rs           |  27 +-
 9 files changed, 819 insertions(+), 20 deletions(-)
 create mode 100644 crates/core/src/operations/cdc.rs

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 008d405948..a83be3ce37 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -115,7 +115,8 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
 
 [features]
-default = []
+cdf = []
+default = ["cdf"]
 datafusion = [
   "dep:datafusion",
   "datafusion-expr",
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index c1b6208cff..fae36d7cbf 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -509,6 +509,12 @@ impl<'a> DeltaScanBuilder<'a> {
         self
     }
 
+    /// Use the provided [SchemaRef] for the [DeltaScan]
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
     pub async fn build(self) -> DeltaResult<DeltaScan> {
         let config = self.config;
         let schema = match self.schema {
diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs
new file mode 100644
index 0000000000..365f4a649a
--- /dev/null
+++ b/crates/core/src/operations/cdc.rs
@@ -0,0 +1,454 @@
+//!
+//! The CDC module contains private tools for managing CDC files
+//!
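+//!
+//! A minimal usage sketch of the tracker (illustrative only: error handling is
+//! elided, and `schema`, `read_batch`, and `updated_batch` are assumed to exist):
+//!
+//! ```ignore
+//! let tracker = CDCTracker::new(schema);
+//! let _ = tracker.pre_sender().send(read_batch).await;
+//! let _ = tracker.post_sender().send(updated_batch).await;
+//! // Yields the combined `update_preimage`/`update_postimage` batches to commit
+//! let cdc_batches = tracker.collect().await?;
+//! ```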
+
+use crate::table::state::DeltaTableState;
+use crate::DeltaResult;
+
+use arrow::array::{Array, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion::error::Result as DataFusionResult;
+use datafusion::physical_plan::{
+    metrics::MetricsSet, DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+};
+use datafusion::prelude::*;
+use futures::{Stream, StreamExt};
+use std::sync::Arc;
+use tokio::sync::mpsc::*;
+use tracing::log::*;
+
+/// Maximum in-memory channel size for the tracker to use
+const MAX_CHANNEL_SIZE: usize = 1024;
+
+/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
+/// associated with commits
+pub(crate) struct CDCTracker {
+    schema: SchemaRef,
+    pre_sender: Sender<RecordBatch>,
+    pre_receiver: Receiver<RecordBatch>,
+    post_sender: Sender<RecordBatch>,
+    post_receiver: Receiver<RecordBatch>,
+}
+
+impl CDCTracker {
+    /// Construct a new tracker for the given schema
+    pub(crate) fn new(schema: SchemaRef) -> Self {
+        let (pre_sender, pre_receiver) = channel(MAX_CHANNEL_SIZE);
+        let (post_sender, post_receiver) = channel(MAX_CHANNEL_SIZE);
+        Self {
+            schema,
+            pre_sender,
+            pre_receiver,
+            post_sender,
+            post_receiver,
+        }
+    }
+
+    /// Return an owned [Sender] for the caller to use when sending read but not altered batches
+    pub(crate) fn pre_sender(&self) -> Sender<RecordBatch> {
+        self.pre_sender.clone()
+    }
+
+    /// Return an owned [Sender] for the caller to use when sending altered batches
+    pub(crate) fn post_sender(&self) -> Sender<RecordBatch> {
+        self.post_sender.clone()
+    }
+
+    pub(crate) async fn collect(mut self) -> DeltaResult<Vec<RecordBatch>> {
+        debug!("Collecting all the batches for diffing");
+        let ctx = SessionContext::new();
+        let mut pre = vec![];
+        let mut post = vec![];
+
+        while !self.pre_receiver.is_empty() {
+            if let Ok(batch) = self.pre_receiver.try_recv() {
+                pre.push(batch);
+            } else {
+                warn!("Error when receiving on the pre-receiver");
+            }
+        }
+
+        while !self.post_receiver.is_empty() {
+            if let Ok(batch) = self.post_receiver.try_recv() {
+                post.push(batch);
+            } else {
+                warn!("Error when receiving on the post-receiver");
+            }
+        }
+
+        // Collect _all_ the batches for consideration
+        let pre = ctx.read_batches(pre)?;
+        let post = ctx.read_batches(post)?;
+
+        // There is certainly a better way to do this other than stupidly cloning data for diffing
+        // purposes, but this is the quickest and easiest way to "diff" the two sets of batches
+        let preimage = pre.clone().except(post.clone())?;
+        let postimage = post.except(pre)?;
+
+        // Create a new schema which represents the input batch along with the CDC
+        // columns
+        let mut fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone();
+        fields.push(Arc::new(Field::new("_change_type", DataType::Utf8, true)));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut batches = vec![];
+
+        let mut pre_stream = preimage.execute_stream().await?;
+        let mut post_stream = postimage.execute_stream().await?;
+
+        // Fill up on pre image batches
+        while let Some(Ok(batch)) = pre_stream.next().await {
+            let batch = crate::operations::cast::cast_record_batch(
+                &batch,
+                self.schema.clone(),
+                true,
+                false,
+            )?;
+            let new_column = Arc::new(StringArray::from(vec![
+                Some("update_preimage");
+                batch.num_rows()
+            ]));
+            let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
+            columns.push(new_column);
+
+            let batch = RecordBatch::try_new(schema.clone(), columns)?;
+            batches.push(batch);
+        }
+
+        // Fill up on the post-image batches
+        while let Some(Ok(batch)) = post_stream.next().await {
+            let batch =
crate::operations::cast::cast_record_batch( + &batch, + self.schema.clone(), + true, + false, + )?; + let new_column = Arc::new(StringArray::from(vec![ + Some("update_postimage"); + batch.num_rows() + ])); + let mut columns: Vec> = batch.columns().to_vec(); + columns.push(new_column); + + let batch = RecordBatch::try_new(schema.clone(), columns)?; + batches.push(batch); + } + + debug!("Found {} batches to consider `CDC` data", batches.len()); + + // At this point the batches should just contain the changes + Ok(batches) + } +} + +/// A DataFusion observer to help pick up on pre-image changes +pub(crate) struct CDCObserver { + parent: Arc, + id: String, + sender: Sender, +} + +impl CDCObserver { + pub(crate) fn new( + id: String, + sender: Sender, + parent: Arc, + ) -> Self { + Self { id, sender, parent } + } +} + +impl std::fmt::Debug for CDCObserver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CDCObserver").field("id", &self.id).finish() + } +} + +impl DisplayAs for CDCObserver { + fn fmt_as( + &self, + _: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "CDCObserver id={}", self.id) + } +} + +impl ExecutionPlan for CDCObserver { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.parent.schema() + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.parent.properties() + } + + fn children(&self) -> Vec> { + vec![self.parent.clone()] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion_common::Result { + let res = self.parent.execute(partition, context)?; + Ok(Box::pin(CDCObserverStream { + schema: self.schema(), + input: res, + sender: self.sender.clone(), + })) + } + + fn statistics(&self) -> DataFusionResult { + self.parent.statistics() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion_common::Result> { + if let Some(parent) = children.first() { + Ok(Arc::new(CDCObserver { + id: self.id.clone(), + sender: self.sender.clone(), + parent: parent.clone(), + })) + } else { + Err(datafusion_common::DataFusionError::Internal( + "Failed to handle CDCObserver".into(), + )) + } + } + + fn metrics(&self) -> Option { + self.parent.metrics() + } +} + +/// The CDCObserverStream simply acts to help observe the stream of data being +/// read by DataFusion to capture the pre-image versions of data +pub(crate) struct CDCObserverStream { + schema: SchemaRef, + input: SendableRecordBatchStream, + sender: Sender, +} + +impl Stream for CDCObserverStream { + type Item = DataFusionResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + let _ = self.sender.try_send(batch.clone()); + Some(Ok(batch)) + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + self.input.size_hint() + } +} + +impl RecordBatchStream for CDCObserverStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// +/// Return true if the specified table is capable of writing Change Data files +/// +/// From the Protocol: +/// +/// > For Writer Versions 4 up to 6, all writers must respect the delta.enableChangeDataFeed +/// > configuration flag in the metadata of the table. 
When delta.enableChangeDataFeed is true, +/// > writers must produce the relevant AddCDCFile's for any operation that changes data, as +/// > specified in Change Data Files. +/// > +/// > For Writer Version 7, all writers must respect the delta.enableChangeDataFeed configuration flag in +/// > the metadata of the table only if the feature changeDataFeed exists in the table protocol's +/// > writerFeatures. +pub(crate) fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult { + if let Some(features) = &snapshot.protocol().writer_features { + // Features should only exist at writer version 7 but to avoid cases where + // the Option> can get filled with an empty set, checking for the value + // explicitly + if snapshot.protocol().min_writer_version == 7 + && !features.contains(&crate::kernel::WriterFeatures::ChangeDataFeed) + { + // If the writer feature has not been set, then the table should not have CDC written + // to it. Otherwise fallback to the configured table configuration + return Ok(false); + } + } + Ok(snapshot.table_config().enable_change_data_feed()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::kernel::DataType as DeltaDataType; + use crate::kernel::{Action, PrimitiveType, Protocol}; + use crate::operations::DeltaOps; + use crate::{DeltaConfigKey, DeltaTable}; + use arrow::array::Int32Array; + use datafusion::assert_batches_sorted_eq; + + /// A simple test which validates primitive writer version 1 tables should + /// not write Change Data Files + #[tokio::test] + async fn test_should_write_cdc_basic_table() { + let mut table = DeltaOps::new_in_memory() + .create() + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .await + .expect("Failed to make a table"); + table.load().await.expect("Failed to reload table"); + let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + assert!( + result == false, + "A default table should not create CDC files" + ); + } + + /// + /// This test manually creates a table with writer version 4 that has the configuration sets + /// + #[tokio::test] + async fn test_should_write_cdc_table_with_configuration() { + let actions = vec![Action::Protocol(Protocol::new(1, 4))]; + let mut table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .expect("failed to make a version 4 table with EnableChangeDataFeed"); + table.load().await.expect("Failed to reload table"); + + let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + assert!( + result == true, + "A table with the EnableChangeDataFeed should create CDC files" + ); + } + + /// + /// This test creates a writer version 7 table which has a slightly different way of + /// determining whether CDC files should be written or not. 
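+    /// Without `changeDataFeed` present in the protocol's `writerFeatures`, a v7 table
+    /// must not produce CDC files even when the table property is set.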
+ #[tokio::test] + async fn test_should_write_cdc_v7_table_no_writer_feature() { + let actions = vec![Action::Protocol(Protocol::new(1, 7))]; + let mut table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .with_actions(actions) + .await + .expect("failed to make a version 4 table with EnableChangeDataFeed"); + table.load().await.expect("Failed to reload table"); + + let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + assert!( + result == false, + "A v7 table must not write CDC files unless the writer feature is set" + ); + } + + /// + /// This test creates a writer version 7 table with a writer table feature enabled for CDC and + /// therefore should write CDC files + #[tokio::test] + async fn test_should_write_cdc_v7_table_with_writer_feature() { + let protocol = Protocol::new(1, 7) + .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]); + let actions = vec![Action::Protocol(protocol)]; + let mut table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .expect("failed to make a version 4 table with EnableChangeDataFeed"); + table.load().await.expect("Failed to reload table"); + + let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + assert!( + result == true, + "A v7 table must not write CDC files unless the writer feature is set" + ); + } + + #[tokio::test] + async fn test_sanity_check() { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + true, + )])); + let tracker = CDCTracker::new(schema.clone()); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))], + ) + .unwrap(); + let updated_batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)]))], + ) + .unwrap(); + + let _ = tracker.pre_sender().send(batch).await; + let _ = tracker.post_sender().send(updated_batch).await; + + match tracker.collect().await { + Ok(batches) => { + let _ = arrow::util::pretty::print_batches(&batches); + assert_eq!(batches.len(), 2); + assert_batches_sorted_eq! {[ + "+-------+------------------+", + "| value | _change_type |", + "+-------+------------------+", + "| 2 | update_preimage |", + "| 12 | update_postimage |", + "+-------+------------------+", + ], &batches } + } + Err(err) => { + println!("err: {err:#?}"); + panic!("Should have never reached this assertion"); + } + } + } +} diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index bf17ed6085..aba54cd5f1 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -174,6 +174,7 @@ async fn excute_non_empty_expr( false, None, writer_stats_config, + None, ) .await? 
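+    // The `None` passed above is the new optional CDC batch sender: delete does
+    // not capture change data in this patch, only Update wires one up.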
.into_iter() diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index ddbe113d16..c13da4d879 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1389,6 +1389,7 @@ async fn execute( safe_cast, None, writer_stats_config, + None, ) .await?; diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 7923431d45..761ebd7b4e 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -39,6 +39,8 @@ use optimize::OptimizeBuilder; use restore::RestoreBuilder; use set_tbl_properties::SetTablePropertiesBuilder; +#[cfg(all(feature = "cdf", feature = "datafusion"))] +mod cdc; #[cfg(feature = "datafusion")] pub mod constraints; #[cfg(feature = "datafusion")] diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs index c5d9cdf650..707f1daf02 100644 --- a/crates/core/src/operations/transaction/protocol.rs +++ b/crates/core/src/operations/transaction/protocol.rs @@ -228,6 +228,11 @@ pub static INSTANCE: Lazy = Lazy::new(|| { let mut writer_features = HashSet::new(); writer_features.insert(WriterFeatures::AppendOnly); writer_features.insert(WriterFeatures::TimestampWithoutTimezone); + #[cfg(feature = "cdf")] + { + writer_features.insert(WriterFeatures::ChangeDataFeed); + writer_features.insert(WriterFeatures::GeneratedColumns); + } #[cfg(feature = "datafusion")] { writer_features.insert(WriterFeatures::Invariants); diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 9a088c6ae9..6ca79c6244 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -38,8 +38,10 @@ use datafusion_physical_expr::{ PhysicalExpr, }; use futures::future::BoxFuture; +use object_store::prefix::PrefixStore; use parquet::file::properties::WriterProperties; use serde::Serialize; +use tracing::log::*; use super::write::write_execution_plan; use super::{ @@ -52,12 +54,17 @@ use crate::delta_datafusion::{ DataFusionMixins, DeltaColumn, DeltaSessionContext, }; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; -use crate::kernel::{Action, Remove}; +use crate::kernel::{Action, AddCDCFile, Remove}; use crate::logstore::LogStoreRef; +use crate::operations::cdc::*; +use crate::operations::writer::{DeltaWriter, WriterConfig}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable}; +/// Custom column name used for marking internal [RecordBatch] rows as updated +pub(crate) const UPDATE_PREDICATE_COLNAME: &str = "__delta_rs_update_predicate"; + /// Updates records in the Delta Table. 
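+/// When change data feed is enabled on the table, the update will also capture the
+/// pre- and post-image of the modified rows and write them out as CDC files.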
 /// See this module's documentation for more information
 pub struct UpdateBuilder {
@@ -222,6 +229,10 @@ async fn execute(
 
     let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
 
+    // Create a projection for a new column with the predicate evaluated
+    let input_schema = snapshot.input_schema()?;
+    let tracker = CDCTracker::new(input_schema.clone());
+
     let execution_props = state.execution_props();
     // For each rewrite evaluate the predicate and then modify each expression
     // to either compute the new value or obtain the old one then write these batches
@@ -231,15 +242,23 @@ async fn execute(
         .await?;
     let scan = Arc::new(scan);
 
-    // Create a projection for a new column with the predicate evaluated
-    let input_schema = snapshot.input_schema()?;
+    // Wrap the scan with a CDCObserver if CDC has been enabled so that the tracker can
+    // later be used to produce the CDC files
+    let scan: Arc<dyn ExecutionPlan> = match should_write_cdc(&snapshot) {
+        Ok(true) => Arc::new(CDCObserver::new(
+            "cdc-update-observer".into(),
+            tracker.pre_sender(),
+            scan.clone(),
+        )),
+        _others => scan,
+    };
 
     let mut fields = Vec::new();
     for field in input_schema.fields.iter() {
         fields.push(field.to_owned());
     }
     fields.push(Arc::new(Field::new(
-        "__delta_rs_update_predicate",
+        UPDATE_PREDICATE_COLNAME,
         arrow_schema::DataType::Boolean,
         true,
     )));
@@ -265,16 +284,16 @@ async fn execute(
         when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
     let predicate_expr =
         create_physical_expr_fix(predicate_null, &input_dfschema, execution_props)?;
-    expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));
+    expressions.push((predicate_expr, UPDATE_PREDICATE_COLNAME.to_string()));
 
     let projection_predicate: Arc<dyn ExecutionPlan> =
-        Arc::new(ProjectionExec::try_new(expressions, scan)?);
+        Arc::new(ProjectionExec::try_new(expressions, scan.clone())?);
 
     let count_plan = Arc::new(MetricObserverExec::new(
         "update_count".into(),
         projection_predicate.clone(),
         |batch, metrics| {
-            let array = batch.column_by_name("__delta_rs_update_predicate").unwrap();
+            let array = batch.column_by_name(UPDATE_PREDICATE_COLNAME).unwrap();
             let copied_rows = array.null_count();
             let num_updated = array.len() - copied_rows;
 
@@ -305,10 +324,10 @@ async fn execute(
     // Maintain a map from the original column name to its temporary column index
     let mut map = HashMap::<String, usize>::new();
     let mut control_columns = HashSet::<String>::new();
-    control_columns.insert("__delta_rs_update_predicate".to_owned());
+    control_columns.insert(UPDATE_PREDICATE_COLNAME.to_string());
 
     for (column, expr) in updates {
-        let expr = case(col("__delta_rs_update_predicate"))
+        let expr = case(col(UPDATE_PREDICATE_COLNAME))
             .when(lit(true), expr.to_owned())
             .otherwise(col(column.to_owned()))?;
         let predicate_expr = create_physical_expr_fix(expr, &input_dfschema, execution_props)?;
@@ -324,6 +343,7 @@ async fn execute(
     // Project again to remove __delta_rs columns and rename update columns to their original name
     let mut expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
     let scan_schema = projection_update.schema();
+
     for (i, field) in scan_schema.fields().into_iter().enumerate() {
         if !control_columns.contains(field.name()) {
             match map.get(field.name()) {
@@ -364,10 +384,11 @@ async fn execute(
         log_store.object_store().clone(),
         Some(snapshot.table_config().target_file_size() as usize),
         None,
-        writer_properties,
+        writer_properties.clone(),
         safe_cast,
         None,
         writer_stats_config,
+        Some(tracker.post_sender()),
     )
     .await?;

@@ -422,6 +443,50 @@ async fn execute(
         serde_json::to_value(&metrics)?,
     );
 
+    match tracker.collect().await {
+        Ok(batches) => {
+            if !batches.is_empty() {
+                debug!(
+                    "Collected {} batches to write as part of this transaction:",
+                    batches.len()
+                );
+                let config = WriterConfig::new(
+                    batches[0].schema().clone(),
+                    snapshot.metadata().partition_columns.clone(),
+                    writer_properties.clone(),
+                    None,
+                    None,
+                    0,
+                    None,
+                );
+
+                let store = Arc::new(PrefixStore::new(
+                    log_store.object_store().clone(),
+                    "_change_data",
+                ));
+                let mut writer = DeltaWriter::new(store, config);
+                for batch in batches {
+                    writer.write(&batch).await?;
+                }
+                // Add the AddCDCFile actions that exist to the commit
+                actions.extend(writer.close().await?.into_iter().map(|add| {
+                    Action::Cdc(AddCDCFile {
+                        // This is a gnarly hack, but the action needs the nested path, not the
+                        // path inside the prefixed store
+                        path: format!("_change_data/{}", add.path),
+                        size: add.size,
+                        partition_values: add.partition_values,
+                        data_change: false,
+                        tags: add.tags,
+                    })
+                }));
+            }
+        }
+        Err(err) => {
+            error!("Failed to collect CDC batches: {err:#?}");
+        }
+    };
+
     let commit = CommitBuilder::from(commit_properties)
         .with_actions(actions)
         .build(Some(&snapshot), log_store, operation)
@@ -472,10 +537,12 @@ impl std::future::IntoFuture for UpdateBuilder {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+
+    use crate::delta_datafusion::cdf::DeltaCdfScan;
     use crate::kernel::DataType as DeltaDataType;
-    use crate::kernel::PrimitiveType;
-    use crate::kernel::StructField;
-    use crate::kernel::StructType;
+    use crate::kernel::{Action, PrimitiveType, Protocol, StructField, StructType};
+    use crate::operations::collect_sendable_stream;
     use crate::operations::DeltaOps;
     use crate::writer::test_utils::datafusion::get_data;
     use crate::writer::test_utils::datafusion::write_batch;
@@ -484,12 +551,13 @@ mod tests {
     };
     use crate::DeltaConfigKey;
     use crate::DeltaTable;
+    use arrow::array::{Int32Array, StringArray};
     use arrow::datatypes::Schema as ArrowSchema;
     use arrow::datatypes::{Field, Schema};
     use arrow::record_batch::RecordBatch;
-    use arrow_array::Int32Array;
     use arrow_schema::DataType;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::prelude::*;
     use serde_json::json;
     use std::sync::Arc;
@@ -969,4 +1037,248 @@ mod tests {
         .await;
         assert!(res.is_err());
     }
+
+    #[tokio::test]
+    async fn test_no_cdc_on_older_tables() {
+        let table = prepare_values_table().await;
+        assert_eq!(table.version(), 0);
+        assert_eq!(table.get_files_count(), 1);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            arrow::datatypes::DataType::Int32,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
+        )
+        .unwrap();
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .update()
+            .with_predicate(col("value").eq(lit(2)))
+            .with_update("value", lit(12))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        // NOTE: This currently doesn't really assert anything because cdc_files() is not reading
+        // actions correctly
+        if let Some(state) = table.state.clone() {
+            let cdc_files = state.cdc_files();
+            assert!(cdc_files.is_ok());
+            if let Ok(cdc_files) = cdc_files {
+                let cdc_files: Vec<_> = cdc_files.collect();
+                assert_eq!(cdc_files.len(), 0);
+            }
+        } else {
+            panic!("I shouldn't exist!");
+        }
+
+        // Too close for missiles, switching to
guns. Just checking that the data wasn't actually + // written instead! + if let Ok(files) = crate::storage::utils::flatten_list_stream( + &table.object_store(), + Some(&object_store::path::Path::from("_change_data")), + ) + .await + { + assert_eq!( + 0, + files.len(), + "This test should not find any written CDC files! {files:#?}" + ); + } + } + + #[tokio::test] + async fn test_update_cdc_enabled() { + // Currently you cannot pass EnableChangeDataFeed through `with_configuration_property` + // so the only way to create a truly CDC enabled table is by shoving the Protocol + // directly into the actions list + let actions = vec![Action::Protocol(Protocol::new(1, 4))]; + let table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + arrow::datatypes::DataType::Int32, + true, + )])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))], + ) + .unwrap(); + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("value").eq(lit(2))) + .with_update("value", lit(12)) + .await + .unwrap(); + assert_eq!(table.version(), 2); + + let ctx = SessionContext::new(); + let table = DeltaOps(table) + .load_cdf() + .with_session_ctx(ctx.clone()) + .with_starting_version(0) + .build() + .await + .expect("Failed to load CDF"); + + let mut batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table, + ctx, + ) + .await + .expect("Failed to collect batches"); + + // The batches will contain a current _commit_timestamp which shouldn't be check_append_only + let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect(); + + assert_batches_sorted_eq! 
{[ + "+-------+------------------+-----------------+", + "| value | _change_type | _commit_version |", + "+-------+------------------+-----------------+", + "| 1 | insert | 1 |", + "| 2 | insert | 1 |", + "| 2 | update_preimage | 2 |", + "| 12 | update_postimage | 2 |", + "| 3 | insert | 1 |", + "+-------+------------------+-----------------+", + ], &batches } + } + + #[tokio::test] + async fn test_update_cdc_enabled_partitions() { + // Currently you cannot pass EnableChangeDataFeed through `with_configuration_property` + // so the only way to create a truly CDC enabled table is by shoving the Protocol + // directly into the actions list + let actions = vec![Action::Protocol(Protocol::new(1, 4))]; + let table: DeltaTable = DeltaOps::new_in_memory() + .create() + .with_column( + "year", + DeltaDataType::Primitive(PrimitiveType::String), + true, + None, + ) + .with_column( + "value", + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + None, + ) + .with_partition_columns(vec!["year"]) + .with_actions(actions) + .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true")) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let schema = Arc::new(Schema::new(vec![ + Field::new("year", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(StringArray::from(vec![ + Some("2020"), + Some("2020"), + Some("2024"), + ])), + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), + ], + ) + .unwrap(); + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("value").eq(lit(2))) + .with_update("year", "2024") + .await + .unwrap(); + assert_eq!(table.version(), 2); + + let ctx = SessionContext::new(); + let table = DeltaOps(table) + .load_cdf() + .with_session_ctx(ctx.clone()) + .with_starting_version(0) + .build() + .await + .expect("Failed to load CDF"); + + let mut batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table, + ctx, + ) + .await + .expect("Failed to collect batches"); + + let _ = arrow::util::pretty::print_batches(&batches); + + // The batches will contain a current _commit_timestamp which shouldn't be check_append_only + let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect(); + + assert_batches_sorted_eq! 
{[ + "+-------+------------------+-----------------+------+", + "| value | _change_type | _commit_version | year |", + "+-------+------------------+-----------------+------+", + "| 1 | insert | 1 | 2020 |", + "| 2 | insert | 1 | 2020 |", + "| 2 | update_preimage | 2 | 2020 |", + "| 2 | update_postimage | 2 | 2024 |", + "| 3 | insert | 1 | 2024 |", + "+-------+------------------+-----------------+------+", + ], &batches } + } + + async fn collect_batches( + num_partitions: usize, + stream: DeltaCdfScan, + ctx: SessionContext, + ) -> Result, Box> { + let mut batches = vec![]; + for p in 0..num_partitions { + let data: Vec = + collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?; + batches.extend_from_slice(&data); + } + Ok(batches) + } } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index f3b87d4f66..84705c415d 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -41,6 +41,7 @@ use datafusion_expr::Expr; use futures::future::BoxFuture; use futures::StreamExt; use parquet::file::properties::WriterProperties; +use tracing::log::*; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL}; @@ -63,6 +64,8 @@ use crate::table::Constraint as DeltaConstraint; use crate::writer::record_batch::divide_by_partition_values; use crate::DeltaTable; +use tokio::sync::mpsc::Sender; + #[derive(thiserror::Error, Debug)] enum WriteError { #[error("No data source supplied to write command.")] @@ -370,6 +373,7 @@ async fn write_execution_plan_with_predicate( safe_cast: bool, schema_mode: Option, writer_stats_config: WriterStatsConfig, + sender: Option>, ) -> DeltaResult> { let schema: ArrowSchemaRef = if schema_mode.is_some() { plan.schema() @@ -378,7 +382,6 @@ async fn write_execution_plan_with_predicate( .and_then(|s| s.input_schema().ok()) .unwrap_or(plan.schema()) }; - let checker = if let Some(snapshot) = snapshot { DeltaDataChecker::new(snapshot) } else { @@ -410,11 +413,15 @@ async fn write_execution_plan_with_predicate( ); let mut writer = DeltaWriter::new(object_store.clone(), config); let checker_stream = checker.clone(); + let sender_stream = sender.clone(); let mut stream = inner_plan.execute(i, task_ctx)?; - let handle: tokio::task::JoinHandle>> = - tokio::task::spawn(async move { + + let handle: tokio::task::JoinHandle>> = tokio::task::spawn( + async move { + let sendable = sender_stream.clone(); while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; + checker_stream.check_batch(&batch).await?; let arr = super::cast::cast_record_batch( &batch, @@ -422,6 +429,12 @@ async fn write_execution_plan_with_predicate( safe_cast, schema_mode == Some(SchemaMode::Merge), )?; + + if let Some(s) = sendable.as_ref() { + let _ = s.send(arr.clone()).await; + } else { + debug!("write_execution_plan_with_predicate did not send any batches, no sender."); + } writer.write(&arr).await?; } let add_actions = writer.close().await; @@ -429,7 +442,8 @@ async fn write_execution_plan_with_predicate( Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::>()), Err(err) => Err(err), } - }); + }, + ); tasks.push(handle); } @@ -460,6 +474,7 @@ pub(crate) async fn write_execution_plan( safe_cast: bool, schema_mode: Option, writer_stats_config: WriterStatsConfig, + sender: Option>, ) -> DeltaResult> { write_execution_plan_with_predicate( None, @@ -474,6 +489,7 @@ pub(crate) async fn write_execution_plan( safe_cast, schema_mode, 
         writer_stats_config,
+        sender,
     )
     .await
 }
@@ -522,6 +538,7 @@ async fn execute_non_empty_expr(
         false,
         None,
         writer_stats_config,
+        None,
     )
     .await?;
 
@@ -778,6 +795,7 @@ impl std::future::IntoFuture for WriteBuilder {
                         this.safe_cast,
                         this.schema_mode,
                         writer_stats_config.clone(),
+                        None,
                     )
                     .await?;
                     actions.extend(add_actions);
@@ -1270,7 +1288,6 @@ mod tests {
             ],
         )
         .unwrap();
-        println!("new_batch: {:?}", new_batch.schema());
         let table = DeltaOps(table)
             .write(vec![new_batch])
             .with_save_mode(SaveMode::Append)

From 6f5236527406dc2ebaf961aa18e6afa1138b8396 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Thu, 30 May 2024 13:46:57 +0000
Subject: [PATCH 2/5] chore: add a Python integration test

This test has highlighted an apparent race condition in how except() is
treated by the CDCObserver when handling structs or lists.
---
 crates/core/src/operations/update.rs |  5 ++-
 crates/core/src/operations/write.rs  |  4 ++-
 python/tests/test_writer.py          | 48 ++++++++++++++++++++++++++--
 3 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 6ca79c6244..9440942e2c 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -445,7 +445,10 @@ async fn execute(
 
     match tracker.collect().await {
         Ok(batches) => {
-            if !batches.is_empty() {
+            if batches.is_empty() {
+                debug!("CDCObserver collected zero batches");
+            }
+            else {
                 debug!(
                     "Collected {} batches to write as part of this transaction:",
                     batches.len()
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 84705c415d..c435a3df08 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -431,7 +431,9 @@ async fn write_execution_plan_with_predicate(
                     )?;
 
                     if let Some(s) = sendable.as_ref() {
-                        let _ = s.send(arr.clone()).await;
+                        if let Err(e) = s.send(arr.clone()).await {
+                            error!("Failed to send data to observer: {e:#?}");
+                        }
                     } else {
                         debug!("write_execution_plan_with_predicate did not send any batches, no sender.");
                     }
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 0a7e766cac..1e813318f8 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -5,6 +5,7 @@
 import random
 import threading
 from datetime import date, datetime
+from decimal import Decimal
 from math import inf
 from typing import Any, Dict, Iterable, List, Literal
 from unittest.mock import Mock
@@ -1450,7 +1451,6 @@ def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path):
 @pytest.mark.parametrize("engine", ["rust", "pyarrow"])
 def test_invalid_decimals(tmp_path: pathlib.Path, engine):
     import re
-    from decimal import Decimal
 
     data = pa.table(
         {"x": pa.array([Decimal("10000000000000000000000000000000000000.0")])}
@@ -1558,7 +1558,6 @@ def test_empty(existing_table: DeltaTable):
 
 def test_rust_decimal_cast(tmp_path: pathlib.Path):
     import re
-    from decimal import Decimal
 
     data = pa.table({"x": pa.array([Decimal("100.1")])})
 
@@ -1729,3 +1728,48 @@ def test_parse_stats_with_new_schema(tmp_path, engine):
     write_deltalake(
         tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
     )
+
+
+def test_roundtrip_cdc_evolution(tmp_path: pathlib.Path):
+    """
+    This test is used as a CDC integration test from Python to ensure,
+    approximately, that CDC files are being written
+    """
+    raw_commit =
r"""{"metaData":{"id":"bb0fdeb2-76dd-4f5e-b1ea-845ecec8fa7e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1713110303902}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":4,"writerFeatures":["changeDataFeed"]}} +""" + # timestampNtz looks like it might be an unnecessary requirement to write from Python + os.mkdir(os.path.join(tmp_path, "_delta_log")) + # This is a stupid hack to make sure we have a CDC capable table from the jump + with open( + os.path.join(tmp_path, "_delta_log", "00000000000000000000.json"), "w+" + ) as fd: + fd.write(raw_commit) + assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") + + # Make sure the _change_data doesn't exist + assert not os.path.isdir(os.path.join(tmp_path, "_change_data")) + + nrows = 5 + sample_data = pa.table( + { + "utf8": pa.array([str(x) for x in range(nrows)]), + "int64": pa.array(list(range(nrows)), pa.int64()), + # See + # "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]), + # "list": pa.array([list(range(x + 1)) for x in range(nrows)]), + } + ) + + write_deltalake( + tmp_path, sample_data, mode="append", schema_mode="merge", engine="rust" + ) + assert ("0" * 19 + "1.json") in os.listdir(tmp_path / "_delta_log") + + delta_table = DeltaTable(tmp_path) + delta_table.update(predicate="utf8 = '1'", updates={"utf8": "'hello world'"}) + + delta_table = DeltaTable(tmp_path) + print(os.listdir(tmp_path)) + # This is kind of a weak test to verify that CDFs were written + assert os.path.isdir(os.path.join(tmp_path, "_change_data")) From 03b7bb0d23beaff3bd52d4034163539a400c1163 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sat, 1 Jun 2024 18:08:29 +0000 Subject: [PATCH 3/5] Ignore the some CDC related integration tests until upstream works See apache/datafusion#10749 --- crates/core/src/operations/cdc.rs | 173 ++++++++++++++++++++++++++- crates/core/src/operations/update.rs | 3 +- 2 files changed, 172 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index 365f4a649a..b6165d01d6 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -105,6 +105,7 @@ impl CDCTracker { true, false, )?; + debug!("prestream: {batch:?}"); let new_column = Arc::new(StringArray::from(vec![ Some("update_preimage"); batch.num_rows() @@ -124,6 +125,7 @@ impl CDCTracker { true, false, )?; + debug!("poststream: {batch:?}"); let new_column = Arc::new(StringArray::from(vec![ Some("update_postimage"); batch.num_rows() @@ -302,7 +304,7 @@ mod tests { use crate::kernel::{Action, PrimitiveType, Protocol}; use crate::operations::DeltaOps; use crate::{DeltaConfigKey, DeltaTable}; - use arrow::array::Int32Array; + use arrow::array::{ArrayRef, Int32Array, StructArray}; use datafusion::assert_batches_sorted_eq; /// A simple test which validates primitive writer version 1 tables should @@ -404,7 +406,7 @@ mod tests { let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); assert!( - result == true, + result, "A v7 table must not write CDC files unless the writer feature is set" ); } @@ -451,4 +453,171 @@ mod tests { } } } + + // This cannot be re-enabled until DataFrame.except() works: + #[ignore] + #[tokio::test] + async fn test_sanity_check_with_pure_df() { + let _ = pretty_env_logger::try_init(); + let nested_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("lat", DataType::Int32, true), + Field::new("long", DataType::Int32, true), + ])); + let schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Int32, true), + Field::new( + "nested", + DataType::Struct(nested_schema.fields.clone()), + true, + ), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("lat", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("long", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + + let updated_batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("lat", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("long", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + let _ = arrow::util::pretty::print_batches(&vec![batch.clone()]); + let _ = arrow::util::pretty::print_batches(&vec![updated_batch.clone()]); + + let ctx = SessionContext::new(); + let before = ctx.read_batch(batch).expect("Failed to make DataFrame"); + let after = ctx + .read_batch(updated_batch) + .expect("Failed to make DataFrame"); + + let 
diff = before + .except(after) + .expect("Failed to except") + .collect() + .await + .expect("Failed to diff"); + assert_eq!(diff.len(), 1); + } + + // This cannot be re-enabled until DataFrame.except() works: + #[ignore] + #[tokio::test] + async fn test_sanity_check_with_struct() { + let _ = pretty_env_logger::try_init(); + let nested_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("lat", DataType::Int32, true), + Field::new("long", DataType::Int32, true), + ])); + let schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Int32, true), + Field::new( + "nested", + DataType::Struct(nested_schema.fields.clone()), + true, + ), + ])); + + let tracker = CDCTracker::new(schema.clone()); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("lat", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("long", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + + let updated_batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("lat", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(Field::new("long", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + + let _ = tracker.pre_sender().send(batch).await; + let _ = tracker.post_sender().send(updated_batch).await; + + match tracker.collect().await { + Ok(batches) => { + let _ = arrow::util::pretty::print_batches(&batches); + assert_eq!(batches.len(), 2); + assert_batches_sorted_eq! {[ + "+-------+------------------+", + "| value | _change_type |", + "+-------+------------------+", + "| 2 | update_preimage |", + "| 12 | update_postimage |", + "+-------+------------------+", + ], &batches } + } + Err(err) => { + println!("err: {err:#?}"); + panic!("Should have never reached this assertion"); + } + } + } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 9440942e2c..9ec8519b9b 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -447,8 +447,7 @@ async fn execute( Ok(batches) => { if batches.is_empty() { debug!("CDCObserver collected zero batches"); - } - else { + } else { debug!( "Collected {} batches to write as part of this transaction:", batches.len() From a3b26007a97936a8909a0ea17d21e123e0730435 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Tue, 4 Jun 2024 04:22:17 +0000 Subject: [PATCH 4/5] chore: provide a warning if the schema contains fields which break CDC See #2568 --- crates/core/src/operations/cdc.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index b6165d01d6..8338bfa52b 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -89,6 +89,24 @@ impl CDCTracker { // Create a new schema which represents the input batch along with the CDC // columns let mut fields: Vec> = self.schema.fields().to_vec().clone(); + + let mut has_struct = false; + for field in fields.iter() { + match field.data_type() { + DataType::Struct(_) => { + has_struct = true; + } + DataType::List(_) => { + has_struct = true; + } + _ => {} + } + } + + if has_struct { + warn!("The schema contains a Struct or List type, which unfortunately means a change data file cannot be captured in this release of delta-rs: . The write operation will complete properly, but no CDC data will be generated for schema: {fields:?}"); + } + fields.push(Arc::new(Field::new("_change_type", DataType::Utf8, true))); let schema = Arc::new(Schema::new(fields)); @@ -458,7 +476,6 @@ mod tests { #[ignore] #[tokio::test] async fn test_sanity_check_with_pure_df() { - let _ = pretty_env_logger::try_init(); let nested_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, true), Field::new("lat", DataType::Int32, true), @@ -537,7 +554,6 @@ mod tests { #[ignore] #[tokio::test] async fn test_sanity_check_with_struct() { - let _ = pretty_env_logger::try_init(); let nested_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, true), Field::new("lat", DataType::Int32, true), From 87c01cc8f549f5eac756b8a2812a70e620cbb019 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Tue, 4 Jun 2024 05:28:52 +0000 Subject: [PATCH 5/5] Allow writers lower than minWriterVersion 7 to be gracefully supported Basically for older minWriterVersions we don't have to really worry about generated columns unless an expression has been set, in which case we must fail to write since we cannot honor generationExpression --- .../src/operations/transaction/protocol.rs | 101 ++++++++++++++++-- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs index 707f1daf02..ac5bab7738 100644 --- a/crates/core/src/operations/transaction/protocol.rs +++ b/crates/core/src/operations/transaction/protocol.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use lazy_static::lazy_static; use once_cell::sync::Lazy; +use tracing::log::*; use super::{TableReference, TransactionError}; use crate::kernel::{ @@ -148,17 +149,33 @@ impl ProtocolChecker { pub fn can_write_to(&self, snapshot: &dyn TableReference) -> Result<(), TransactionError> { // NOTE: writers must always support all required reader features self.can_read_from(snapshot)?; + let min_writer_version = snapshot.protocol().min_writer_version; + + let required_features: Option<&HashSet> = match min_writer_version { + 0 | 1 => None, + 2 => Some(&WRITER_V2), + 3 => Some(&WRITER_V3), + 4 => Some(&WRITER_V4), + 5 => Some(&WRITER_V5), + 6 => Some(&WRITER_V6), + _ => snapshot.protocol().writer_features.as_ref(), + }; - let required_features: Option<&HashSet> = - match snapshot.protocol().min_writer_version { - 0 | 1 => None, - 2 => Some(&WRITER_V2), - 3 => Some(&WRITER_V3), - 4 => Some(&WRITER_V4), - 5 => Some(&WRITER_V5), - 6 => Some(&WRITER_V6), - _ => snapshot.protocol().writer_features.as_ref(), - }; + if (4..7).contains(&min_writer_version) { + debug!("min_writer_version is less 4-6, checking for unsupported table features"); + if let Ok(schema) = snapshot.metadata().schema() { + for field in schema.fields.iter() { + if field.metadata.contains_key( + crate::kernel::ColumnMetadataKey::GenerationExpression.as_ref(), + ) { + error!("The table contains `delta.generationExpression` settings on columns which mean this table cannot be currently written to by delta-rs"); + return Err(TransactionError::UnsupportedWriterFeatures(vec![ + WriterFeatures::GeneratedColumns, + ])); + } + } + } + } if let Some(features) = required_features { let mut diff = features.difference(&self.writer_features).peekable(); @@ -250,7 +267,8 @@ pub static INSTANCE: Lazy = Lazy::new(|| { mod tests { use super::super::test_utils::create_metadata_action; use super::*; - use crate::kernel::{Action, Add, Protocol, Remove}; + use crate::kernel::DataType as DeltaDataType; + use crate::kernel::{Action, Add, PrimitiveType, Protocol, Remove}; use crate::protocol::SaveMode; use crate::table::state::DeltaTableState; use crate::DeltaConfigKey; @@ -559,4 +577,65 @@ mod tests { assert!(checker_7.can_read_from(eager_7).is_ok()); assert!(checker_7.can_write_to(eager_7).is_ok()); } + + #[tokio::test] + async fn test_minwriter_v4_with_cdf() { + let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); + let actions = vec![ + Action::Protocol( + Protocol::new(2, 4) + .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]), + ), + create_metadata_action(None, Some(HashMap::new())), + ]; + let snapshot_5 = DeltaTableState::from_actions(actions).unwrap(); + let eager_5 = snapshot_5.snapshot(); + 
+        assert!(checker_5.can_write_to(eager_5).is_ok());
+    }
+
+    /// Technically we do not yet support generated columns, but it is okay to "accept" writing to
+    /// a column with minWriterVersion=4 and the generated columns feature; as long as the
+    /// `delta.generationExpression` isn't actually defined, the write is still allowed
+    #[tokio::test]
+    async fn test_minwriter_v4_with_generated_columns() {
+        let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
+        let actions = vec![
+            Action::Protocol(
+                Protocol::new(2, 4)
+                    .with_writer_features(vec![crate::kernel::WriterFeatures::GeneratedColumns]),
+            ),
+            create_metadata_action(None, Some(HashMap::new())),
+        ];
+        let snapshot_5 = DeltaTableState::from_actions(actions).unwrap();
+        let eager_5 = snapshot_5.snapshot();
+        assert!(checker_5.can_write_to(eager_5).is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_minwriter_v4_with_generated_columns_and_expressions() {
+        let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
+        let actions = vec![Action::Protocol(Protocol::new(2, 4).with_writer_features(
+            vec![crate::kernel::WriterFeatures::GeneratedColumns],
+        ))];
+
+        let table: crate::DeltaTable = crate::DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                Some(HashMap::from([(
+                    "delta.generationExpression".into(),
+                    "x IS TRUE".into(),
+                )])),
+            )
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .expect("failed to make a version 4 table with EnableChangeDataFeed");
+        let eager_5 = table
+            .snapshot()
+            .expect("Failed to get snapshot from test table");
+        assert!(checker_5.can_write_to(eager_5).is_err());
+    }
 }
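
Taken together, the pieces in this series compose end-to-end: create a table with change data feed enabled, write data, update it, and read the change feed back. The sketch below is distilled from test_update_cdc_evolution and test_update_cdc_enabled above; it is illustrative only: the deltalake_core paths and the update_with_cdc wrapper are assumptions for out-of-tree use (the in-tree tests use crate:: paths), and error handling is collapsed into DeltaResult.

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use deltalake_core::kernel::{Action, DataType as DeltaDataType, PrimitiveType, Protocol};
use deltalake_core::operations::DeltaOps;
use deltalake_core::{DeltaConfigKey, DeltaResult, DeltaTable};

async fn update_with_cdc() -> DeltaResult<()> {
    // A writer-version 4 table with delta.enableChangeDataFeed=true, as in the tests above
    let table: DeltaTable = DeltaOps::new_in_memory()
        .create()
        .with_column(
            "value",
            DeltaDataType::Primitive(PrimitiveType::Integer),
            true,
            None,
        )
        .with_actions(vec![Action::Protocol(Protocol::new(1, 4))])
        .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
        .await?;

    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
    )?;
    let table = DeltaOps(table).write(vec![batch]).await?;

    // The update writes both the rewritten data file and a CDC file under _change_data/
    let (table, _metrics) = DeltaOps(table)
        .update()
        .with_predicate(col("value").eq(lit(2)))
        .with_update("value", lit(12))
        .await?;

    // Read the change feed back from the first version, as the tests do
    let ctx = SessionContext::new();
    let _cdf_scan = DeltaOps(table)
        .load_cdf()
        .with_session_ctx(ctx.clone())
        .with_starting_version(0)
        .build()
        .await?;
    Ok(())
}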