Skip to content

Commit

Permalink
impl write app txn
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda authored and rtyler committed May 26, 2024
1 parent 319229d commit 88ea110
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
20 changes: 20 additions & 0 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,26 @@ pub struct Txn {
pub last_updated: Option<i64>,
}

impl Txn {
/// Create a new application transactions. See [`Txn`] for details.
pub fn new(app_id: &dyn ToString, version: i64) -> Self {
Self::new_with_last_update(app_id, version, None)
}

/// Create a new application transactions. See [`Txn`] for details.
pub fn new_with_last_update(
app_id: &dyn ToString,
version: i64,
last_updated: Option<i64>,
) -> Self {
Txn {
app_id: app_id.to_string(),
version,
last_updated,
}
}
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
/// However the reference implementation as well as delta-rs store useful information that may for instance
/// allow us to be more permissive in commit conflict resolution.
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,8 @@ mod tests {
predicate: None,
};

let actions = vec![CommitData::new(removes, operation, HashMap::new()).unwrap()];
let actions =
vec![CommitData::new(removes, operation, HashMap::new(), Vec::new()).unwrap()];

let new_version = snapshot.advance(&actions)?;
assert_eq!(new_version, version + 1);
Expand Down
37 changes: 35 additions & 2 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommit
use crate::checkpoints::create_checkpoint_for;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, WriterFeatures,
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Txn, WriterFeatures,
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
Expand Down Expand Up @@ -253,6 +253,8 @@ pub struct CommitData {
pub operation: DeltaOperation,
/// The Metadata
pub app_metadata: HashMap<String, Value>,
/// Application specific transaction
pub app_transactions: Vec<Txn>,
}

impl CommitData {
Expand All @@ -261,6 +263,7 @@ impl CommitData {
mut actions: Vec<Action>,
operation: DeltaOperation,
mut app_metadata: HashMap<String, Value>,
app_transactions: Vec<Txn>,
) -> Result<Self, CommitBuilderError> {
if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
let mut commit_info = operation.get_commit_info();
Expand All @@ -273,10 +276,18 @@ impl CommitData {
commit_info.info = app_metadata.clone();
actions.push(Action::CommitInfo(commit_info))
}

for txn in &app_transactions {
actions.push(Action::Txn(txn.clone()))
}

dbg!("{:?}", &actions);

Ok(CommitData {
actions,
operation,
app_metadata,
app_transactions,
})
}

Expand Down Expand Up @@ -312,6 +323,7 @@ pub struct PostCommitHookProperties {
/// Enable controling commit behaviour and modifying metadata that is written during a commit.
pub struct CommitProperties {
pub(crate) app_metadata: HashMap<String, Value>,
pub(crate) app_transaction: Vec<Txn>,
max_retries: usize,
create_checkpoint: bool,
}
Expand All @@ -320,6 +332,7 @@ impl Default for CommitProperties {
fn default() -> Self {
Self {
app_metadata: Default::default(),
app_transaction: Vec::new(),
max_retries: DEFAULT_RETRIES,
create_checkpoint: true,
}
Expand All @@ -341,6 +354,18 @@ impl CommitProperties {
self.create_checkpoint = create_checkpoint;
self
}

/// Add an additonal application transaction to the commit
pub fn with_application_transaction(mut self, txn: Txn) -> Self {
self.app_transaction.push(txn);
self
}

/// Override application transactions for the commit
pub fn with_application_transactions(mut self, txn: Vec<Txn>) -> Self {
self.app_transaction = txn;
self
}
}

impl From<CommitProperties> for CommitBuilder {
Expand All @@ -352,6 +377,7 @@ impl From<CommitProperties> for CommitBuilder {
create_checkpoint: value.create_checkpoint,
}
.into(),
app_transaction: value.app_transaction,
..Default::default()
}
}
Expand All @@ -361,6 +387,7 @@ impl From<CommitProperties> for CommitBuilder {
pub struct CommitBuilder {
actions: Vec<Action>,
app_metadata: HashMap<String, Value>,
app_transaction: Vec<Txn>,
max_retries: usize,
post_commit_hook: Option<PostCommitHookProperties>,
}
Expand All @@ -370,6 +397,7 @@ impl Default for CommitBuilder {
CommitBuilder {
actions: Vec::new(),
app_metadata: HashMap::new(),
app_transaction: Vec::new(),
max_retries: DEFAULT_RETRIES,
post_commit_hook: None,
}
Expand Down Expand Up @@ -408,7 +436,12 @@ impl<'a> CommitBuilder {
log_store: LogStoreRef,
operation: DeltaOperation,
) -> Result<PreCommit<'a>, CommitBuilderError> {
let data = CommitData::new(self.actions, operation, self.app_metadata)?;
let data = CommitData::new(
self.actions,
operation,
self.app_metadata,
self.app_transaction,
)?;
Ok(PreCommit {
log_store,
table_data,
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::Txn;
use crate::operations::{collect_sendable_stream, DeltaOps};
use crate::protocol::SaveMode;
use crate::writer::test_utils::datafusion::write_batch;
Expand Down Expand Up @@ -1668,4 +1669,24 @@ mod tests {
let actual = get_data_sorted(&table, "id,value,modified").await;
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn test_app_txn() {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.write(vec![batch.clone()])
.with_save_mode(SaveMode::ErrorIfExists)
.with_partition_columns(["modified"])
.with_commit_properties(
CommitProperties::default().with_application_transaction(Txn::new(&"my-app", 1)),
)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_files_count(), 2);

let app_txns = table.get_app_transaction_version();
println!("{:?}", &app_txns);
assert_eq!(app_txns.len(), 1);
}
}
2 changes: 2 additions & 0 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl DeltaTableState {
metadata: metadata.clone(),
},
HashMap::new(),
Vec::new(),
)
.unwrap()];

Expand Down Expand Up @@ -200,6 +201,7 @@ impl DeltaTableState {
actions,
operation: operation.clone(),
app_metadata: HashMap::new(),
app_transactions: Vec::new(),
};
let new_version = self.snapshot.advance(&vec![commit_data])?;
if new_version != version {
Expand Down

0 comments on commit 88ea110

Please sign in to comment.