Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add deltaOps set metadata operation #2474

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
use arrow::record_batch::RecordBatch;
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;
use crate::operations::set_metadata::SetMetadataBuilder;

#[cfg(feature = "datafusion")]
pub mod constraints;
Expand All @@ -51,6 +52,7 @@ pub mod update;
#[cfg(feature = "datafusion")]
pub mod write;
pub mod writer;
// Public like the sibling operation modules so that the builder returned by
// `DeltaOps::set_metadata` can be named and documented by downstream crates.
pub mod set_metadata;

/// The [Operation] trait defines common behaviors that all operations builders
/// should have consistent
Expand Down Expand Up @@ -161,6 +163,23 @@ impl DeltaOps {
OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap())
}

/// Start building a metadata update for this table.
///
/// Hands the table's log store and loaded state to a [`SetMetadataBuilder`];
/// awaiting the returned builder performs the commit.
///
/// # Panics
///
/// Panics if the wrapped table carries no state (`state` is `None`).
///
/// ```
/// use deltalake_core::DeltaOps;
///
/// async {
///     let ops = DeltaOps::try_from_uri("memory://").await.unwrap();
///     let table = ops.create().with_table_name("my_table").await.unwrap();
///     let table = DeltaOps(table).set_metadata().await.unwrap();
///     assert_eq!(table.version(), 1);
/// };
/// ```
#[must_use]
pub fn set_metadata(self) -> SetMetadataBuilder {
    let table = self.0;
    let log_store = table.log_store;
    let snapshot = table.state.unwrap();
    SetMetadataBuilder::new(log_store, snapshot)
}

/// Delete data from Delta table
#[cfg(feature = "datafusion")]
#[must_use]
Expand Down
145 changes: 145 additions & 0 deletions crates/core/src/operations/set_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//! Command for updating the metadata of a delta table
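// NOTE(review): this link points at delta-spark's CreateDeltaTableCommand, not a
// metadata-update command — confirm it is the intended reference.
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala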

use futures::future::BoxFuture;

use super::transaction::{CommitBuilder, CommitProperties};
use crate::errors::{DeltaResult};
use crate::kernel::{
Action, Metadata, StructType,
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::DeltaTable;
use crate::table::state::DeltaTableState;

/// Builder for an operation that commits updated table metadata to a [DeltaTable]
///
/// Created via [`SetMetadataBuilder::new`]; awaiting the builder (through its
/// `IntoFuture` implementation) commits a metadata action and yields the table.
#[derive(Debug, Clone)]
pub struct SetMetadataBuilder {
/// Metadata used as the base of the commit; initialised from the table state
metadata: Metadata,

/// Optional replacement schema; when set, it is serialised to JSON and
/// overwrites `schema_string` on the committed metadata
schema: Option<StructType>,

/// Snapshot of the table state the commit is built against
snapshot: DeltaTableState,

/// Log store handle passed to the commit and used to construct the returned table
log_store: LogStoreRef,

/// Commit properties and configuration
commit_properties: CommitProperties,
}

// No trait items are overridden here; the builder uses whatever defaults
// `Operation` provides.
impl super::Operation<()> for SetMetadataBuilder {}

impl SetMetadataBuilder {
/// Create a new [`SetMetadataBuilder`]
pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self {
Self {
metadata: state.metadata().clone(),
schema: None,
log_store,
snapshot: state,
commit_properties: CommitProperties::default(),
}
}

/// Sets the schema for the table
pub fn with_schema(
mut self,
schema: StructType,
) -> Self {
self.schema = Some(schema);
self
}
}

impl std::future::IntoFuture for SetMetadataBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {

let mut metadata = this.metadata.clone();
if let Some(schema) = this.schema {
metadata.schema_string = serde_json::to_string(&schema)?
}

let operation = DeltaOperation::SetMetadata {
metadata: metadata.clone(),
};

let actions = vec![Action::Metadata(metadata)];

CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.build(
Some(&this.snapshot),
this.log_store.clone(),
operation,
)?
.await?;

let mut table = DeltaTable::new_with_state(this.log_store.clone(), this.snapshot.clone());
table.update().await?;
Ok(table)
})
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::{DataType, PrimitiveType, StructField};
    use crate::operations::DeltaOps;
    use crate::protocol::SaveMode;
    use crate::writer::test_utils::get_delta_schema;

    /// Builds a nullable field of the given primitive type.
    fn nullable_field(name: &str, primitive: PrimitiveType) -> StructField {
        StructField::new(name.to_string(), DataType::Primitive(primitive), true)
    }

    /// Creating a table and then setting a new schema through
    /// `set_metadata` must leave the table reporting that new schema.
    #[tokio::test]
    async fn test_set_metadata_with_new_schema() {
        let initial_schema = get_delta_schema();

        let created = DeltaOps::new_in_memory()
            .create()
            .with_columns(initial_schema.fields().clone())
            .with_save_mode(SaveMode::ErrorIfExists)
            .await
            .unwrap();
        assert_eq!(created.version(), 0);
        assert_eq!(created.get_schema().unwrap(), &initial_schema);

        let expected_schema = StructType::new(vec![
            nullable_field("id", PrimitiveType::String),
            nullable_field("value", PrimitiveType::Integer),
            nullable_field("modified", PrimitiveType::String),
            nullable_field("new_column", PrimitiveType::String),
        ]);

        let updated = DeltaOps(created.clone())
            .set_metadata()
            .with_schema(expected_schema.clone())
            .await
            .unwrap();

        assert_eq!(updated.get_schema().unwrap(), &expected_schema)
    }
}
11 changes: 10 additions & 1 deletion crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,13 @@ pub enum DeltaOperation {
not_matched_by_source_predicates: Vec<MergePredicate>,
},

/// Sets the metadata of the table
#[serde(rename_all = "camelCase")]
SetMetadata {
/// Metadata associated with the new table
metadata: Metadata,
},

/// Represents a Delta `StreamingUpdate` operation.
#[serde(rename_all = "camelCase")]
StreamingUpdate {
Expand Down Expand Up @@ -460,6 +467,7 @@ impl DeltaOperation {
DeltaOperation::Delete { .. } => "DELETE",
DeltaOperation::Update { .. } => "UPDATE",
DeltaOperation::Merge { .. } => "MERGE",
DeltaOperation::SetMetadata { .. } => "SET METADATA",
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
Expand Down Expand Up @@ -507,7 +515,8 @@ impl DeltaOperation {
| Self::VacuumStart { .. }
| Self::VacuumEnd { .. }
| Self::AddConstraint { .. }
| Self::DropConstraint { .. } => false,
| Self::DropConstraint { .. }
| Self::SetMetadata { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
Expand Down
Loading