feat: add support for multiple partition columns and filters in to_pyarrow_dataset() and OR filters in write_datalake() #1722

Open: wants to merge 13 commits into main (showing changes from 5 commits)
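In short, this PR lets `partition_filters` in `write_deltalake()` (and `partitions` in `to_pyarrow_dataset()`) be either a single list of tuples (AND) or a list of lists of tuples (OR of AND-conjunctions). A minimal write-side usage sketch, assuming a table partitioned by a `created_date` column (the URI and data here are hypothetical):

```python
import pyarrow as pa
from deltalake import write_deltalake

# Hypothetical data; "created_date" is the partition column.
data = pa.table({"id": [1, 3], "created_date": ["2023-08-25", "2023-09-07"]})

write_deltalake(
    "path/to/table",  # hypothetical table URI
    data,
    mode="overwrite",
    partition_by=["created_date"],
    # OR of two single-tuple conjunctions: each inner list is AND-ed,
    # and the outer list is OR-ed across partitions.
    partition_filters=[
        [("created_date", "=", "2023-08-25")],
        [("created_date", "=", "2023-09-07")],
    ],
)
```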
52 changes: 42 additions & 10 deletions python/deltalake/table.py
@@ -606,16 +606,48 @@ def to_pyarrow_dataset(
            default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
        )

        fragments = []
        if partitions is None:
            partition_filters = None
        else:
            if partitions and isinstance(partitions, list):
                partition_count = len(partitions)
                partition_type = type(partitions[0])

                if partition_count == 1 and partition_type is list:
                    partition_filters = partitions
                elif partition_count == 1 and partition_type is tuple:
                    partition_filters = [partitions]
                elif all(isinstance(x, tuple) for x in partitions):
                    partition_filters = [partitions]
                elif all(isinstance(x, list) for x in partitions):
                    partition_filters = partitions
                else:
                    partition_filters = None
            else:
                raise ValueError(
                    "Partitions must be a list of tuples, or a list of lists of tuples"
                )

        if partition_filters is not None:
            for partition in partition_filters:
                for file, partition_expression in self._table.dataset_partitions(
                    schema=self.schema().to_pyarrow(), partition_filters=partition
                ):
                    fragments.append(
                        format.make_fragment(file, filesystem, partition_expression)
                    )
        else:
            fragments = [
                format.make_fragment(
                    file,
                    filesystem=filesystem,
                    partition_expression=part_expression,
                )
                for file, part_expression in self._table.dataset_partitions(
                    self.schema().to_pyarrow(), partitions
                )
            ]

        schema = self.schema().to_pyarrow()
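For reference, a minimal usage sketch of the read-side change above (the table path and partition columns are hypothetical; the values mirror the tests at the end of this diff):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical table partitioned by year/month/day

# Single conjunction (AND): a flat list of tuples, as before.
ds_and = dt.to_pyarrow_dataset(partitions=[("month", "=", "2"), ("day", "=", "5")])

# Disjunction (OR): a list of lists of tuples; the fragments matched by
# each conjunction are collected into one dataset.
ds_or = dt.to_pyarrow_dataset(
    partitions=[[("month", "=", "2")], [("month", "=", "4")]]
)
```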
128 changes: 117 additions & 11 deletions python/deltalake/writer.py
@@ -66,6 +66,43 @@ class AddAction:
    stats: str


def _match_filter(
    filter_: Tuple[str, str, Any], partition_values: Mapping[str, Optional[str]]
) -> bool:
    """Matches a single filter tuple against the partition values of an AddAction instance.

    This ensures that create_write_transaction is called with a valid partition filter.

    :param filter_: A tuple in DNF format (column, operator, value)
    :param partition_values: A mapping of partition values

    Examples:
        >>> _match_filter(("a", "=", "1"), {"a": "1"})
        True
        >>> _match_filter(("b", "!=", "2"), {"b": "2"})
        False
    """
    column, op, value = filter_
    actual_value = partition_values.get(column)

    if op == "=" or op == "==":
        return actual_value == value
    elif op == "!=":
        return actual_value != value
    elif op == "<":
        return actual_value < value
    elif op == ">":
        return actual_value > value
    elif op == "<=":
        return actual_value <= value
    elif op == ">=":
        return actual_value >= value
    elif op == "in":
        return actual_value in value
    elif op == "not in":
        return actual_value not in value
    else:
        raise ValueError(f'"{op}" is not a valid operator in predicates.')


def write_deltalake(
    table_or_uri: Union[str, Path, DeltaTable],
    data: Union[
@@ -268,12 +305,47 @@ def check_data_is_aligned_with_partition_filtering(
    ) -> None:
        if table is None:
            return

        if partition_filters is None:
            filters = None
        else:
            if isinstance(partition_filters, list):
                partition_count = len(partition_filters)

                partition_type = type(partition_filters[0])

                if partition_count == 1 and partition_type is list:
                    filters = partition_filters
                elif partition_count == 1 and partition_type is tuple:
                    filters = [partition_filters]
                elif all(isinstance(x, tuple) for x in partition_filters):
                    filters = [partition_filters]
                elif all(isinstance(x, list) for x in partition_filters):
                    filters = partition_filters
                else:
                    filters = None
            else:
                raise ValueError(
                    "Partitions must be a list of tuples, or a list of lists of tuples"
                )

        allowed_partitions = set()
        if filters is not None:
            for filter_ in filters:
                # Each OR branch contributes its matching active partitions.
                allowed_partitions.update(table._table.get_active_partitions(filter_))
        else:
            allowed_partitions = table._table.get_active_partitions()

        existed_partitions: FrozenSet[
            FrozenSet[Tuple[str, Optional[str]]]
        ] = table._table.get_active_partitions()
        partition_values = pa.RecordBatch.from_arrays(
            [
                batch.column(column_name)
@@ -356,14 +428,48 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
            storage_options,
        )
    else:
        if table is not None:
            if partition_filters is None:
                table._table.create_write_transaction(
                    add_actions,
                    mode,
                    partition_by or [],
                    schema,
                    partition_filters,
                )
                table.update_incremental()
            elif isinstance(partition_filters, list):
                if all(isinstance(x, list) for x in partition_filters):
                    original_add_actions = add_actions.copy()

                    for partition_filter in partition_filters:
                        filtered_add_actions = [
                            action
                            for action in original_add_actions
                            if all(
                                _match_filter(filter_, action.partition_values)
                                for filter_ in partition_filter
                            )
                        ]
                        table._table.create_write_transaction(
                            filtered_add_actions,
                            mode,
                            partition_by or [],
                            schema,
                            partition_filter,
                        )
Collaborator:

Why are we creating a write transaction per filter?

Contributor (Author):
It seemed like the create_write_transaction() was not able to handle a list of lists of tuples (OR conditions), so I had to treat each condition separately and loop through them.

[id_values0-account_values0-created_date_values0-updated_at_values0-value_values0-partition_by0-partition_filters0] TypeError: argument 'partitions_filters': 'list' object cannot be converted to 'PyTuple' [909, 1]

Looping through each filter results in this:

partition_filters=[
                [("created_date", "=", "2023-08-25")],
                [("created_date", "=", "2023-09-07")],
                [("created_date", "=", "2023-09-21")],
            ],

ic| partition_filter: [('created_date', '=', '2023-08-25')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-08-25'},
                                     modification_time=1697321774036,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 1, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
                                           '"value": 44.5}, "maxValues": {"id": 1, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921783", '
                                           '"value": 44.5}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'

ic| partition_filter: [('created_date', '=', '2023-09-07')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-07/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-09-07'},
                                     modification_time=1697321774034,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 3, "account": '
                                           '"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 68.0}, "maxValues": {"id": 3, "account": '
                                           '"account_b", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 68.0}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'

ic| partition_filter: [('created_date', '=', '2023-09-21')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-09-21/2-a542e805-94c2-4e57-a957-0944117d293d-0.parquet',
                                     size=3715,
                                     partition_values={'created_date': '2023-09-21'},
                                     modification_time=1697321774034,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 4, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 11.5}, "maxValues": {"id": 4, "account": '
                                           '"account_a", "updated_at": "2023-10-14T22:16:13.921786", '
                                           '"value": 11.5}, "nullCount": {"id": 0, "account": 0, '
                                           '"updated_at": 0, "value": 0}}')]
ic| 'update_incremental'
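
To make the loop above concrete, here is a minimal standalone sketch of the per-conjunction matching (plain dicts stand in for `AddAction.partition_values`; the values come from the logs above):

```python
add_actions = [
    {"created_date": "2023-08-25"},
    {"created_date": "2023-09-07"},
    {"created_date": "2023-09-21"},
]
partition_filters = [
    [("created_date", "=", "2023-08-25")],
    [("created_date", "=", "2023-09-07")],
    [("created_date", "=", "2023-09-21")],
]

# One create_write_transaction() per OR branch: each inner list is AND-ed
# over the partition values, so every branch commits only its own files.
for conjunction in partition_filters:
    matched = [
        values
        for values in add_actions
        if all(_match_filter(f, values) for f in conjunction)
    ]
    print(conjunction, "->", matched)
```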

If we use a single list of tuples (AND condition) like the example below, there is still only one call to table.update_incremental().

ic| partition_filter: [('created_date', '>', '2023-08-01'), ('created_date', '<', '2023-12-31')]
ic| filtered_add_actions: [AddAction(path='created_date=2023-08-25/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_a',
                                                       'created_date': '2023-08-25'},
                                     modification_time=1697322195775,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 1, "updated_at": '
                                           '"2023-10-14T22:23:15.400092", "value": 0.1}, "maxValues": '
                                           '{"id": 1, "updated_at": "2023-10-14T22:23:15.400092", '
                                           '"value": 0.1}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-05/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_b',
                                                       'created_date': '2023-09-05'},
                                     modification_time=1697322195778,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 2, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.2}, "maxValues": '
                                           '{"id": 2, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.2}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-10-02/account=account_b/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_b',
                                                       'created_date': '2023-10-02'},
                                     modification_time=1697322195780,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 5, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.5}, "maxValues": '
                                           '{"id": 5, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.5}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-07/account=account_a/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_a',
                                                       'created_date': '2023-09-07'},
                                     modification_time=1697322195780,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 3, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.3}, "maxValues": '
                                           '{"id": 3, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.3}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}'),
                           AddAction(path='created_date=2023-09-21/account=account_c/2-6ce878e9-572d-4d78-9baa-8b34fe0b855e-0.parquet',
                                     size=3383,
                                     partition_values={'account': 'account_c',
                                                       'created_date': '2023-09-21'},
                                     modification_time=1697322195781,
                                     data_change=True,
                                     stats='{"numRecords": 1, "minValues": {"id": 4, "updated_at": '
                                           '"2023-10-14T22:23:15.400093", "value": 0.4}, "maxValues": '
                                           '{"id": 4, "updated_at": "2023-10-14T22:23:15.400093", '
                                           '"value": 0.4}, "nullCount": {"id": 0, "updated_at": 0, '
                                           '"value": 0}}')]
ic| 'update_incremental'

                        table.update_incremental()
                elif all(isinstance(x, tuple) for x in partition_filters):
                    table._table.create_write_transaction(
                        add_actions,
                        mode,
                        partition_by or [],
                        schema,
                        partition_filters,
                    )
                    table.update_incremental()
                else:
                    raise ValueError("Invalid format for partition_filters")


def __enforce_append_only(
73 changes: 73 additions & 0 deletions python/tests/test_table_read.py
@@ -481,6 +481,79 @@ def test_delta_table_with_filters():
    )


def test_pyarrow_dataset_partitions():
    table_path = "../rust/tests/data/delta-0.8.0-partitioned"
    dt = DeltaTable(table_path)

    single_partition = [("day", "=", "1")]
    dataset_filtered = dt.to_pyarrow_dataset(partitions=single_partition)
    data_filtered = dataset_filtered.to_table()
    dataset = dt.to_pyarrow_dataset()
    filter_expr = ds.field("day") == "1"
    data = dataset.to_table(filter=filter_expr)
    assert data_filtered.num_rows == data.num_rows

    single_partition_multiple_columns = [("month", "=", "2"), ("day", "=", "5")]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=single_partition_multiple_columns
    )
    data_filtered = dataset_filtered.to_table()
    dataset = dt.to_pyarrow_dataset()
    filter_expr = (ds.field("month") == "2") & (ds.field("day") == "5")
    data = dataset.to_table(filter=filter_expr)
    assert data_filtered.num_rows == data.num_rows

    multiple_partitions_single_column = [[("month", "=", "2")], [("month", "=", "4")]]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=multiple_partitions_single_column
    )
    data_filtered = dataset_filtered.to_table()
    dataset = dt.to_pyarrow_dataset()
    filter_expr = (ds.field("month") == "2") | (ds.field("month") == "4")
    data = dataset.to_table(filter=filter_expr)
    assert data_filtered.num_rows == data.num_rows

    multiple_partitions_multiple_columns = [
        [("year", "=", "2020"), ("month", "=", "2"), ("day", "=", "5")],
        [("year", "=", "2021"), ("month", "=", "4"), ("day", "=", "5")],
        [("year", "=", "2021"), ("month", "=", "3"), ("day", "=", "1")],
    ]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=multiple_partitions_multiple_columns
    )
    data_filtered = dataset_filtered.to_table()
    dataset = dt.to_pyarrow_dataset()
    filter_expr = (
        (
            (ds.field("year") == "2020")
            & (ds.field("month") == "2")
            & (ds.field("day") == "5")
        )
        | (
            (ds.field("year") == "2021")
            & (ds.field("month") == "4")
            & (ds.field("day") == "5")
        )
        | (
            (ds.field("year") == "2021")
            & (ds.field("month") == "3")
            & (ds.field("day") == "1")
        )
    )
    data = dataset.to_table(filter=filter_expr)
    assert data_filtered.num_rows == data.num_rows

    single_partition_single_column_list = [[("year", "=", "2020")]]
    dataset_filtered = dt.to_pyarrow_dataset(
        partitions=single_partition_single_column_list
    )
    data_filtered = dataset_filtered.to_table()
    dataset = dt.to_pyarrow_dataset()
    filter_expr = ds.field("year") == "2020"
    data = dataset.to_table(filter=filter_expr)
    assert data_filtered.num_rows == data.num_rows
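
A write-side counterpart could be sketched along these lines (a hedged sketch only: the test name, `tmp_path` fixture usage, data, and the final row count are hypothetical and not part of this diff):

```python
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake


def test_write_deltalake_with_or_partition_filters(tmp_path):
    data = pa.table(
        {"id": [1, 2, 3], "created_date": ["2023-08-25", "2023-09-07", "2023-09-21"]}
    )
    write_deltalake(tmp_path, data, partition_by=["created_date"])

    # Overwrite two of the three partitions via an OR filter.
    new_data = pa.table({"id": [10, 30], "created_date": ["2023-08-25", "2023-09-07"]})
    write_deltalake(
        tmp_path,
        new_data,
        mode="overwrite",
        partition_by=["created_date"],
        partition_filters=[
            [("created_date", "=", "2023-08-25")],
            [("created_date", "=", "2023-09-07")],
        ],
    )

    # The untouched partition keeps its row; the other two are replaced.
    dt = DeltaTable(str(tmp_path))
    assert dt.to_pyarrow_table().num_rows == 3
```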


def test_writer_fails_on_protocol():
    table_path = "../rust/tests/data/simple_table"
    dt = DeltaTable(table_path)