diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs
index 6d09c1558f..962b71b21b 100644
--- a/crates/core/src/kernel/models/actions.rs
+++ b/crates/core/src/kernel/models/actions.rs
@@ -5,6 +5,8 @@ use std::str::FromStr;
 // use std::sync::Arc;
 
 // use roaring::RoaringTreemap;
+use crate::DeltaConfigKey;
+use maplit::hashset;
 use serde::{Deserialize, Serialize};
 use tracing::warn;
 use url::Url;
@@ -137,30 +139,262 @@ pub struct Protocol {
 
 impl Protocol {
     /// Create a new protocol action
-    pub fn new(min_reader_version: i32, min_wrriter_version: i32) -> Self {
+    pub fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
         Self {
             min_reader_version,
-            min_writer_version: min_wrriter_version,
+            min_writer_version,
             reader_features: None,
             writer_features: None,
         }
     }
 
-    /// set the reader features in the protocol action
+    /// set the reader features in the protocol action, automatically bumps min_reader_version
     pub fn with_reader_features(
         mut self,
         reader_features: impl IntoIterator<Item = impl Into<ReaderFeatures>>,
     ) -> Self {
-        self.reader_features = Some(reader_features.into_iter().map(|c| c.into()).collect());
+        let all_reader_features = reader_features
+            .into_iter()
+            .map(Into::into)
+            .collect::<HashSet<_>>();
+        if !all_reader_features.is_empty() {
+            self.min_reader_version = 3
+        }
+        self.reader_features = Some(all_reader_features);
         self
     }
 
-    /// set the writer features in the protocol action
+    /// set the writer features in the protocol action, automatically bumps min_writer_version
     pub fn with_writer_features(
         mut self,
         writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
     ) -> Self {
-        self.writer_features = Some(writer_features.into_iter().map(|c| c.into()).collect());
+        let all_writer_features = writer_features
+            .into_iter()
+            .map(Into::into)
+            .collect::<HashSet<_>>();
+        if !all_writer_features.is_empty() {
+            self.min_writer_version = 7
+        }
+        self.writer_features = Some(all_writer_features);
+        self
+    }
+
+    /// Converts existing table properties into table features.
+    ///
+    /// Writer features are derived only when `min_writer_version >= 7` and reader features only
+    /// when `min_reader_version >= 3`. Only properties whose value parses as `true` are converted;
+    /// any `delta.constraints.*` key additionally enables `WriterFeatures::CheckConstraints`.
+    pub fn move_table_properties_into_features(
+        mut self,
+        configuration: &HashMap<String, Option<String>>,
+    ) -> Protocol {
+        if self.min_writer_version >= 7 {
+            let mut converted_writer_features = configuration
+                .iter()
+                .filter(|(_, value)| {
+                    value.as_ref().map_or(false, |v| {
+                        v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
+                    })
+                })
+                .collect::<HashMap<&String, &Option<String>>>()
+                .keys()
+                .map(|key| (*key).clone().into())
+                .filter(|v| !matches!(v, WriterFeatures::Other(_)))
+                .collect::<HashSet<WriterFeatures>>();
+
+            if configuration
+                .keys()
+                .any(|v| v.starts_with("delta.constraints."))
+            {
+                converted_writer_features.insert(WriterFeatures::CheckConstraints);
+            }
+
+            match self.writer_features {
+                Some(mut features) => {
+                    features.extend(converted_writer_features);
+                    self.writer_features = Some(features);
+                }
+                None => self.writer_features = Some(converted_writer_features),
+            }
+        }
+        // `with_reader_features` pairs reader features with version 3, the highest version that
+        // `apply_properties_to_protocol` accepts, so test `>= 3`; `> 3` would not match any of them.
+        if self.min_reader_version >= 3 {
+            let converted_reader_features = configuration
+                .iter()
+                .filter(|(_, value)| {
+                    value.as_ref().map_or(false, |v| {
+                        v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
+                    })
+                })
+                .map(|(key, _)| (*key).clone().into())
+                .filter(|v| !matches!(v, ReaderFeatures::Other(_)))
+                .collect::<HashSet<ReaderFeatures>>();
+            match self.reader_features {
+                Some(mut features) => {
+                    features.extend(converted_reader_features);
+                    self.reader_features = Some(features);
+                }
+                None => self.reader_features = Some(converted_reader_features),
+            }
+        }
+        self
+    }
+    /// Will apply the properties to the protocol by either bumping the version or setting
+    /// features
+    ///
+    /// Returns an error if `delta.minReaderVersion`, `delta.minWriterVersion`,
+    /// `delta.enableChangeDataFeed` or `delta.enableDeletionVectors` has an invalid value, or if
+    /// `raise_if_not_exists` is set and a key cannot be parsed as a `DeltaConfigKey`.
+    pub fn apply_properties_to_protocol(
+        mut self,
+        new_properties: &HashMap<String, String>,
+        raise_if_not_exists: bool,
+    ) -> DeltaResult<Protocol> {
+        let mut parsed_properties: HashMap<DeltaConfigKey, String> = HashMap::new();
+
+        for (key, value) in new_properties {
+            if let Ok(parsed_key) = key.parse::<DeltaConfigKey>() {
+                parsed_properties.insert(parsed_key, value.to_string());
+            } else if raise_if_not_exists {
+                return Err(Error::Generic(format!(
+                    "Error parsing property '{}':'{}'",
+                    key, value
+                )));
+            }
+        }
+
+        // Check and update delta.minReaderVersion
+        if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) {
+            let new_min_reader_version = min_reader_version.parse::<i32>();
+            match new_min_reader_version {
+                Ok(version) => match version {
+                    1..=3 => {
+                        if version > self.min_reader_version {
+                            self.min_reader_version = version
+                        }
+                    }
+                    _ => {
+                        return Err(Error::Generic(format!(
+                            "delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
+                            min_reader_version
+                        )))
+                    }
+                },
+                Err(_) => {
+                    return Err(Error::Generic(format!(
+                        "delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']",
+                        min_reader_version
+                    )))
+                }
+            }
+        }
+
+        // Check and update delta.minWriterVersion
+        if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) {
+            let new_min_writer_version = min_writer_version.parse::<i32>();
+            match new_min_writer_version {
+                Ok(version) => match version {
+                    2..=7 => {
+                        if version > self.min_writer_version {
+                            self.min_writer_version = version
+                        }
+                    }
+                    _ => {
+                        return Err(Error::Generic(format!(
+                            "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
+                            min_writer_version
+                        )))
+                    }
+                },
+                Err(_) => {
+                    return Err(Error::Generic(format!(
+                        "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']",
+                        min_writer_version
+                    )))
+                }
+            }
+        }
+
+        // Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7
+        if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) {
+            let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::<bool>();
+            match if_enable_cdf {
+                Ok(true) => {
+                    if self.min_writer_version >= 7 {
+                        match self.writer_features {
+                            Some(mut features) => {
+                                features.insert(WriterFeatures::ChangeDataFeed);
+                                self.writer_features = Some(features);
+                            }
+                            None => {
+                                self.writer_features =
+                                    Some(hashset! {WriterFeatures::ChangeDataFeed})
+                            }
+                        }
+                    } else if self.min_writer_version <= 3 {
+                        self.min_writer_version = 4
+                    }
+                }
+                Ok(false) => {}
+                _ => {
+                    return Err(Error::Generic(format!(
+                        "delta.enableChangeDataFeed = '{}' is invalid, valid values are ['true']",
+                        enable_cdf
+                    )))
+                }
+            }
+        }
+
+        if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) {
+            let if_enable_dv = enable_dv.to_ascii_lowercase().parse::<bool>();
+            match if_enable_dv {
+                Ok(true) => {
+                    let writer_features = match self.writer_features {
+                        Some(mut features) => {
+                            features.insert(WriterFeatures::DeletionVectors);
+                            features
+                        }
+                        None => hashset! {WriterFeatures::DeletionVectors},
+                    };
+                    let reader_features = match self.reader_features {
+                        Some(mut features) => {
+                            features.insert(ReaderFeatures::DeletionVectors);
+                            features
+                        }
+                        None => hashset! {ReaderFeatures::DeletionVectors},
+                    };
+                    self.min_reader_version = 3;
+                    self.min_writer_version = 7;
+                    self.writer_features = Some(writer_features);
+                    self.reader_features = Some(reader_features);
+                }
+                Ok(false) => {}
+                _ => {
+                    return Err(Error::Generic(format!(
+                        "delta.enableDeletionVectors = '{}' is invalid, valid values are ['true']",
+                        enable_dv
+                    )))
+                }
+            }
+        }
+        Ok(self)
+    }
+    /// Enable timestamp_ntz in the protocol.
+    ///
+    /// Bumps the protocol to reader version 3 / writer version 7 and adds the
+    /// `TimestampWithoutTimezone` reader and writer features to the existing feature sets, so
+    /// features that are already enabled on the table are kept.
+    pub fn enable_timestamp_ntz(mut self) -> Protocol {
+        self.min_reader_version = 3;
+        self.min_writer_version = 7;
+        self.reader_features
+            .get_or_insert_with(Default::default)
+            .insert(ReaderFeatures::TimestampWithoutTimezone);
+        self.writer_features
+            .get_or_insert_with(Default::default)
+            .insert(WriterFeatures::TimestampWithoutTimezone);
         self
     }
 }
diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs
new file mode 100644
index 0000000000..028a6e5b2e
--- /dev/null
+++ b/crates/core/src/operations/add_column.rs
@@ -0,0 +1,114 @@
+//! 
Add a new column to a table + +use delta_kernel::schema::StructType; +use futures::future::BoxFuture; +use itertools::Itertools; + +use super::cast::merge_struct; +use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; + +use crate::kernel::StructField; +use crate::logstore::LogStoreRef; +use crate::protocol::DeltaOperation; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; + +/// Add new columns and/or nested fields to a table +pub struct AddColumnBuilder { + /// A snapshot of the table's state + snapshot: DeltaTableState, + /// Fields to add/merge into schema + fields: Option>, + /// Delta object store for handling data files + log_store: LogStoreRef, + /// Additional information to add to the commit + commit_properties: CommitProperties, +} + +impl super::Operation<()> for AddColumnBuilder {} + +impl AddColumnBuilder { + /// Create a new builder + pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + Self { + snapshot, + log_store, + fields: None, + commit_properties: CommitProperties::default(), + } + } + + /// Specify the fields to be added + pub fn with_fields(mut self, fields: impl IntoIterator + Clone) -> Self { + self.fields = Some(fields.into_iter().collect()); + self + } + /// Additional metadata to be added to commit info + pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self { + self.commit_properties = commit_properties; + self + } +} + +impl std::future::IntoFuture for AddColumnBuilder { + type Output = DeltaResult; + + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let mut metadata = this.snapshot.metadata().clone(); + let fields = match this.fields { + Some(v) => v, + None => return Err(DeltaTableError::Generic("No fields provided".to_string())), + }; + + let fields_right = &StructType::new(fields.clone()); + let table_schema = 
this.snapshot.schema(); + let new_table_schema = merge_struct(table_schema, fields_right)?; + + // TODO(ion): Think of a way to simplify this check through the API or to centralize some checks. + let contains_timestampntz = PROTOCOL.contains_timestampntz(fields.iter()); + let protocol = this.snapshot.protocol(); + + let maybe_new_protocol = if contains_timestampntz { + let updated_protocol = protocol.clone().enable_timestamp_ntz(); + if !(protocol.min_reader_version == 3 && protocol.min_writer_version == 7) { + // Convert existing properties to features since we advanced the protocol to v3,7 + Some( + updated_protocol + .move_table_properties_into_features(&metadata.configuration), + ) + } else { + Some(updated_protocol) + } + } else { + None + }; + + let operation = DeltaOperation::AddColumn { + fields: fields.into_iter().collect_vec(), + }; + + metadata.schema_string = serde_json::to_string(&new_table_schema)?; + + let mut actions = vec![metadata.into()]; + + if let Some(new_protocol) = maybe_new_protocol { + actions.push(new_protocol.into()) + } + + let commit = CommitBuilder::from(this.commit_properties) + .with_actions(actions) + .build(Some(&this.snapshot), this.log_store.clone(), operation) + .await?; + + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) + }) + } +} diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index e53ec43c95..63b6995f9b 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -15,9 +15,6 @@ use crate::kernel::{ Action, DataType, Metadata, Protocol, ReaderFeatures, StructField, StructType, WriterFeatures, }; use crate::logstore::{LogStore, LogStoreRef}; -use crate::operations::set_tbl_properties::{ - apply_properties_to_protocol, convert_properties_to_features, -}; use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::builder::ensure_table_uri; use crate::table::config::DeltaConfigKey; @@ -298,8 +295,7 @@ impl 
CreateBuilder { }) .unwrap_or_else(|| current_protocol); - let protocol = apply_properties_to_protocol( - &protocol, + let protocol = protocol.apply_properties_to_protocol( &configuration .iter() .map(|(k, v)| (k.clone(), v.clone().unwrap())) @@ -307,7 +303,7 @@ impl CreateBuilder { self.raise_if_key_not_exists, )?; - let protocol = convert_properties_to_features(protocol, &configuration); + let protocol = protocol.move_table_properties_into_features(&configuration); let mut metadata = Metadata::try_new( StructType::new(self.columns), diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 761ebd7b4e..bf164c01fb 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -7,6 +7,7 @@ //! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream], //! if the operation returns data as well. +use self::add_column::AddColumnBuilder; use self::create::CreateBuilder; use self::filesystem_check::FileSystemCheckBuilder; use self::vacuum::VacuumBuilder; @@ -15,6 +16,7 @@ use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; use std::collections::HashMap; +pub mod add_column; pub mod cast; pub mod convert_to_delta; pub mod create; @@ -228,6 +230,11 @@ impl DeltaOps { pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder { SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap()) } + + /// Add new columns + pub fn add_columns(self) -> AddColumnBuilder { + AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap()) + } } impl From for DeltaOps { diff --git a/crates/core/src/operations/set_tbl_properties.rs b/crates/core/src/operations/set_tbl_properties.rs index e0c4ea2e9a..b3ca7607ac 100644 --- a/crates/core/src/operations/set_tbl_properties.rs +++ b/crates/core/src/operations/set_tbl_properties.rs @@ -1,18 +1,16 @@ //! 
Set table properties on a table -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use futures::future::BoxFuture; -use maplit::hashset; use super::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, Protocol, ReaderFeatures, WriterFeatures}; +use crate::kernel::Action; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; -use crate::DeltaConfigKey; +use crate::DeltaResult; use crate::DeltaTable; -use crate::{DeltaResult, DeltaTableError}; /// Remove constraints from the table pub struct SetTablePropertiesBuilder { @@ -59,203 +57,6 @@ impl SetTablePropertiesBuilder { } } -/// Will apply the properties to the protocol by either bumping the version or setting -/// features -pub fn apply_properties_to_protocol( - current_protocol: &Protocol, - new_properties: &HashMap, - raise_if_not_exists: bool, -) -> DeltaResult { - let mut parsed_properties: HashMap = HashMap::new(); - - for (key, value) in new_properties { - if let Ok(parsed_key) = key.parse::() { - parsed_properties.insert(parsed_key, value.to_string()); - } else if raise_if_not_exists { - return Err(DeltaTableError::Generic(format!( - "Error parsing property '{}':'{}'", - key, value - ))); - } - } - - let mut new_protocol = current_protocol.clone(); - - // Check and update delta.minReaderVersion - if let Some(min_reader_version) = parsed_properties.get(&DeltaConfigKey::MinReaderVersion) { - let new_min_reader_version = min_reader_version.parse::(); - match new_min_reader_version { - Ok(version) => match version { - 1..=3 => { - if version > new_protocol.min_reader_version { - new_protocol.min_reader_version = version - } - } - _ => { - return Err(DeltaTableError::Generic(format!( - "delta.minReaderVersion = '{}' is invalid, valid values are ['1','2','3']", - min_reader_version - ))) - } - }, - Err(_) => { - return Err(DeltaTableError::Generic(format!( - "delta.minReaderVersion = '{}' is invalid, valid 
values are ['1','2','3']", - min_reader_version - ))) - } - } - } - - // Check and update delta.minWriterVersion - if let Some(min_writer_version) = parsed_properties.get(&DeltaConfigKey::MinWriterVersion) { - let new_min_writer_version = min_writer_version.parse::(); - match new_min_writer_version { - Ok(version) => match version { - 2..=7 => { - if version > new_protocol.min_writer_version { - new_protocol.min_writer_version = version - } - } - _ => { - return Err(DeltaTableError::Generic(format!( - "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']", - min_writer_version - ))) - } - }, - Err(_) => { - return Err(DeltaTableError::Generic(format!( - "delta.minWriterVersion = '{}' is invalid, valid values are ['2','3','4','5','6','7']", - min_writer_version - ))) - } - } - } - - // Check enableChangeDataFeed and bump protocol or add writerFeature if writer versions is >=7 - if let Some(enable_cdf) = parsed_properties.get(&DeltaConfigKey::EnableChangeDataFeed) { - let if_enable_cdf = enable_cdf.to_ascii_lowercase().parse::(); - match if_enable_cdf { - Ok(true) => { - if new_protocol.min_writer_version >= 7 { - match new_protocol.writer_features { - Some(mut features) => { - features.insert(WriterFeatures::ChangeDataFeed); - new_protocol.writer_features = Some(features); - } - None => { - new_protocol.writer_features = - Some(hashset! 
{WriterFeatures::ChangeDataFeed}) - } - } - } else if new_protocol.min_writer_version <= 3 { - new_protocol.min_writer_version = 4 - } - } - Ok(false) => {} - _ => { - return Err(DeltaTableError::Generic(format!( - "delta.enableChangeDataFeed = '{}' is invalid, valid values are ['true']", - enable_cdf - ))) - } - } - } - - if let Some(enable_dv) = parsed_properties.get(&DeltaConfigKey::EnableDeletionVectors) { - let if_enable_dv = enable_dv.to_ascii_lowercase().parse::(); - match if_enable_dv { - Ok(true) => { - let writer_features = match new_protocol.writer_features { - Some(mut features) => { - features.insert(WriterFeatures::DeletionVectors); - features - } - None => hashset! {WriterFeatures::DeletionVectors}, - }; - let reader_features = match new_protocol.reader_features { - Some(mut features) => { - features.insert(ReaderFeatures::DeletionVectors); - features - } - None => hashset! {ReaderFeatures::DeletionVectors}, - }; - new_protocol.min_reader_version = 3; - new_protocol.min_writer_version = 7; - new_protocol.writer_features = Some(writer_features); - new_protocol.reader_features = Some(reader_features); - } - Ok(false) => {} - _ => { - return Err(DeltaTableError::Generic(format!( - "delta.enableDeletionVectors = '{}' is invalid, valid values are ['true']", - enable_dv - ))) - } - } - } - - Ok(new_protocol) -} - -/// Converts existing properties into features if the reader_version is >=3 or writer_version >=3 -/// only converts features that are "true" -pub fn convert_properties_to_features( - mut new_protocol: Protocol, - configuration: &HashMap>, -) -> Protocol { - if new_protocol.min_writer_version >= 7 { - let mut converted_writer_features = configuration - .iter() - .filter(|(_, value)| { - value.as_ref().map_or(false, |v| { - v.to_ascii_lowercase().parse::().is_ok_and(|v| v) - }) - }) - .collect::>>() - .keys() - .map(|key| (*key).clone().into()) - .filter(|v| !matches!(v, WriterFeatures::Other(_))) - .collect::>(); - - if configuration - .keys() - 
.any(|v| v.contains("delta.constraints.")) - { - converted_writer_features.insert(WriterFeatures::CheckConstraints); - } - - match new_protocol.writer_features { - Some(mut features) => { - features.extend(converted_writer_features); - new_protocol.writer_features = Some(features); - } - None => new_protocol.writer_features = Some(converted_writer_features), - } - } - if new_protocol.min_reader_version >= 3 { - let converted_reader_features = configuration - .iter() - .filter(|(_, value)| { - value.as_ref().map_or(false, |v| { - v.to_ascii_lowercase().parse::().is_ok_and(|v| v) - }) - }) - .map(|(key, _)| (*key).clone().into()) - .filter(|v| !matches!(v, ReaderFeatures::Other(_))) - .collect::>(); - match new_protocol.reader_features { - Some(mut features) => { - features.extend(converted_reader_features); - new_protocol.reader_features = Some(features); - } - None => new_protocol.reader_features = Some(converted_reader_features), - } - } - new_protocol -} - impl std::future::IntoFuture for SetTablePropertiesBuilder { type Output = DeltaResult; @@ -270,11 +71,9 @@ impl std::future::IntoFuture for SetTablePropertiesBuilder { let current_protocol = this.snapshot.protocol(); let properties = this.properties; - let new_protocol = apply_properties_to_protocol( - current_protocol, - &properties, - this.raise_if_not_exists, - )?; + let new_protocol = current_protocol + .clone() + .apply_properties_to_protocol(&properties, this.raise_if_not_exists)?; metadata.configuration.extend( properties @@ -285,7 +84,7 @@ impl std::future::IntoFuture for SetTablePropertiesBuilder { ); let final_protocol = - convert_properties_to_features(new_protocol, &metadata.configuration); + new_protocol.move_table_properties_into_features(&metadata.configuration); let operation = DeltaOperation::SetTableProperties { properties }; diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs index d1ab9269bc..f3bb87098a 100644 --- 
a/crates/core/src/operations/transaction/protocol.rs +++ b/crates/core/src/operations/transaction/protocol.rs @@ -613,9 +613,7 @@ mod tests { #[tokio::test] async fn test_minwriter_v4_with_generated_columns_and_expressions() { let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone()); - let actions = vec![Action::Protocol(Protocol::new(2, 4).with_writer_features( - vec![crate::kernel::WriterFeatures::GeneratedColumns], - ))]; + let actions = vec![Action::Protocol(Protocol::new(2, 4))]; let table: crate::DeltaTable = crate::DeltaOps::new_in_memory() .create() diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 70ccc8c14e..024ea9bcad 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -288,17 +288,20 @@ impl WriteBuilder { Some(snapshot) => { PROTOCOL.can_write_to(snapshot)?; - if let Some(plan) = &self.input { - let schema: StructType = (plan.schema()).try_into()?; - PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?; + let schema: StructType = if let Some(plan) = &self.input { + (plan.schema()).try_into()? } else if let Some(batches) = &self.batches { if batches.is_empty() { return Err(WriteError::MissingData.into()); } - let schema: StructType = (batches[0].schema()).try_into()?; + (batches[0].schema()).try_into()? 
+ } else { + return Err(WriteError::MissingData.into()); + }; + + if self.schema_mode.is_none() { PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?; } - match self.mode { SaveMode::ErrorIfExists => { Err(WriteError::AlreadyExists(self.log_store.root_uri()).into()) @@ -796,12 +799,38 @@ impl std::future::IntoFuture for WriteBuilder { if this.schema_mode == Some(SchemaMode::Merge) && schema_drift { if let Some(snapshot) = &this.snapshot { let schema_struct: StructType = schema.clone().try_into()?; + let current_protocol = snapshot.protocol(); + let configuration = snapshot.metadata().configuration.clone(); + let maybe_new_protocol = if PROTOCOL + .contains_timestampntz(schema_struct.fields()) + && !current_protocol + .reader_features + .clone() + .unwrap_or_default() + .contains(&crate::kernel::ReaderFeatures::TimestampWithoutTimezone) + // We can check only reader features, as reader and writer timestampNtz + // should be always enabled together + { + let new_protocol = current_protocol.clone().enable_timestamp_ntz(); + if !(current_protocol.min_reader_version == 3 + && current_protocol.min_writer_version == 7) + { + Some(new_protocol.move_table_properties_into_features(&configuration)) + } else { + Some(new_protocol) + } + } else { + None + }; let schema_action = Action::Metadata(Metadata::try_new( schema_struct, partition_columns.clone(), - snapshot.metadata().configuration.clone(), + configuration, )?); actions.push(schema_action); + if let Some(new_protocol) = maybe_new_protocol { + actions.push(new_protocol.into()) + } } } let state = match this.state { @@ -869,6 +898,34 @@ impl std::future::IntoFuture for WriteBuilder { .or_else(|_| snapshot.arrow_schema()) .unwrap_or(schema.clone()); + let configuration = snapshot.metadata().configuration.clone(); + let current_protocol = snapshot.protocol(); + let maybe_new_protocol = if PROTOCOL.contains_timestampntz( + TryInto::::try_into(schema.clone())?.fields(), + ) && !current_protocol + .reader_features + 
.clone() + .unwrap_or_default() + .contains(&crate::kernel::ReaderFeatures::TimestampWithoutTimezone) + // We can check only reader features, as reader and writer timestampNtz + // should be always enabled together + { + let new_protocol = current_protocol.clone().enable_timestamp_ntz(); + if !(current_protocol.min_reader_version == 3 + && current_protocol.min_writer_version == 7) + { + Some(new_protocol.move_table_properties_into_features(&configuration)) + } else { + Some(new_protocol) + } + } else { + None + }; + + if let Some(protocol) = maybe_new_protocol { + actions.push(protocol.into()) + } + if schema != table_schema { let mut metadata = snapshot.metadata().clone(); let delta_schema: StructType = schema.as_ref().try_into()?; diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 9cfa429fde..ce6ef0e8b0 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use tracing::{debug, error}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove}; +use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField}; use crate::logstore::LogStore; use crate::table::CheckPoint; @@ -326,6 +326,13 @@ pub struct MergePredicate { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub enum DeltaOperation { + /// Represents a Delta `Add Column` operation. + /// Used to add new columns or field in a struct + AddColumn { + /// Fields added to existing schema + fields: Vec, + }, + /// Represents a Delta `Create` operation. /// Would usually only create the table, if also data is written, /// a `Write` operations is more appropriate @@ -458,6 +465,7 @@ impl DeltaOperation { pub fn name(&self) -> &str { // operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys match &self { + DeltaOperation::AddColumn { .. 
} => "ADD COLUMN", DeltaOperation::Create { mode: SaveMode::Overwrite, .. @@ -513,6 +521,7 @@ impl DeltaOperation { match self { Self::Optimize { .. } | Self::SetTableProperties { .. } + | Self::AddColumn { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } | Self::AddConstraint { .. } diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 23ed7e7ffa..ff55d8f95e 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -82,6 +82,12 @@ class RawDeltaTable: custom_metadata: Optional[Dict[str, str]], post_commithook_properties: Optional[Dict[str, Optional[bool]]], ) -> str: ... + def add_columns( + self, + fields: List[Field], + custom_metadata: Optional[Dict[str, str]], + post_commithook_properties: Optional[Dict[str, Optional[bool]]], + ) -> None: ... def add_constraints( self, constraints: Dict[str, str], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 9aa48ef6e2..2ae17f5f9c 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -46,6 +46,7 @@ from deltalake.data_catalog import DataCatalog from deltalake.exceptions import DeltaProtocolError from deltalake.fs import DeltaStorageHandler +from deltalake.schema import Field as DeltaField from deltalake.schema import Schema as DeltaSchema try: @@ -1800,6 +1801,42 @@ class TableAlterer: def __init__(self, table: DeltaTable) -> None: self.table = table + def add_columns( + self, + fields: Union[DeltaField, List[DeltaField]], + custom_metadata: Optional[Dict[str, str]] = None, + post_commithook_properties: Optional[PostCommitHookProperties] = None, + ) -> None: + """Add new columns and/or update the fields of a struct column + + Args: + fields: fields to merge into schema + custom_metadata: custom metadata that will be added to the transaction commit. + post_commithook_properties: properties for the post commit hook. If None, default values are used. 
+ + Example: + ```python + from deltalake import DeltaTable + from deltalake.schema import Field, PrimitiveType, StructType + dt = DeltaTable("test_table") + new_fields = [ + Field("baz", StructType([Field("bar", PrimitiveType("integer"))])), + Field("bar", PrimitiveType("integer")) + ] + dt.alter.add_columns( + new_fields + ) + ``` + """ + if isinstance(fields, DeltaField): + fields = [fields] + + self.table._table.add_columns( + fields, + custom_metadata, + post_commithook_properties.__dict__ if post_commithook_properties else None, + ) + def add_constraint( self, constraints: Dict[str, str], diff --git a/python/src/lib.rs b/python/src/lib.rs index c90d116c37..41cdb37ccb 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -13,6 +13,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use arrow::pyarrow::PyArrowType; use chrono::{DateTime, Duration, FixedOffset, Utc}; use delta_kernel::expressions::Scalar; +use delta_kernel::schema::StructField; use deltalake::arrow::compute::concat_batches; use deltalake::arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; use deltalake::arrow::record_batch::RecordBatchReader; @@ -28,6 +29,7 @@ use deltalake::errors::DeltaTableError; use deltalake::kernel::{ scalars::ScalarExt, Action, Add, Invariant, LogicalFile, Remove, StructType, }; +use deltalake::operations::add_column::AddColumnBuilder; use deltalake::operations::collect_sendable_stream; use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; @@ -61,7 +63,7 @@ use serde_json::{Map, Value}; use crate::error::DeltaProtocolError; use crate::error::PythonError; use crate::filesystem::FsConfig; -use crate::schema::schema_to_pyobject; +use crate::schema::{schema_to_pyobject, Field}; use crate::utils::rt; #[derive(FromPyObject)] @@ -360,20 +362,11 @@ impl RawDeltaTable { cmd = cmd.with_retention_period(Duration::hours(retention_period as i64)); } - if 
custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); - }; + } rt().block_on(cmd.into_future()).map_err(PythonError::from) })?; self._table.state = table.state; @@ -382,6 +375,7 @@ impl RawDeltaTable { /// Run the UPDATE command on the Delta Table #[pyo3(signature = (updates, predicate=None, writer_properties=None, safe_cast = false, custom_metadata = None, post_commithook_properties=None))] + #[allow(clippy::too_many_arguments)] pub fn update( &mut self, py: Python, @@ -413,18 +407,9 @@ impl RawDeltaTable { cmd = cmd.with_predicate(update_predicate); } - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -476,18 +461,9 @@ impl RawDeltaTable { ); } - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut 
commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -547,18 +523,9 @@ impl RawDeltaTable { ); } - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -573,6 +540,39 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } + #[pyo3(signature = (fields, custom_metadata=None, post_commithook_properties=None))] + pub fn add_columns( + &mut self, + py: Python, + fields: Vec, + custom_metadata: Option>, + post_commithook_properties: Option>>, + ) -> PyResult<()> { + let table = py.allow_threads(|| { + let mut cmd = AddColumnBuilder::new( + self._table.log_store(), + self._table.snapshot().map_err(PythonError::from)?.clone(), + ); + + let new_fields = fields + .iter() + .map(|v| v.inner.clone()) + .collect::>(); + + cmd = cmd.with_fields(new_fields); + + if let Some(commit_properties) = + 
maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { + cmd = cmd.with_commit_properties(commit_properties); + } + + rt().block_on(cmd.into_future()).map_err(PythonError::from) + })?; + self._table.state = table.state; + Ok(()) + } + #[pyo3(signature = (constraints, custom_metadata=None, post_commithook_properties=None))] pub fn add_constraints( &mut self, @@ -591,18 +591,9 @@ impl RawDeltaTable { cmd = cmd.with_constraint(col_name.clone(), expression.clone()); } - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -629,18 +620,9 @@ impl RawDeltaTable { .with_constraint(name) .with_raise_if_not_exists(raise_if_not_exists); - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -789,18 +771,9 @@ 
impl RawDeltaTable { ); } - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -949,12 +922,9 @@ impl RawDeltaTable { cmd = cmd.with_ignore_missing_files(ignore_missing_files); cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - cmd = cmd - .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); - }; + if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { + cmd = cmd.with_commit_properties(commit_properties); + } let (table, metrics) = rt() .block_on(cmd.into_future()) @@ -1284,19 +1254,14 @@ impl RawDeltaTable { if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } - - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(writer_props) = 
writer_properties { + cmd = cmd.with_writer_properties( + set_writer_properties(writer_props).map_err(PythonError::from)?, + ); + } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1320,14 +1285,7 @@ impl RawDeltaTable { .with_properties(properties) .with_raise_if_not_exists(raise_if_not_exists); - if custom_metadata.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - + if let Some(commit_properties) = maybe_create_commit_properties(custom_metadata, None) { cmd = cmd.with_commit_properties(commit_properties); } @@ -1353,18 +1311,9 @@ impl RawDeltaTable { ) .with_dry_run(dry_run); - if custom_metadata.is_some() || post_commithook_properties.is_some() { - let mut commit_properties = CommitProperties::default(); - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - commit_properties = commit_properties.with_metadata(json_metadata); - }; - - if let Some(post_commit_hook_props) = post_commithook_properties { - commit_properties = - set_post_commithook_properties(commit_properties, post_commit_hook_props) - } + if let Some(commit_properties) = + maybe_create_commit_properties(custom_metadata, post_commithook_properties) + { cmd = cmd.with_commit_properties(commit_properties); } @@ -1450,6 +1399,27 @@ fn convert_partition_filters( .collect() } +fn maybe_create_commit_properties( + custom_metadata: Option>, + post_commithook_properties: Option>>, +) -> Option { + if custom_metadata.is_none() && post_commithook_properties.is_none() { + return None; + } + let mut commit_properties = CommitProperties::default(); + if let Some(metadata) = 
custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + commit_properties = commit_properties.with_metadata(json_metadata); + }; + + if let Some(post_commit_hook_props) = post_commithook_properties { + commit_properties = + set_post_commithook_properties(commit_properties, post_commit_hook_props) + } + Some(commit_properties) +} + fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult> { use Scalar::*; diff --git a/python/src/schema.rs b/python/src/schema.rs index ba4ea4fa47..7508b5495f 100644 --- a/python/src/schema.rs +++ b/python/src/schema.rs @@ -404,7 +404,7 @@ impl MapType { #[pyclass(module = "deltalake._internal")] #[derive(Clone)] pub struct Field { - inner: StructField, + pub inner: StructField, } #[pymethods] diff --git a/python/tests/test_alter.py b/python/tests/test_alter.py index fb03acd23b..b931939348 100644 --- a/python/tests/test_alter.py +++ b/python/tests/test_alter.py @@ -1,10 +1,12 @@ import pathlib +from typing import List import pyarrow as pa import pytest from deltalake import DeltaTable, write_deltalake from deltalake.exceptions import DeltaError, DeltaProtocolError +from deltalake.schema import Field, PrimitiveType, StructType def test_add_constraint(tmp_path: pathlib.Path, sample_table: pa.Table): @@ -305,3 +307,68 @@ def test_set_table_properties_enable_dv(tmp_path: pathlib.Path, sample_table: pa assert protocol.min_writer_version == 7 assert protocol.writer_features == ["deletionVectors"] assert protocol.reader_features == ["deletionVectors"] + + +def _sort_fields(fields: List[Field]) -> List[Field]: + return list(sorted(iter(fields), key=lambda x: (x.name, str(x.type)))) + + +def test_add_column_primitive(existing_table: DeltaTable): + current_fields = existing_table.schema().fields + + new_fields_to_add = [ + Field("foo", PrimitiveType("integer")), + Field("bar", PrimitiveType("float")), + ] + + existing_table.alter.add_columns(new_fields_to_add) + new_fields 
= existing_table.schema().fields + + assert _sort_fields(new_fields) == _sort_fields( + [*current_fields, *new_fields_to_add] + ) + + +def test_add_field_in_struct_column(existing_table: DeltaTable): + current_fields = existing_table.schema().fields + + new_fields_to_add = [ + Field("struct", StructType([Field("z", PrimitiveType("float"))])), + ] + + existing_table.alter.add_columns(new_fields_to_add) + new_fields = existing_table.schema().fields + + new_field = Field( + "struct", + StructType( + [ + Field("x", PrimitiveType("long")), + Field("y", PrimitiveType("string")), + Field("z", PrimitiveType("float")), + ] + ), + ) + assert _sort_fields(new_fields) == _sort_fields( + [*[field for field in current_fields if field.name != "struct"], new_field] + ) + + +def test_add_timestamp_ntz_column(tmp_path: pathlib.Path, sample_table: pa.Table): + write_deltalake(tmp_path, sample_table, mode="append", engine="rust") + dt = DeltaTable(tmp_path) + current_fields = dt.schema().fields + + new_fields_to_add = Field("timestamp_ntz_col", PrimitiveType("timestamp_ntz")) + + dt.alter.add_columns(new_fields_to_add) + new_fields = dt.schema().fields + new_protocol = dt.protocol() + + assert _sort_fields(new_fields) == _sort_fields( + [*current_fields, new_fields_to_add] + ) + assert new_protocol.min_reader_version == 3 + assert new_protocol.min_writer_version == 7 + assert new_protocol.reader_features == ["timestampNtz"] + assert new_protocol.writer_features == ["timestampNtz"]