From 88ea11027eb841628b8c94225181407cb00bdd25 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 14 Mar 2024 22:27:39 -0400 Subject: [PATCH] impl write app txn --- crates/core/src/kernel/models/actions.rs | 20 ++++++++++ crates/core/src/kernel/snapshot/mod.rs | 3 +- crates/core/src/operations/transaction/mod.rs | 37 ++++++++++++++++++- crates/core/src/operations/write.rs | 21 +++++++++++ crates/core/src/table/state.rs | 2 + 5 files changed, 80 insertions(+), 3 deletions(-) diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 8284bb92e0..327793e4dd 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -671,6 +671,26 @@ pub struct Txn { pub last_updated: Option, } +impl Txn { + /// Create a new application transactions. See [`Txn`] for details. + pub fn new(app_id: &dyn ToString, version: i64) -> Self { + Self::new_with_last_update(app_id, version, None) + } + + /// Create a new application transactions. See [`Txn`] for details. + pub fn new_with_last_update( + app_id: &dyn ToString, + version: i64, + last_updated: Option, + ) -> Self { + Txn { + app_id: app_id.to_string(), + version, + last_updated, + } + } +} + /// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored. /// However the reference implementation as well as delta-rs store useful information that may for instance /// allow us to be more permissive in commit conflict resolution. diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index fddfc11c9a..8403882024 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -801,7 +801,8 @@ mod tests { predicate: None, }; - let actions = vec![CommitData::new(removes, operation, HashMap::new()).unwrap()]; + let actions = + vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()]; let new_version = snapshot.advance(&actions)?; assert_eq!(new_version, version + 1); diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 4846844dfc..b8221950c5 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -86,7 +86,7 @@ use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommit use crate::checkpoints::create_checkpoint_for; use crate::errors::DeltaTableError; use crate::kernel::{ - Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, WriterFeatures, + Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Txn, WriterFeatures, }; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; @@ -253,6 +253,8 @@ pub struct CommitData { pub operation: DeltaOperation, /// The Metadata pub app_metadata: HashMap, + /// Application specific transaction + pub app_transactions: Vec, } impl CommitData { @@ -261,6 +263,7 @@ impl CommitData { mut actions: Vec, operation: DeltaOperation, mut app_metadata: HashMap, + app_transactions: Vec, ) -> Result { if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { let mut commit_info = operation.get_commit_info(); @@ -273,10 +276,18 @@ impl CommitData { commit_info.info = app_metadata.clone(); actions.push(Action::CommitInfo(commit_info)) } + + for txn in &app_transactions { + actions.push(Action::Txn(txn.clone())) + } + + dbg!("{:?}", &actions); + Ok(CommitData { actions, operation, app_metadata, + app_transactions, }) } @@ -312,6 +323,7 @@ pub struct PostCommitHookProperties { /// Enable controling commit behaviour and modifying metadata that is written during a commit. pub struct CommitProperties { pub(crate) app_metadata: HashMap, + pub(crate) app_transaction: Vec, max_retries: usize, create_checkpoint: bool, } @@ -320,6 +332,7 @@ impl Default for CommitProperties { fn default() -> Self { Self { app_metadata: Default::default(), + app_transaction: Vec::new(), max_retries: DEFAULT_RETRIES, create_checkpoint: true, } @@ -341,6 +354,18 @@ impl CommitProperties { self.create_checkpoint = create_checkpoint; self } + + /// Add an additonal application transaction to the commit + pub fn with_application_transaction(mut self, txn: Txn) -> Self { + self.app_transaction.push(txn); + self + } + + /// Override application transactions for the commit + pub fn with_application_transactions(mut self, txn: Vec) -> Self { + self.app_transaction = txn; + self + } } impl From for CommitBuilder { @@ -352,6 +377,7 @@ impl From for CommitBuilder { create_checkpoint: value.create_checkpoint, } .into(), + app_transaction: value.app_transaction, ..Default::default() } } @@ -361,6 +387,7 @@ impl From for CommitBuilder { pub struct CommitBuilder { actions: Vec, app_metadata: HashMap, + app_transaction: Vec, max_retries: usize, post_commit_hook: Option, } @@ -370,6 +397,7 @@ impl Default for CommitBuilder { CommitBuilder { actions: Vec::new(), app_metadata: HashMap::new(), + app_transaction: Vec::new(), max_retries: DEFAULT_RETRIES, post_commit_hook: None, } @@ -408,7 +436,12 @@ impl<'a> CommitBuilder { log_store: LogStoreRef, operation: DeltaOperation, ) -> Result, CommitBuilderError> { - let data = CommitData::new(self.actions, operation, self.app_metadata)?; + let data = CommitData::new( + self.actions, + operation, + self.app_metadata, + self.app_transaction, + )?; Ok(PreCommit { log_store, table_data, diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index f2949ce94a..9df1f02e70 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -924,6 +924,7 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE #[cfg(test)] mod tests { use super::*; + use crate::kernel::Txn; use crate::operations::{collect_sendable_stream, DeltaOps}; use crate::protocol::SaveMode; use crate::writer::test_utils::datafusion::write_batch; @@ -1668,4 +1669,24 @@ mod tests { let actual = get_data_sorted(&table, "id,value,modified").await; assert_batches_sorted_eq!(&expected, &actual); } + + #[tokio::test] + async fn test_app_txn() { + let batch = get_record_batch(None, false); + let table = DeltaOps::new_in_memory() + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::ErrorIfExists) + .with_partition_columns(["modified"]) + .with_commit_properties( + CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)), + ) + .await + .unwrap(); + assert_eq!(table.version(), 0); + assert_eq!(table.get_files_count(), 2); + + let app_txns = table.get_app_transaction_version(); + println!("{:?}", &app_txns); + assert_eq!(app_txns.len(), 1); + } } diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index a5b25acef4..3141550f1b 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -84,6 +84,7 @@ impl DeltaTableState { metadata: metadata.clone(), }, HashMap::new(), + Vec::new(), ) .unwrap()]; @@ -200,6 +201,7 @@ impl DeltaTableState { actions, operation: operation.clone(), app_metadata: HashMap::new(), + app_transactions: Vec::new(), }; let new_version = self.snapshot.advance(&vec![commit_data])?; if new_version != version {