Merge branch 'main' into update-codeowners
rtyler authored Sep 25, 2023
2 parents 734baab + 05e6444 commit 7e2afb6
Showing 25 changed files with 568 additions and 204 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -102,7 +102,7 @@ jobs:
AWS_ACCESS_KEY_ID: deltalake
AWS_SECRET_ACCESS_KEY: weloverust
AWS_ENDPOINT_URL: http://localhost:4566
AWS_STORAGE_ALLOW_HTTP: "1"
AWS_ALLOW_HTTP: "1"
AZURE_USE_EMULATOR: "1"
AZURE_STORAGE_ALLOW_HTTP: "1"
AZURITE_BLOB_STORAGE_URL: "http://localhost:10000"
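The only change to this workflow is the renamed escape hatch: the S3 backend now reads `AWS_ALLOW_HTTP` instead of `AWS_STORAGE_ALLOW_HTTP` (the same rename appears in `rust/src/builder.rs` and the Python tests below). A rough sketch of using the renamed key from Python against the LocalStack endpoint configured above — the bucket name and table path are assumptions, not part of this commit:

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Credentials and endpoint mirror the CI settings above; "deltars/demo" is a
# hypothetical bucket/prefix used only for illustration.
storage_options = {
    "AWS_ACCESS_KEY_ID": "deltalake",
    "AWS_SECRET_ACCESS_KEY": "weloverust",
    "AWS_ENDPOINT_URL": "http://localhost:4566",
    "AWS_ALLOW_HTTP": "true",              # renamed from AWS_STORAGE_ALLOW_HTTP
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",  # single writer only; skips the locking client
}

write_deltalake("s3://deltars/demo", pa.table({"id": [1, 2, 3]}), storage_options=storage_options)
dt = DeltaTable("s3://deltars/demo", storage_options=storage_options)
print(dt.to_pyarrow_table())
```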
11 changes: 6 additions & 5 deletions README.md
@@ -32,7 +32,7 @@
<a href="https://pypi.python.org/pypi/deltalake">
<img alt="Deltalake" src="https://img.shields.io/pypi/pyversions/deltalake.svg?style=flat-square&color=00ADD4&logo=python">
</a>
<a target="_blank" href="https://go.delta.io/slack">
<a target="_blank" href="https://join.slack.com/t/delta-users/shared_invite/zt-23h0xwez7-wDTm43ZVEW2ZcbKn6Bc8Fg">
<img alt="#delta-rs in the Delta Lake Slack workspace" src="https://img.shields.io/badge/slack-delta-blue.svg?logo=slack&style=flat-square&color=F75101">
</a>
</p>
@@ -106,7 +106,7 @@ You can also try Delta Lake docker at [DockerHub](https://go.delta.io/dockerhub)
We encourage you to reach out, and are [commited](https://github.com/delta-io/delta-rs/blob/main/CODE_OF_CONDUCT.md)
to provide a welcoming community.

- [Join us in our Slack workspace](https://go.delta.io/slack)
- [Join us in our Slack workspace](https://join.slack.com/t/delta-users/shared_invite/zt-23h0xwez7-wDTm43ZVEW2ZcbKn6Bc8Fg)
- [Report an issue](https://github.com/delta-io/delta-rs/issues/new?template=bug_report.md)
- Looking to contribute? See our [good first issues](https://github.com/delta-io/delta-rs/contribute).

@@ -139,7 +139,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| S3 - R2 | ![done] | ![done] | requires lock for concurrent writes |
| Azure Blob | ![done] | ![done] | |
| Azure ADLS Gen2 | ![done] | ![done] | |
| Micorosft OneLake | [![open]][onelake-rs] | [![open]][onelake-rs] | |
| Microsoft OneLake | [![open]][onelake-rs] | [![open]][onelake-rs] | |
| Google Cloud Storage | ![done] | ![done] | |

### Supported Operations
@@ -153,7 +153,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc
| Delete - predicates | ![done] | | Delete data based on a predicate |
| Optimize - compaction | ![done] | ![done] | Harmonize the size of data file |
| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file |
| Merge | [![open]][merge-rs] | [![open]][merge-py] | |
| Merge | [![semi-done]][merge-rs]| [![open]][merge-py] | Merge two tables (limited to full re-write) |
| FS check | ![done] | | Remove corrupted files from table |

### Protocol Support Level
@@ -173,13 +173,14 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc

| Reader Version | Requirement | Status |
| -------------- | ----------------------------------- | ------ |
| Version 2 | Collumn Mapping | |
| Version 2 | Column Mapping | |
| Version 3 | Table Features (requires reader V7) | |

[datafusion]: https://github.com/apache/arrow-datafusion
[ballista]: https://github.com/apache/arrow-ballista
[polars]: https://github.com/pola-rs/polars
[open]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/IssueNeutral.svg
[semi-done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChangesGrey.svg
[done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChanges.svg
[roadmap]: https://github.com/delta-io/delta-rs/issues/1128
[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
42 changes: 40 additions & 2 deletions python/deltalake/table.py
@@ -2,7 +2,7 @@
import operator
import warnings
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta
from functools import reduce
from pathlib import Path
from typing import (
@@ -411,6 +411,11 @@ def metadata(self) -> Metadata:
return self._metadata

def protocol(self) -> ProtocolVersions:
"""
Get the reader and writer protocol versions of the DeltaTable.
:return: the current ProtocolVersions registered in the transaction log
"""
return ProtocolVersions(*self._table.protocol_versions())

def history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
@@ -691,6 +696,7 @@ def compact(
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[Union[int, timedelta]] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
@@ -708,10 +714,25 @@
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
want a commit per partition.
:return: the metrics from optimize
Examples:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.compact_optimize(
partition_filters, target_size, max_concurrent_tasks
partition_filters, target_size, max_concurrent_tasks, min_commit_interval
)
self.table.update_incremental()
return json.loads(metrics)
@@ -723,6 +744,7 @@ def z_order(
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
min_commit_interval: Optional[Union[int, timedelta]] = None,
) -> Dict[str, Any]:
"""
Reorders the data using a Z-order curve to improve data skipping.
@@ -738,14 +760,30 @@
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
:param min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
want a commit per partition.
:return: the metrics from optimize
Examples:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.compact(min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.z_order_optimize(
list(columns),
partition_filters,
target_size,
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
)
self.table.update_incremental()
return json.loads(metrics)
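Both optimize entry points now accept `min_commit_interval` as either an integer number of seconds or a `datetime.timedelta`; a `timedelta` is collapsed to whole seconds via `total_seconds()` before being passed to the Rust layer. A minimal usage sketch (the table path and column names are illustrative):

```python
from datetime import timedelta
from deltalake import DeltaTable

dt = DeltaTable("tmp")  # assumed to be an existing, partitioned table

# Either form is accepted; both request a commit at least every 10 minutes.
dt.optimize.compact(min_commit_interval=600)
dt.optimize.compact(min_commit_interval=timedelta(minutes=10))

# 0 (or timedelta(0)) produces a commit per partition, which is useful for
# long-running optimize jobs that should publish progress incrementally.
dt.optimize.z_order(["timestamp"], min_commit_interval=timedelta(0))
```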
1 change: 1 addition & 0 deletions python/docs/source/conf.py
@@ -66,6 +66,7 @@ def get_release_version() -> str:
("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
("py:class", "pathlib.Path"),
("py:class", "datetime.datetime"),
("py:class", "datetime.timedelta"),
]

# Add any paths that contain templates here, relative to this directory.
14 changes: 12 additions & 2 deletions python/src/lib.rs
@@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::future::IntoFuture;
use std::sync::Arc;
use std::time;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
@@ -273,18 +274,22 @@ impl RawDeltaTable {
}

/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))]
pub fn compact_optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
@@ -297,14 +302,15 @@ impl RawDeltaTable {
}

/// Run z-order variation of optimize
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))]
pub fn z_order_optimize(
&mut self,
z_order_columns: Vec<String>,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
max_spill_size: usize,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
Expand All @@ -313,6 +319,10 @@ impl RawDeltaTable {
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}

let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
2 changes: 1 addition & 1 deletion python/tests/conftest.py
@@ -103,7 +103,7 @@ def s3_localstack_creds():

@pytest.fixture()
def s3_localstack(monkeypatch, s3_localstack_creds):
monkeypatch.setenv("AWS_STORAGE_ALLOW_HTTP", "TRUE")
monkeypatch.setenv("AWS_ALLOW_HTTP", "TRUE")
for key, value in s3_localstack_creds.items():
monkeypatch.setenv(key, value)

2 changes: 1 addition & 1 deletion python/tests/test_fs.py
@@ -139,7 +139,7 @@ def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table):

# Can pass storage_options in directly
storage_opts = {
"AWS_STORAGE_ALLOW_HTTP": "true",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
storage_opts.update(s3_localstack_creds)
22 changes: 22 additions & 0 deletions python/tests/test_optimize.py
@@ -1,4 +1,5 @@
import pathlib
from datetime import timedelta

import pyarrow as pa
import pytest
@@ -46,3 +47,24 @@ def test_z_order_optimize(
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert dt.version() == old_version + 1


def test_optimize_min_commit_interval(
tmp_path: pathlib.Path,
sample_data: pa.Table,
):
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")

dt = DeltaTable(tmp_path)
old_version = dt.version()

dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=timedelta(0))

last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
# The table has 5 distinct partitions, each of which are Z-ordered
# independently. So with min_commit_interval=0, each will get its
# own commit.
assert dt.version() == old_version + 5
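The test above relies on the version number jumping by one per partition. A related, hedged way to inspect the same behaviour is to count the `OPTIMIZE` entries in the table history (the table path is assumed):

```python
from deltalake import DeltaTable

dt = DeltaTable("tmp")
optimize_commits = [e for e in dt.history() if e.get("operation") == "OPTIMIZE"]

# With min_commit_interval=timedelta(0), each rewritten partition should have
# produced its own OPTIMIZE commit.
print(len(optimize_commits))
```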
5 changes: 4 additions & 1 deletion rust/Cargo.toml
@@ -12,6 +12,9 @@ repository = "https://github.com/delta-io/delta.rs"
readme = "README.md"
edition = "2021"

[package.metadata.docs.rs]
all-features = true

[dependencies]
# arrow
arrow = { workspace = true, optional = true }
@@ -105,7 +108,7 @@ dashmap = { version = "5", optional = true }
sqlparser = { version = "0.37", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
fs_extra = { version = "1.3.0", optional = true }
tempdir = { version = "0", optional = true }

dynamodb_lock = { version = "0", default-features = false, optional = true }
2 changes: 1 addition & 1 deletion rust/examples/recordbatch-writer.rs
@@ -45,7 +45,7 @@ async fn main() -> Result<(), DeltaTableError> {
info!("It doesn't look like our delta table has been created");
create_initialized_table(&table_path).await
}
Err(err) => Err(err).unwrap(),
Err(err) => panic!("{:?}", err),
};

let writer_properties = WriterProperties::builder()
7 changes: 7 additions & 0 deletions rust/src/action/mod.rs
@@ -506,6 +506,12 @@ impl Default for Format {
}
}

/// Return a default empty schema to be used for edge-cases when a schema is missing
fn default_schema() -> String {
warn!("A `metaData` action was missing a `schemaString` and has been given an empty schema");
r#"{"type":"struct", "fields": []}"#.into()
}

/// Action that describes the metadata of the table.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
@@ -520,6 +526,7 @@ pub struct MetaData {
/// Specification of the encoding for the files stored in the table
pub format: Format,
/// Schema of the table
#[serde(default = "default_schema")]
pub schema_string: String,
/// An array containing the names of columns by which the data should be partitioned
pub partition_columns: Vec<String>,
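The `#[serde(default = "default_schema")]` attribute makes deserialization of a `metaData` action tolerate a missing `schemaString`, substituting an empty struct schema and logging a warning. A rough Python analogue of the same fallback, shown only to illustrate the behaviour (none of this code is part of the commit):

```python
import json

EMPTY_SCHEMA = '{"type":"struct", "fields": []}'

def parse_metadata_action(log_line: str) -> dict:
    """Parse a metaData action, falling back to an empty schema if it is absent."""
    action = json.loads(log_line)["metaData"]
    action.setdefault("schemaString", EMPTY_SCHEMA)
    return action

entry = '{"metaData": {"id": "0000", "format": {"provider": "parquet"}, "partitionColumns": []}}'
print(parse_metadata_action(entry)["schemaString"])
```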
2 changes: 1 addition & 1 deletion rust/src/builder.rs
@@ -341,7 +341,7 @@ pub mod s3_storage_options {
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME";
/// Allow http connections - mainly useful for integration tests
pub const AWS_STORAGE_ALLOW_HTTP: &str = "AWS_STORAGE_ALLOW_HTTP";
pub const AWS_ALLOW_HTTP: &str = "AWS_ALLOW_HTTP";

/// If set to "true", allows creating commits without concurrent writer protection.
/// Only safe if there is one writer to a given table.
4 changes: 2 additions & 2 deletions rust/src/data_catalog/storage/mod.rs
@@ -46,9 +46,9 @@ impl ListingSchemaProvider {
storage_options: Option<HashMap<String, String>>,
) -> DeltaResult<Self> {
let uri = ensure_table_uri(root_uri)?;
let storage_options = storage_options.unwrap_or_default().into();
let mut storage_options = storage_options.unwrap_or_default().into();
// We already parsed the url, so unwrapping is safe.
let store = configure_store(&uri, &storage_options)?;
let store = configure_store(&uri, &mut storage_options)?;
Ok(Self {
authority: uri.to_string(),
store,
(Diffs for the remaining changed files were not loaded.)
