Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
gautham acharya committed Mar 9, 2024
1 parent 04b5d32 commit 1099229
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 49 deletions.
41 changes: 17 additions & 24 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub struct OptimizeBuilder<'a> {
optimize_type: OptimizeType,
min_commit_interval: Option<Duration>,
/// Indicates whether the writes should be committed
commit_writes: bool
commit_writes: bool,
}

impl<'a> OptimizeBuilder<'a> {
Expand Down Expand Up @@ -273,17 +273,15 @@ impl<'a> OptimizeBuilder<'a> {

impl<'a> OptimizeBuilder<'a> {
/// Commit writes after processing
pub async fn commit_writes(
self,
commit_info: CommitContext
) -> DeltaResult<DeltaTable> {
pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult<DeltaTable> {
commit(
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone()
).await?;
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone(),
)
.await?;

let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot);

Expand Down Expand Up @@ -324,7 +322,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
this.commit_writes
this.commit_writes,
)
.await?;

Expand Down Expand Up @@ -657,9 +655,8 @@ impl MergePlan {
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
commit_writes: bool
commit_writes: bool,
) -> Result<(Metrics, Option<CommitContext>), DeltaTableError> {

let operations = std::mem::take(&mut self.operations);
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
Expand Down Expand Up @@ -794,7 +791,6 @@ impl MergePlan {
table.update().await?;

if commit_writes {

debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
Expand All @@ -806,12 +802,10 @@ impl MergePlan {
Some(app_metadata.clone()),
)
.await?;

} else {
// Save the actions buffer so the client can commit later
total_actions.extend(actions)
}

}

if end {
Expand All @@ -820,12 +814,12 @@ impl MergePlan {
}

let commit_context = if !commit_writes {
Some(CommitContext{
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone()
})
Some(CommitContext {
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone(),
})
} else {
None
};
Expand All @@ -838,7 +832,6 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}


Ok((total_metrics, commit_context))
}
}
Expand Down
26 changes: 17 additions & 9 deletions crates/core/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box
let version = dt.version();
assert_eq!(dt.get_files_count(), 5);

let optimize = DeltaOps(dt).optimize().with_target_size(2_000_000).with_commit_writes(false);
let optimize = DeltaOps(dt)
.optimize()
.with_target_size(2_000_000)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;

// Still have same version, and file count,
Expand All @@ -255,9 +258,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 6);

let dt_commit = DeltaOps(dt).optimize().commit_writes(
commit_context.unwrap()
).await?;
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;

assert_eq!(version + 1, dt_commit.version());

Expand Down Expand Up @@ -309,7 +313,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box<dyn E
let version = dt.version();
let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

let optimize = DeltaOps(dt).optimize().with_filters(&filter).with_commit_writes(false);
let optimize = DeltaOps(dt)
.optimize()
.with_filters(&filter)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;

// optimize is not committed, so we have 4 files.
Expand All @@ -321,9 +328,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box<dyn E
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 5);

let dt_commit = DeltaOps(dt).optimize().commit_writes(
commit_context.unwrap()
).await?;
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;

assert_eq!(version + 1, dt_commit.version());

Expand Down Expand Up @@ -544,7 +552,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
20,
Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added
None,
true
true,
)
.await?;
assert_eq!(metrics.num_files_added, 2);
Expand Down
6 changes: 2 additions & 4 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ class RawDeltaTable:
enforce_retention_duration: bool,
custom_metadata: Optional[Dict[str, str]],
) -> List[str]: ...
def commit_optimize(
self
) -> None: ...
def commit_optimize(self) -> None: ...
def compact_optimize(
self,
partition_filters: Optional[FilterType],
Expand All @@ -70,7 +68,7 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
commit_writes: bool
commit_writes: bool,
) -> str: ...
def z_order_optimize(
self,
Expand Down
8 changes: 3 additions & 5 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1857,9 +1857,7 @@ def __call__(

return self.compact(partition_filters, target_size, max_concurrent_tasks)

def commit(
self
) -> Dict[str, Any]:
def commit(self) -> Dict[str, Any]:
self.table._table.commit_optimize()

def compact(
Expand All @@ -1870,7 +1868,7 @@ def compact(
min_commit_interval: Optional[Union[int, timedelta]] = None,
writer_properties: Optional[WriterProperties] = None,
custom_metadata: Optional[Dict[str, str]] = None,
commit_writes: bool = True
commit_writes: bool = True,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
Expand Down Expand Up @@ -1924,7 +1922,7 @@ def compact(
min_commit_interval,
writer_properties._to_dict() if writer_properties else None,
custom_metadata,
commit_writes
commit_writes,
)
self.table.update_incremental()
return json.loads(metrics)
Expand Down
15 changes: 8 additions & 7 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,14 @@ impl RawDeltaTable {
.with_max_concurrent_tasks(num_cpus::get());

if self._commit.is_none() {
return Err(PyValueError::new_err("Cannot commit optimization, nothing to commit."))
return Err(
PyValueError::new_err("Cannot commit optimization, nothing to commit."
));
}

let table = rt()?.block_on(
cmd.commit_writes(
self._commit.take().unwrap()
)
).map_err(PythonError::from)?;
let table = rt()?
.block_on(cmd.commit_writes(self._commit.take().unwrap()))
.map_err(PythonError::from)?;

self._table.state = table.state;
self._commit = None;
Expand Down Expand Up @@ -431,7 +431,8 @@ impl RawDeltaTable {
cmd = cmd.with_metadata(json_metadata);
};

let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
let converted_filters =
convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);

Expand Down
2 changes: 2 additions & 0 deletions python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def test_z_order_optimize(
assert dt.version() == old_version + 1
assert len(dt.file_uris()) == 1


def test_optimize_deferred_write(
tmp_path: pathlib.Path,
sample_data: pa.Table,
Expand All @@ -104,6 +105,7 @@ def test_optimize_deferred_write(
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"


def test_optimize_min_commit_interval(
tmp_path: pathlib.Path,
sample_data: pa.Table,
Expand Down

0 comments on commit 1099229

Please sign in to comment.