Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Batch definition sorting #9720

Merged
merged 42 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
42a91df
[BUGFIX] Exclude batch_definitions from _EXCLUDE_FROM_READER_OPTIONS
tyler-hoffman Apr 3, 2024
2dccd34
Merge branch 'b/_/exclude_batch_definition' into f/v1_21/batch-defini…
tyler-hoffman Apr 8, 2024
4817407
Add sort_batches_ascending to partitioners
tyler-hoffman Apr 8, 2024
4c80342
Add ascending property to concrete implementations as well
tyler-hoffman Apr 8, 2024
b5d585b
Implement sorting for sql assets
tyler-hoffman Apr 8, 2024
a64af4d
Implement sorting for filesystem assets
tyler-hoffman Apr 8, 2024
75d049f
Add another sorting field that I missed
tyler-hoffman Apr 9, 2024
7a2cf62
Add some more temporary generic sort_batches implementations
tyler-hoffman Apr 9, 2024
3e3f3a7
Merge branch 'develop' into f/v1_21/batch-definition-sorting
tyler-hoffman Apr 9, 2024
8a1826e
Move sorting back to assets
tyler-hoffman Apr 9, 2024
d4fa91b
Fix a test
tyler-hoffman Apr 10, 2024
7ae70ed
Remove no-longer-needed test case
tyler-hoffman Apr 10, 2024
1d08f0a
Invoke schema --sync
tyler-hoffman Apr 10, 2024
b8e7b1a
Merge branch 'develop' into f/v1_21/batch-definition-sorting
tyler-hoffman Apr 10, 2024
f286996
Invoke schema --sync with 3.8
tyler-hoffman Apr 10, 2024
137b699
Fix test
tyler-hoffman Apr 10, 2024
e52d1b3
Merge branch 'develop' into f/v1_21/batch-definition-sorting
tyler-hoffman Apr 10, 2024
1a53bcd
Remove another test that is no longer relevant
tyler-hoffman Apr 10, 2024
3973a32
Add test cases around sorting
tyler-hoffman Apr 10, 2024
24bf66b
Go back to the older sorting
tyler-hoffman Apr 10, 2024
ed8120c
Improve tests
tyler-hoffman Apr 10, 2024
c23918e
Mark unit tests
tyler-hoffman Apr 10, 2024
e5eb301
Add test case on errors
tyler-hoffman Apr 10, 2024
653c15d
Remove old test and update another that may still be useful
tyler-hoffman Apr 10, 2024
948a315
Remove unused constants
tyler-hoffman Apr 10, 2024
110ae5c
Update test and remove warning (for now)
tyler-hoffman Apr 10, 2024
69ebe7b
Mark tests
tyler-hoffman Apr 10, 2024
6cabc9a
Move tests to more accurate file
tyler-hoffman Apr 11, 2024
a2a44f4
Docstrings
tyler-hoffman Apr 11, 2024
064dce9
Add tests around sql
tyler-hoffman Apr 11, 2024
9bb27c3
Merge remote-tracking branch 'origin' into f/v1_21/batch-definition-s…
tyler-hoffman Apr 11, 2024
c4be492
Fix import to use typing_extensions
tyler-hoffman Apr 11, 2024
a9b9825
Use List instead of list
tyler-hoffman Apr 11, 2024
c55aec5
Back out a change
tyler-hoffman Apr 11, 2024
73aa074
Merge branch 'develop' into f/v1_21/batch-definition-sorting
tyler-hoffman Apr 11, 2024
020cd17
Address nit: make use of range ends consistent
tyler-hoffman Apr 12, 2024
b94ec64
Rename sort_batches_ascending -> sort_ascending
tyler-hoffman Apr 12, 2024
6828c79
Invoke schema --sync with 3.8
tyler-hoffman Apr 12, 2024
a35c0ec
DRY up partitioner protocol
tyler-hoffman Apr 12, 2024
423f3e0
Merge remote-tracking branch 'origin' into f/v1_21/batch-definition-s…
tyler-hoffman Apr 12, 2024
7da2cbd
Directly inherit from protocol
tyler-hoffman Apr 12, 2024
ec4e8bc
Update use of partitioner and add type in invalid_datasource.py
tyler-hoffman Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions great_expectations/core/partitioners.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
@public_api
class PartitionerYear(pydantic.BaseModel):
column_name: str
sort_batches_ascending: bool = True
tyler-hoffman marked this conversation as resolved.
Show resolved Hide resolved
method_name: Literal["partition_on_year"] = "partition_on_year"


@public_api
class PartitionerYearAndMonth(pydantic.BaseModel):
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_year_and_month"] = "partition_on_year_and_month"


@public_api
class PartitionerYearAndMonthAndDay(pydantic.BaseModel):
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_year_and_month_and_day"] = (
"partition_on_year_and_month_and_day"
)
Expand All @@ -28,38 +31,44 @@ class PartitionerYearAndMonthAndDay(pydantic.BaseModel):
class PartitionerDatetimePart(pydantic.BaseModel):
datetime_parts: List[str]
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_date_parts"] = "partition_on_date_parts"


@public_api
class PartitionerDividedInteger(pydantic.BaseModel):
divisor: int
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_divided_integer"] = "partition_on_divided_integer"


@public_api
class PartitionerModInteger(pydantic.BaseModel):
mod: int
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_mod_integer"] = "partition_on_mod_integer"


