Merge branch 'main' into cdc-update-only
rtyler committed May 21, 2024
2 parents 83da4fe + 16ecd1c commit 763b6dd
Showing 3 changed files with 47 additions and 191 deletions.
11 changes: 9 additions & 2 deletions crates/core/src/operations/writer.rs
@@ -226,8 +226,9 @@ impl DeltaWriter {
}
}

/// Write configuration for partition writers
#[derive(Debug)]
pub(crate) struct PartitionWriterConfig {
pub struct PartitionWriterConfig {
/// Schema of the data written to disk
file_schema: ArrowSchemaRef,
/// Prefix applied to all paths
@@ -244,6 +245,7 @@ pub(crate) struct PartitionWriterConfig {
}

impl PartitionWriterConfig {
/// Create a new instance of [PartitionWriterConfig]
pub fn try_new(
file_schema: ArrowSchemaRef,
partition_values: IndexMap<String, Scalar>,
@@ -272,8 +274,12 @@ impl PartitionWriterConfig {
}
}

/// Partition writer implementation
/// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
/// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
/// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
#[derive(Debug)]
pub(crate) struct PartitionWriter {
pub struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
config: PartitionWriterConfig,
@@ -412,6 +418,7 @@ impl PartitionWriter {
Ok(())
}

/// Close the writer and get the new [Add] actions.
pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
self.flush_arrow_writer().await?;
Ok(self.files_written)
225 changes: 37 additions & 188 deletions docs/integrations/delta-lake-daft.md
@@ -2,18 +2,26 @@

[Daft](https://www.getdaft.io) is a framework for ETL, analytics, and ML/AI at scale with a familiar Python dataframe API, implemented in Rust.

Daft and Delta Lake work really well together. Daft provides unified compute for Delta Lake’s unified storage. Together, Delta Lake and Daft give you high-performance query optimization and distributed compute on massive datasets.

Delta Lake is a great storage format for Daft workloads. Delta gives Daft users:

- **Query optimization** via file-skipping and column pruning
- **Versioning** for easy time travel functionality
- **Faster reads** via Z-ordering
- **ACID transactions** and **schema enforcement** for more reliable reads and writes

For Delta Lake users, Daft is a great data processing tool because it gives you the following features:

- **Skipping Filtered Data**: Daft implements automatic partition pruning and stats-based file pruning for filter predicates, skipping data that doesn’t need to be read.
- **Multimodal Dataframes**: read, write and transform multimodal data incl. images, JSON, PDF, audio, etc.
- **Parallel + Distributed Reads**: Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the distributed Ray runner.
- **Multi-cloud Support**: Daft supports reading Delta Lake tables from AWS S3, Azure Blob Store, and GCS, as well as local files.
- **Skipping Filtered Data**: Daft implements automatic partition pruning and stats-based file pruning for filter predicates, skipping data that doesn’t need to be read.

Daft and Delta Lake work really well together. Daft provides unified compute for Delta Lake’s unified storage. Together, Delta Lake and Daft give you high-performance query optimization and distributed compute on massive datasets.
Let's look at how to use Delta Lake with Daft.

## Installing Daft for Delta Lake

The easiest way to use Delta Lake format with Daft DataFrames is to install Daft with the `[deltalake]` extras using `pip`:
The easiest way to use the Delta Lake table format with Daft DataFrames is to install Daft with the `[deltalake]` extras using `pip`:

```python
!pip install -U "getdaft[deltalake]"
```
@@ -79,72 +87,32 @@ On top of this, Daft also gives you:
- **Expressions API** for easy column transformations
- **UDFs** for multi-column transformation, incl. ML applications

Let's take a quick look at some of Daft's basic DataFrame operations.
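
The examples below operate on a DataFrame `df` read from a Delta Lake table. As a minimal sketch of how such a DataFrame might be loaded (the table path is a placeholder, and older Daft releases expose the reader as `read_delta_lake` rather than `read_deltalake`):

```python
import daft

# Placeholder path to a local Delta Lake table; a cloud URI such as
# "s3://bucket/table" works the same way.
df = daft.read_deltalake("path/to/people_countries_table")

# Daft plans lazily; collect() materializes the DataFrame in memory.
df.collect()
```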

You can **select columns** from your DataFrame using the `select` method. We'll use the `show` method to show the first `n` rows (defaults to 10):

```python
> df.select("first_name", "country").show()

| | first_name | country |
|---:|:-------------|:----------|
| 0 | Ernesto | Argentina |
| 1 | Bruce | China |
| 2 | Jack | China |
| 3 | Wolfgang | Germany |
| 4 | Soraya | Germany |
```

You can **sort** your Daft DataFrame using the `sort` method:

```python
> df.sort(df["country"], desc=True).show()

| | first_name | last_name | country | continent |
|---:|:-------------|:------------|:----------|:------------|
| 0 | Wolfgang | Manche | Germany | NaN |
| 1 | Soraya | Jala | Germany | NaN |
| 2 | Bruce | Lee | China | Asia |
| 3 | Jack | Ma | China | Asia |
| 4 | Ernesto | Guevara | Argentina | NaN |
```
Check out the [Daft User Guide](https://www.getdaft.io/projects/docs/en/latest/user_guide/index.html) for a complete list of DataFrame operations.

You can **filter** your DataFrame using the `where` method:
## Data Skipping Optimizations

```python
> df.where(df["continent"] == "Asia").show()
Delta Lake and Daft work together to give you highly optimized query performance.

| | first_name | last_name | country | continent |
|---:|:-------------|:------------|:----------|:------------|
| 0 | Bruce | Lee | China | Asia |
| 1 | Jack | Ma | China | Asia |
```
Delta Lake stores your data in Parquet files. Parquet is a columnar file format that natively supports column pruning. If your query only needs to read data from a specific column or set of columns, you don't need to read in the entire file. This can save you lots of time and compute.
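
For example, projecting only the columns a query needs lets Daft skip the rest of the Parquet data entirely. A minimal sketch (the table path is a placeholder; the column names come from the example data above):

```python
import daft

# Placeholder path to the Delta Lake table used in this guide
df = daft.read_deltalake("path/to/people_countries_table")

# Only the 'first_name' and 'country' columns need to be scanned
# from the underlying Parquet files.
df.select("first_name", "country").collect()
```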

You can group your DataFrame by specific columns using the `groupby` method. You can then specify the aggregation method, in this case using the `count` aggregator:
Delta Lake goes beyond the basic Parquet features by also giving you:

```python
> df.select("first_name", "country").groupby(df["country"]).count("first_name").show()
- partitioned reads
- file skipping via Z-ordering

| | country | first_name |
|---:|:----------|-------------:|
| 0 | Germany | 2 |
| 1 | China | 2 |
| 2 | Argentina | 1 |
```
This is great for Daft users who want to run efficient queries on large-scale data.

Check out the [Daft User Guide](https://www.getdaft.io/projects/docs/en/latest/user_guide/index.html) for a complete list of DataFrame operations.
Let's look at how this works.

## Data Skipping Optimizations
### Partitioned Reads

You may have noticed the Delta Lake warning at the top when we first called `collect` on our DataFrame:
You may have noticed the Delta Lake warning at the top when we first called `collect()` on our DataFrame:

<div class="alert alert-block alert-danger">WARNING: has partitioning keys = [PartitionField(country#Utf8)], but no partition filter was specified. This will result in a full table scan.</div>
> `WARNING: has partitioning keys = [PartitionField(country#Utf8)], but no partition filter was specified. This will result in a full table scan.`
Delta Lake is informing us that the data is partitioned on the `country` column.

Daft's native query optimizer has access to all of the Delta Lake metadata.

This means it can optimize your query by skipping the partitions that are not relevant for this query. Instead of having to read all 3 partitions, we can read only 1 and get the same result, just faster!
Daft does some nice magic here to help you out. The Daft query optimizer has access to all of the Delta Lake metadata. This means it can optimize your query by skipping the partitions that are not relevant for this query. Instead of having to read all 3 partitions, we can read only 1 and get the same result, just faster!

```python
# Filter on partition columns will result in efficient partition pruning; non-matching partitions will be skipped.
@@ -156,7 +124,9 @@ This means it can optimize your query by skipping the partitions that are not re
| 1 | Soraya | Jala | Germany | nan |
```

You can use the `explain` method to see how Daft is optimizing your query. Since we've already called `collect` on our DataFrame, it is already in memory. So below we copy the output of `explain(show_all=True)` **before** calling `collect`:
You can use the `explain()` method to see how Daft is optimizing your query.

> Since we've already called `collect` on our DataFrame, it is already in memory. So below we copy the output of `explain(show_all=True)` **before** calling `collect`:
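
For reference, the two calls compared below are simply the filters used earlier in this guide with `explain` appended:

```python
# Filter on a non-partition column vs. a partition column, then inspect the plans
df.where(df["continent"] == "Asia").explain(show_all=True)
df.where(df["country"] == "Germany").explain(show_all=True)
```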
Running `df.where(df["continent"] == "Asia").explain(True)` returns:

@@ -202,146 +172,25 @@ Whereas running `df.where(df["country"] == "Germany").explain(True)` returns:
| Clustering spec = { Num partitions = 1 }
```

Running a query on a non-partitioned column like `continent` will require reading in all partitions, totalling 3045 bytes in this case.
Running a query on a non-partitioned column like `continent` will require reading in all partitions, totalling 3045 bytes in the case of this toy example.

Instead, running a query on a partitioned column (`country` in this case) means Daft has to read only the relevant partition, saving us a whopping 2000+ bytes in this toy example :)
Instead, running a query on a partitioned column (`country` in this case) means Daft has to read only the relevant partition, saving roughly 60% of the compute. This has a huge impact when you're working at scale.

You can read [High-Performance Querying on Massive Delta Lake Tables with Daft](https://delta.io/blog/daft-delta-lake-integration/) for an in-depth benchmarking of query optimization with Delta Lake and Daft.
### Z-Ordering for enhanced file skipping

## Transform columns with Expressions
[Z-ordering](https://delta.io/blog/2023-06-03-delta-lake-z-order/) stores similar data close together to optimize query performance. This is especially useful when you're querying on one or multiple columns.

Daft provides a flexible [Expressions](https://www.getdaft.io/projects/docs/en/latest/api_docs/expressions.html) API for defining computation that needs to happen over your columns.
Using Z-Ordered Delta tables instead of regular Parquet can give Daft users significant speed-ups.

For example, we can use `daft.col()` expressions together with the `with_column` method to create a new column `full_name`, joining the contents of the `last_name` column to the `first_name` column:
Read [High-Performance Querying on Massive Delta Lake Tables with Daft](https://delta.io/blog/daft-delta-lake-integration/) for an in-depth benchmarking of query optimization with Delta Lake and Daft using partitioning and Z-ordering.
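
As a rough sketch, you can Z-order an existing Delta table with the `deltalake` Python package before querying it with Daft (the table path is a placeholder; the columns come from the example data, and the exact optimizer API may vary by `deltalake` version):

```python
from deltalake import DeltaTable

# Placeholder path to the Delta Lake table; Z-order on the non-partition
# columns that queries filter on most often.
dt = DeltaTable("path/to/people_countries_table")
dt.optimize.z_order(["last_name", "first_name"])
```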

```python
> df_full = df.with_column("full_name", daft.col('first_name') + ' ' + daft.col('last_name'))
> df_full.show()

| | first_name | last_name | country | continent | full_name |
|---:|:-------------|:------------|:----------|:------------|:----------------|
| 0 | Ernesto | Guevara | Argentina | NaN | Ernesto Guevara |
| 1 | Bruce | Lee | China | Asia | Bruce Lee |
| 2 | Jack | Ma | China | Asia | Jack Ma |
| 3 | Wolfgang | Manche | Germany | NaN | Wolfgang Manche |
| 4 | Soraya | Jala | Germany | NaN | Soraya Jala |
```

## Multimodal Data Type Support
## Daft gives you Multimodal Data Type Support

Daft has a rich multimodal type-system with support for Python objects, Images, URLs, Tensors and more.

Daft columns can contain any Python objects. For example, let's add a column containing a Python class `Dog` for some of the people in our dataset:

```python
> import numpy as np

> class Dog:
> def __init__(self, name):
> self.name = name

> def bark(self):
> return f"{self.name}!"

> df_dogs = daft.from_pydict({
> 'full_name': ['Ernesto Guevara','Bruce Lee','Jack Ma','Wolfgang Manche','Soraya Jala'],
> "dogs": [Dog("ruffles"), Dog("shnoodles"), Dog("waffles"), Dog("doofus"), Dog("Fluffles")],
> })

> df_dogs.show()

| | full_name | dogs |
|---:|:----------------|:-------------------------------------|
| 0 | Ernesto Guevara | <__main__.Dog object at 0x1603d1c10> |
| 1 | Bruce Lee | <__main__.Dog object at 0x126ab9b90> |
| 2 | Jack Ma | <__main__.Dog object at 0x1603d27d0> |
| 3 | Wolfgang Manche | <__main__.Dog object at 0x1603d1cd0> |
| 4 | Soraya Jala | <__main__.Dog object at 0x1603d3f50> |

```

You can join this new `dogs` column to your existing DataFrame using the `join` method:

```python
> df_family = df_full.join(df_dogs, on=["full_name"])
> df_family.show()

| | full_name | first_name | last_name | country | continent | dogs |
|---:|:----------------|:-------------|:------------|:----------|:------------|:-------------------------------------|
| 0 | Ernesto Guevara | Ernesto | Guevara | Argentina | NaN | <__main__.Dog object at 0x1603d1c10> |
| 1 | Bruce Lee | Bruce | Lee | China | Asia | <__main__.Dog object at 0x126ab9b90> |
| 2 | Jack Ma | Jack | Ma | China | Asia | <__main__.Dog object at 0x1603d27d0> |
| 3 | Wolfgang Manche | Wolfgang | Manche | Germany | NaN | <__main__.Dog object at 0x1603d1cd0> |
| 4 | Soraya Jala | Soraya | Jala | Germany | NaN | <__main__.Dog object at 0x1603d3f50> |
```

We can then use the `apply` method to apply a function to each instance of the Dog class:

```python
> from daft import DataType

> df_family = df_family.with_column(
> "dogs_bark_name",
> df_family["dogs"].apply(lambda dog: dog.bark(), return_dtype=DataType.string()),
> )

> df_family.show()

| | first_name | last_name | country | continent | full_name | dogs | dogs_bark_name |
|---:|:-------------|:------------|:----------|:------------|:----------------|:-------------------------------------|:-----------------|
| 0 | Ernesto | Guevara | Argentina | NaN | Ernesto Guevara | <__main__.Dog object at 0x1603d1c10> | ruffles! |
| 1 | Bruce | Lee | China | Asia | Bruce Lee | <__main__.Dog object at 0x126ab9b90> | shnoodles! |
| 2 | Jack | Ma | China | Asia | Jack Ma | <__main__.Dog object at 0x1603d27d0> | waffles! |
| 3 | Wolfgang | Manche | Germany | NaN | Wolfgang Manche | <__main__.Dog object at 0x1603d1cd0> | doofus! |
| 4 | Soraya | Jala | Germany | NaN | Soraya Jala | <__main__.Dog object at 0x1603d3f50> | Fluffles! |
```

Daft DataFrames can also contain [many other data types](https://www.getdaft.io/projects/docs/en/latest/api_docs/datatype.html), like tensors, JSON, URLs and images. The Expressions API provides useful tools to work with these data types.

Take a look at the notebook in the `delta-examples` Github repository for a closer look at how Daft handles URLs, images and ML applications.

## Transform multiple columns with UDFs

You can use User-Defined Functions (UDFs) to run functions over multiple rows or columns:

```python
> from daft import udf

> @udf(return_dtype=DataType.string())
> def custom_bark(dog_series, owner_series):
> return [
> f"{dog.name} loves {owner_name}!"
> for dog, owner_name
> in zip(dog_series.to_pylist(), owner_series.to_pylist())
> ]

> df_family = df_family.with_column("custom_bark", custom_bark(df_family["dogs"], df_family["first_name"]))
> df_family.select("full_name", "dogs_bark_name", "custom_bark").show()

| | full_name | dogs_bark_name | custom_bark |
|---:|:----------------|:-----------------|:-----------------------|
| 0 | Ernesto Guevara | ruffles! | ruffles loves Ernesto! |
| 1 | Bruce Lee | shnoodles! | shnoodles loves Bruce! |
| 2 | Jack Ma | waffles! | waffles loves Jack! |
| 3 | Wolfgang Manche | doofus! | doofus loves Wolfgang! |
| 4 | Soraya Jala | Fluffles! | Fluffles loves Soraya! |
```

Daft supports workloads with [many more data types](https://www.getdaft.io/projects/docs/en/latest/api_docs/datatype.html) than traditional DataFrame APIs.

By combining multimodal data support with the UDF functionality you can [run ML workloads](https://www.getdaft.io/projects/docs/en/latest/user_guide/tutorials.html#mnist-digit-classification) right within your DataFrame.

## When should I use Daft DataFrames?

Daft DataFrames are designed for multimodal, distributed workloads.

You may want to consider using Daft if you're working with:

1. **Large datasets** that don't fit into memory or would benefit from parallelization
2. **Multimodal data types**, such as images, JSON, vector embeddings, and tensors
3. **ML workloads** that would benefit from interactive computation within DataFrame (via UDFs)
The [Expressions API](https://www.getdaft.io/projects/docs/en/latest/api_docs/expressions.html) provides useful tools to work with these data types. By combining multimodal data support with the [User-Defined Functions API](https://www.getdaft.io/projects/docs/en/latest/api_docs/udf.html) you can [run ML workloads](https://www.getdaft.io/projects/docs/en/latest/user_guide/tutorials.html#mnist-digit-classification) right within your DataFrame.

Take a look at the [Daft tutorials](https://www.getdaft.io/projects/docs/en/latest/user_guide/tutorials.html) for in-depth examples of each use case.
Take a look at the notebook in the [`delta-examples` GitHub repository](https://github.com/delta-io/delta-examples) for a closer look at how Daft handles URLs, images and ML applications.

## Contribute to `daft`

2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.17.4"
version = "0.17.5"
authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
