Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): allow for deferring optimize.compact() commits in python client #2267

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 84 additions & 18 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ pub struct Metrics {
pub preserve_insertion_order: bool,
}

/// Information to be committed by the optimizer.
#[derive(Debug)]
pub struct CommitContext {
actions: Vec<Action>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
snapshot: Option<DeltaTableState>,
operation: OptimizeInput,
}

/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -175,6 +184,8 @@ pub struct OptimizeBuilder<'a> {
/// Optimize type
optimize_type: OptimizeType,
min_commit_interval: Option<Duration>,
/// Indicates whether the writes should be committed
commit_writes: bool,
}

impl<'a> OptimizeBuilder<'a> {
Expand All @@ -192,6 +203,7 @@ impl<'a> OptimizeBuilder<'a> {
max_spill_size: 20 * 1024 * 1024 * 2014, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
commit_writes: true, // commit by default
}
}

Expand Down Expand Up @@ -251,10 +263,36 @@ impl<'a> OptimizeBuilder<'a> {
self.min_commit_interval = Some(min_commit_interval);
self
}

/// Commit Writes
pub fn with_commit_writes(mut self, commit_writes: bool) -> Self {
self.commit_writes = commit_writes;
self
}
}

impl<'a> OptimizeBuilder<'a> {
/// Commit writes after processing
pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult<DeltaTable> {
commit(
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone(),
)
.await?;

let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot);

table.update().await?;

Ok(table)
}
}

impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
type Output = DeltaResult<(DeltaTable, Metrics)>;
type Output = DeltaResult<(DeltaTable, Metrics, Option<CommitContext>)>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -276,19 +314,22 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.target_size.to_owned(),
writer_properties,
)?;
let metrics = plan
let (metrics, commit_info) = plan
.execute(
this.log_store.clone(),
&this.snapshot,
this.max_concurrent_tasks,
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
this.commit_writes,
)
.await?;

let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
table.update().await?;
Ok((table, metrics))

Ok((table, metrics, commit_info))
})
}
}
Expand Down Expand Up @@ -604,6 +645,7 @@ impl MergePlan {
}

/// Perform the operations outlined in the plan.
#[allow(clippy::too_many_arguments)]
pub async fn execute(
mut self,
log_store: LogStoreRef,
Expand All @@ -613,9 +655,9 @@ impl MergePlan {
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> Result<Metrics, DeltaTableError> {
commit_writes: bool,
) -> Result<(Metrics, Option<CommitContext>), DeltaTableError> {
let operations = std::mem::take(&mut self.operations);

let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(_, (partition, bins))| {
Expand Down Expand Up @@ -698,6 +740,9 @@ impl MergePlan {
// or when we reach the commit interval.
let mut actions = vec![];

// Store total actions if we are not committing
let mut total_actions: Vec<Action> = vec![];

// Each time we commit, we'll reset buffered_metrics to orig_metrics.
let orig_metrics = std::mem::take(&mut self.metrics);
let mut buffered_metrics = orig_metrics.clone();
Expand All @@ -721,7 +766,11 @@ impl MergePlan {
None => false,
Some(i) => now.duration_since(last_commit) > i,
};
if !actions.is_empty() && (mature || end) {

// if commit_writes is false, we are not committing anyway, and will
// instead return a commit_info object, so no need to consider the
// min_commit_interval
if !actions.is_empty() && (!commit_writes || mature || end) {
let actions = std::mem::take(&mut actions);
last_commit = now;

Expand All @@ -740,24 +789,41 @@ impl MergePlan {
}

table.update().await?;
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;

if commit_writes {
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;
} else {
// Save the actions buffer so the client can commit later
total_actions.extend(actions)
}
}

if end {
break;
}
}

let commit_context = if !commit_writes {
Some(CommitContext {
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone(),
})
} else {
None
};

total_metrics.preserve_insertion_order = true;
if total_metrics.num_files_added == 0 {
total_metrics.files_added.min = 0;
Expand All @@ -766,7 +832,7 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}

Ok(total_metrics)
Ok((total_metrics, commit_context))
}
}

Expand Down
Loading
Loading