@public_api
class PartitionerColumnValue(pydantic.BaseModel):
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_column_value"] = "partition_on_column_value"


@public_api
class PartitionerMultiColumnValue(pydantic.BaseModel):
column_names: List[str]
sort_batches_ascending: bool = True
method_name: Literal["partition_on_multi_column_values"] = "partition_on_multi_column_values"


@public_api
class PartitionerConvertedDatetime(pydantic.BaseModel):
column_name: str
sort_batches_ascending: bool = True
method_name: Literal["partition_on_converted_datetime"] = "partition_on_converted_datetime"
date_format_string: str

Expand Down
10 changes: 5 additions & 5 deletions great_expectations/datasource/fluent/file_path_data_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import copy
import logging
import re
import warnings
from collections import Counter
from pprint import pformat as pf
from typing import (
Expand Down Expand Up @@ -308,7 +307,9 @@
)
batch_list.append(batch)

self.sort_batches(batch_list)
if batch_request.partitioner:
spark_partitioner = self.get_partitioner_implementation(batch_request.partitioner)
self.sort_batches(batch_list, spark_partitioner)

return batch_list

Expand All @@ -334,9 +335,8 @@
if batch_parameters_counts[param_name] == 1:
batch_request_copy_without_partitioner_kwargs.options.pop(param_name)
else:
warnings.warn(
f"The same option name is applied for your batch regex and partitioner config: {param_name}" # noqa: E501
)
# TODO: figure out what to do here!
tyler-hoffman marked this conversation as resolved.
Show resolved Hide resolved
...

Check warning on line 339 in great_expectations/datasource/fluent/file_path_data_asset.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/datasource/fluent/file_path_data_asset.py#L339

Added line #L339 was not covered by tests
batch_definition_list = self._data_connector.get_batch_definition_list(
batch_request=batch_request_copy_without_partitioner_kwargs
)
Expand Down
20 changes: 14 additions & 6 deletions great_expectations/datasource/fluent/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MutableMapping,
MutableSequence,
Optional,
Protocol,
Sequence,
Set,
Type,
Expand Down Expand Up @@ -95,6 +96,13 @@
)


class _PartitionerProtocol(Protocol):
tyler-hoffman marked this conversation as resolved.
Show resolved Hide resolved
sort_batches_ascending: bool

@property
def param_names(self) -> List[str]: ...


class TestConnectionError(Exception):
pass

Expand Down Expand Up @@ -400,21 +408,21 @@ def add_sorters(self: Self, sorters: SortersDefinition) -> Self:
self.order_by = _sorter_from_list(sorters)
return self

def sort_batches(self, batch_list: List[Batch]) -> None:
def sort_batches(self, batch_list: List[Batch], partitioner: _PartitionerProtocol) -> None:
"""Sorts batch_list in place in the order configured in this DataAsset.

Args:
batch_list: The list of batches to sort in place.
"""
for sorter in reversed(self.order_by):
reverse = not partitioner.sort_batches_ascending
for key in reversed(partitioner.param_names):
try:
batch_list.sort(
key=functools.cmp_to_key(_sort_batches_with_none_metadata_values(sorter.key)),
reverse=sorter.reverse,
key=functools.cmp_to_key(_sort_batches_with_none_metadata_values(key)),
reverse=reverse,
)
except KeyError as e:
raise KeyError( # noqa: TRY003
f"Trying to sort {self.name} table asset batches on key {sorter.key} "
f"Trying to sort {self.name} table asset batches on key {key} "
"which isn't available on all batches."
) from e

Expand Down
2 changes: 1 addition & 1 deletion great_expectations/datasource/fluent/invalid_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> NoRe
self._raise_type_error()

@override
def sort_batches(self, batch_list: List[Batch]) -> None:
def sort_batches(self, batch_list: List[Batch], partitioner) -> None:
tyler-hoffman marked this conversation as resolved.
Show resolved Hide resolved
self._raise_type_error()

@override
Expand Down
45 changes: 45 additions & 0 deletions great_expectations/datasource/fluent/schemas/BatchRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_column_value",
Expand All @@ -133,6 +138,11 @@
"type": "string"
}
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_multi_column_values",
Expand All @@ -159,6 +169,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_divided_integer",
Expand Down Expand Up @@ -186,6 +201,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_mod_integer",
Expand All @@ -209,6 +229,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year",
Expand All @@ -231,6 +256,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year_and_month",
Expand All @@ -253,6 +283,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year_and_month_and_day",
Expand Down Expand Up @@ -282,6 +317,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_date_parts",
Expand All @@ -305,6 +345,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_converted_datetime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_column_value",
Expand All @@ -139,6 +144,11 @@
"type": "string"
}
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_multi_column_values",
Expand All @@ -165,6 +175,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_divided_integer",
Expand Down Expand Up @@ -192,6 +207,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_mod_integer",
Expand All @@ -215,6 +235,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year",
Expand All @@ -237,6 +262,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year_and_month",
Expand All @@ -259,6 +289,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_year_and_month_and_day",
Expand Down Expand Up @@ -288,6 +323,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_date_parts",
Expand All @@ -311,6 +351,11 @@
"title": "Column Name",
"type": "string"
},
"sort_batches_ascending": {
"title": "Sort Batches Ascending",
"default": true,
"type": "boolean"
},
"method_name": {
"title": "Method Name",
"default": "partition_on_converted_datetime",
Expand Down