[FEATURE] DataAsset uses partitioner from BatchConfig #9499

Merged

Changes from all commits (75 commits)
c3969dd
wip
joshua-stauffer Feb 14, 2024
c9d892a
Merge branch 'develop' into f/v1-175/generic_partitioners
joshua-stauffer Feb 14, 2024
3cc0639
add generic partitioners
joshua-stauffer Feb 14, 2024
36239d9
move Partitioner out of typechecking
joshua-stauffer Feb 16, 2024
4854727
update import
joshua-stauffer Feb 16, 2024
19164a2
type
joshua-stauffer Feb 16, 2024
8bf119b
schema update
joshua-stauffer Feb 16, 2024
e695d19
add partitioner to BatchRequest
joshua-stauffer Feb 16, 2024
a167db1
update tests
joshua-stauffer Feb 16, 2024
50994e7
update test snapshot
joshua-stauffer Feb 16, 2024
e20b62e
add partitioner to DataAsset.build_batch_request
joshua-stauffer Feb 16, 2024
307263b
update type stub
joshua-stauffer Feb 16, 2024
5032352
update test snapshot
joshua-stauffer Feb 16, 2024
8193139
missed one
joshua-stauffer Feb 16, 2024
fa5cda8
Merge branch 'develop' into f/v1-175/add_partitioners_to_batch_request
joshua-stauffer Feb 16, 2024
9218fc9
add test
joshua-stauffer Feb 16, 2024
f8780c2
add partitioner map to base class
joshua-stauffer Feb 20, 2024
c7e9103
add partitioner maps
joshua-stauffer Feb 20, 2024
29e28af
rename spark partitioners
joshua-stauffer Feb 20, 2024
c0d2c1a
refactor sql partitioners
joshua-stauffer Feb 20, 2024
af5e0d9
rename spark partitioners
joshua-stauffer Feb 20, 2024
8795e5f
Merge branch 'develop' into f/v1-175/add_partitioners_to_batch_request
joshua-stauffer Feb 20, 2024
9d2bdf5
schema update
joshua-stauffer Feb 20, 2024
f9b2210
Merge branch 'f/v1-175/add_partitioners_to_batch_request' into f/v1-1…
joshua-stauffer Feb 20, 2024
89f7193
update per rename
joshua-stauffer Feb 20, 2024
4b405a0
Merge branch 'develop' into f/v1-175/asset_uses_partitioner_from_batc…
joshua-stauffer Feb 20, 2024
7bb9777
implement method to replace batch_options property
joshua-stauffer Feb 20, 2024
1c0c5d7
move partitioner resolve method to subclass
joshua-stauffer Feb 21, 2024
baff01e
typeguard
joshua-stauffer Feb 21, 2024
364dfaa
add override
joshua-stauffer Feb 21, 2024
f0d2f52
rename param
joshua-stauffer Feb 21, 2024
d626182
update types
joshua-stauffer Feb 21, 2024
a4e24b4
remove from interface
joshua-stauffer Feb 21, 2024
269089e
add types to subclass
joshua-stauffer Feb 21, 2024
f00a1fb
tweak type
joshua-stauffer Feb 21, 2024
dbd95d0
revert
joshua-stauffer Feb 21, 2024
1004b30
types
joshua-stauffer Feb 21, 2024
8623707
move type out of typechecking
joshua-stauffer Feb 21, 2024
ac3ee51
move import
joshua-stauffer Feb 21, 2024
85f072a
try type as function
joshua-stauffer Feb 21, 2024
9ed5cee
revert
joshua-stauffer Feb 21, 2024
c2ba4c7
hacky type workaround
joshua-stauffer Feb 21, 2024
fbf7657
use Type
joshua-stauffer Feb 21, 2024
abc76d5
spark: use batch request partitioner
joshua-stauffer Feb 21, 2024
0cf9e3a
sql: use batch request partitioner
joshua-stauffer Feb 21, 2024
de6fc0c
add sqlite partitioners
joshua-stauffer Feb 21, 2024
454aef8
fix sqlite tests
joshua-stauffer Feb 22, 2024
7a9fafa
use get_batch_request_options instead of batch_request_options property
joshua-stauffer Feb 22, 2024
6edf63f
wip
joshua-stauffer Feb 22, 2024
7da1552
rename method
joshua-stauffer Feb 22, 2024
aa244ab
sqlite tests passing
joshua-stauffer Feb 22, 2024
d0b3961
update signature
joshua-stauffer Feb 22, 2024
ab1149f
update types
joshua-stauffer Feb 22, 2024
e2e4d16
move ConvertedDatetime partitioner into sql
joshua-stauffer Feb 22, 2024
d801d17
ensure sqlite assets have access to correct partitioner map
joshua-stauffer Feb 22, 2024
f97c441
schema update
joshua-stauffer Feb 22, 2024
d88a277
Merge branch 'develop' into f/v1-175/asset_uses_partitioner_from_batc…
joshua-stauffer Feb 22, 2024
d0ff1ca
schema update
joshua-stauffer Feb 22, 2024
b924f97
refactor partitioner to batch request in conftest
joshua-stauffer Feb 23, 2024
7b7247f
update integration tests to use partitioner in batch request
joshua-stauffer Feb 23, 2024
3362915
remove deprecated partitioner
joshua-stauffer Feb 23, 2024
7611ecb
Merge branch 'develop' into f/v1-175/asset_uses_partitioner_from_batc…
joshua-stauffer Feb 23, 2024
ff5abdf
schema update
joshua-stauffer Feb 23, 2024
6137d92
update integration tests
joshua-stauffer Feb 23, 2024
1ae3497
update viral snippet
joshua-stauffer Feb 23, 2024
d91e36d
update tests related to fixture
joshua-stauffer Feb 23, 2024
6c964ad
update postgresql tests
joshua-stauffer Feb 23, 2024
e7d28e3
update snippets
joshua-stauffer Feb 23, 2024
175814b
public api
joshua-stauffer Feb 23, 2024
53d8c89
schema update
joshua-stauffer Feb 23, 2024
4d3b6ee
update api excludes
joshua-stauffer Feb 23, 2024
4eab300
fix doc snippet
joshua-stauffer Feb 23, 2024
4b0651e
remove assert
joshua-stauffer Feb 26, 2024
43888ed
remove comment
joshua-stauffer Feb 26, 2024
9d42d57
remove batch config option keys method from batch config
joshua-stauffer Feb 26, 2024
docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py
@@ -8,6 +8,7 @@
import warnings

import great_expectations as gx
from great_expectations.core.partitioners import PartitionerColumnValue
from great_expectations.datasource.fluent import GxDatasourceWarning

sqlite_database_path = str(
@@ -76,11 +77,14 @@
}

# <snippet name="docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py add_vendor_id_splitter">
-table_asset.add_partitioner_column_value("vendor_id")
+partitioner = PartitionerColumnValue(column_name="vendor_id")
# </snippet>

# <snippet name="docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py build_vendor_id_batch_request">
-my_batch_request = my_asset.build_batch_request({"vendor_id": 1})
+my_batch_request = my_asset.build_batch_request(
+    options={"vendor_id": 1},
+    partitioner=partitioner,
+)
# </snippet>

batches = my_asset.get_batch_list_from_batch_request(my_batch_request)
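These two snippet changes capture the core of this PR: the partitioner is no longer configured on the asset via add_partitioner_* methods, but is constructed as a standalone object and handed to build_batch_request. A minimal sketch of the new flow, assuming an already-configured SQL datasource and table asset (the datasource and asset names below are illustrative, not from this PR):

import great_expectations as gx
from great_expectations.core.partitioners import PartitionerColumnValue

context = gx.get_context()
asset = context.get_datasource("my_sql_datasource").get_asset("my_table")

# The partitioner now travels with the batch request instead of living on
# the asset, so the same asset can be partitioned differently per request.
partitioner = PartitionerColumnValue(column_name="vendor_id")
my_batch_request = asset.build_batch_request(
    options={"vendor_id": 1},
    partitioner=partitioner,
)
batches = asset.get_batch_list_from_batch_request(my_batch_request)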
docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py
@@ -2,6 +2,7 @@
import tempfile

import great_expectations as gx
from great_expectations.core.partitioners import PartitionerYearAndMonth

temp_dir = tempfile.TemporaryDirectory()
full_path_to_project_directory = pathlib.Path(temp_dir.name).resolve()
@@ -38,10 +39,10 @@

# Python
# <snippet name="docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py add_splitter_year_and_month">
-my_table_asset.add_partitioner_year_and_month(column_name="pickup_datetime")
+partitioner = PartitionerYearAndMonth(column_name="pickup_datetime")
# </snippet>

-my_batch_request = my_table_asset.build_batch_request()
+my_batch_request = my_table_asset.build_batch_request(partitioner=partitioner)
batches = my_table_asset.get_batch_list_from_batch_request(my_batch_request)

assert len(batches) == 12
@@ -51,11 +52,14 @@
my_asset = my_table_asset.add_sorters(["+year", "-month"])
# </snippet>

-assert my_asset.batch_request_options == ("year", "month")
+assert my_asset.get_batch_request_options_keys(partitioner=partitioner) == (
+    "year",
+    "month",
+)

# Python
# <snippet name="docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py my_batch_list">
-my_batch_request = my_table_asset.build_batch_request()
+my_batch_request = my_table_asset.build_batch_request(partitioner=partitioner)
batches = my_table_asset.get_batch_list_from_batch_request(my_batch_request)
# </snippet>

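The year-and-month partitioner above yields one batch per (year, month) pair found in the pickup_datetime column, hence the assertion of 12 batches. Because the partitioner's parameters become valid batch request option keys (see the get_batch_request_options_keys assertion above), a single month can also be requested directly. A sketch reusing my_table_asset from this snippet, with a year assumed present in the sample data:

from great_expectations.core.partitioners import PartitionerYearAndMonth

partitioner = PartitionerYearAndMonth(column_name="pickup_datetime")

# Request one batch (January) instead of iterating over all twelve.
january_request = my_table_asset.build_batch_request(
    options={"year": 2019, "month": 1},
    partitioner=partitioner,
)
january_batches = my_table_asset.get_batch_list_from_batch_request(january_request)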
26 changes: 26 additions & 0 deletions docs/sphinx_api_docs_source/public_api_excludes.py
@@ -828,4 +828,30 @@
name="get_or_create_spark_session",
filepath=pathlib.Path("great_expectations/core/util.py"),
),
IncludeExcludeDefinition(
reason="This method does not need to be accessed by users, and will eventually be removed from docs.",
name="get_batch_request_options_keys",
filepath=pathlib.Path(
"great_expectations/datasource/fluent/file_path_data_asset.py"
),
),
IncludeExcludeDefinition(
reason="This method does not need to be accessed by users, and will eventually be removed from docs.",
name="get_batch_request_options_keys",
filepath=pathlib.Path(
"great_expectations/datasource/fluent/pandas_datasource.py"
),
),
IncludeExcludeDefinition(
reason="This method does not need to be accessed by users, and will eventually be removed from docs.",
name="get_batch_request_options_keys",
filepath=pathlib.Path(
"great_expectations/datasource/fluent/spark_datasource.py"
),
),
IncludeExcludeDefinition(
reason="This method does not need to be accessed by users, and will eventually be removed from docs.",
name="get_batch_request_options_keys",
filepath=pathlib.Path("great_expectations/datasource/fluent/sql_datasource.py"),
),
]
19 changes: 19 additions & 0 deletions great_expectations/core/partitioners.py
@@ -1,31 +1,37 @@
from typing import List, Literal, Union

from great_expectations._docs_decorators import public_api
from great_expectations.compatibility import pydantic


@public_api
class PartitionerYear(pydantic.BaseModel):
column_name: str
method_name: Literal["partition_on_year"] = "partition_on_year"


@public_api
class PartitionerYearAndMonth(pydantic.BaseModel):
column_name: str
method_name: Literal["partition_on_year_and_month"] = "partition_on_year_and_month"


@public_api
class PartitionerYearAndMonthAndDay(pydantic.BaseModel):
column_name: str
method_name: Literal[
"partition_on_year_and_month_and_day"
] = "partition_on_year_and_month_and_day"


@public_api
class PartitionerDatetimePart(pydantic.BaseModel):
datetime_parts: List[str]
column_name: str
method_name: Literal["partition_on_date_parts"] = "partition_on_date_parts"


@public_api
class PartitionerDividedInteger(pydantic.BaseModel):
divisor: int
column_name: str
@@ -34,24 +40,36 @@ class PartitionerDividedInteger(pydantic.BaseModel):
] = "partition_on_divided_integer"


@public_api
class PartitionerModInteger(pydantic.BaseModel):
mod: int
column_name: str
method_name: Literal["partition_on_mod_integer"] = "partition_on_mod_integer"


@public_api
class PartitionerColumnValue(pydantic.BaseModel):
column_name: str
method_name: Literal["partition_on_column_value"] = "partition_on_column_value"


@public_api
class PartitionerMultiColumnValue(pydantic.BaseModel):
column_names: List[str]
method_name: Literal[
"partition_on_multi_column_values"
] = "partition_on_multi_column_values"


@public_api
class PartitionerConvertedDatetime(pydantic.BaseModel):
column_name: str
method_name: Literal[
"partition_on_converted_datetime"
] = "partition_on_converted_datetime"
date_format_string: str
Review comment (Member): Should we do any validation on this string?

Reply (joshua-stauffer, Member, author): Current behavior is not to validate the string, so it's likely okay not to. It's also not clear how we would validate it, since its correctness depends on the shape of the source data.
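For reference, a lightweight check is possible without knowing the source data, such as requiring at least one strftime directive. This is a hypothetical sketch only, not part of the PR, and it assumes the compatibility shim exposes pydantic's validator decorator:

from great_expectations.compatibility import pydantic

class PartitionerConvertedDatetimeChecked(pydantic.BaseModel):
    # Hypothetical variant; the field set mirrors PartitionerConvertedDatetime.
    column_name: str
    date_format_string: str

    @pydantic.validator("date_format_string")
    def _contains_directive(cls, v: str) -> str:
        # Cannot verify the format matches the source data (the point made
        # above), but can reject strings with no strftime directive at all.
        if "%" not in v:
            raise ValueError("date_format_string must contain a strftime directive, e.g. %Y")
        return v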



Partitioner = Union[
PartitionerColumnValue,
PartitionerMultiColumnValue,
@@ -61,4 +79,5 @@ class PartitionerMultiColumnValue(pydantic.BaseModel):
PartitionerYearAndMonth,
PartitionerYearAndMonthAndDay,
PartitionerDatetimePart,
PartitionerConvertedDatetime,
]
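Each member of this union pins method_name with a Literal default, so a serialized partitioner can be resolved back to the right class when a batch request is re-parsed. A sketch of that behavior, assuming pydantic v1-style first-match union resolution (the wrapper model is illustrative, not part of the PR):

from great_expectations.compatibility import pydantic
from great_expectations.core.partitioners import Partitioner

class _Wrapper(pydantic.BaseModel):
    partitioner: Partitioner

raw = {"column_name": "pickup_datetime", "method_name": "partition_on_year_and_month"}
parsed = _Wrapper(partitioner=raw).partitioner

# Every other union member either misses a required field or fails its
# Literal check, so the year-and-month class is selected.
assert type(parsed).__name__ == "PartitionerYearAndMonth"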
89 changes: 78 additions & 11 deletions great_expectations/datasource/fluent/file_path_data_asset.py
@@ -22,6 +22,16 @@
from great_expectations._docs_decorators import public_api
from great_expectations.compatibility import pydantic
from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.partitioners import (
PartitionerColumnValue,
PartitionerDatetimePart,
PartitionerDividedInteger,
PartitionerModInteger,
PartitionerMultiColumnValue,
PartitionerYear,
PartitionerYearAndMonth,
PartitionerYearAndMonthAndDay,
)
from great_expectations.datasource.fluent.batch_request import (
BatchRequest,
BatchRequestOptions,
@@ -113,6 +123,20 @@ class _FilePathDataAsset(DataAsset):
_test_connection_error_message: str = pydantic.PrivateAttr(
"Could not connect to your asset"
)
_partitioner_implementation_map: dict[
type[Partitioner], type[SparkPartitioner]
] = pydantic.PrivateAttr(
default={
PartitionerYear: SparkPartitionerYear,
PartitionerYearAndMonth: SparkPartitionerYearAndMonth,
PartitionerYearAndMonthAndDay: SparkPartitionerYearAndMonthAndDay,
PartitionerColumnValue: SparkPartitionerColumnValue,
PartitionerDatetimePart: SparkPartitionerDatetimePart,
PartitionerDividedInteger: SparkPartitionerDividedInteger,
PartitionerModInteger: SparkPartitionerModInteger,
PartitionerMultiColumnValue: SparkPartitionerMultiColumnValue,
}
)

class Config:
"""
@@ -140,6 +164,18 @@ def __init__(self, **data):
)
self._all_group_names = self._regex_parser.get_all_group_names()

def get_partitioner_implementation(
self, abstract_partitioner: Partitioner
) -> SparkPartitioner:
PartitionerClass = self._partitioner_implementation_map.get(
type(abstract_partitioner)
)
if PartitionerClass is None:
raise ValueError(
f"Requested Partitioner `{abstract_partitioner.method_name}` is not implemented for this DataAsset. "
)
return PartitionerClass(**abstract_partitioner.dict())

@property
@override
def batch_request_options(
@@ -167,6 +203,18 @@ def batch_request_options(
+ partitioner_options
)

@override
def get_batch_request_options_keys(
self, partitioner: Optional[Partitioner]
) -> tuple[str, ...]:
option_keys: tuple[str, ...] = tuple(self._all_group_names) + (
FILE_PATH_BATCH_SPEC_KEY,
)
if partitioner:
spark_partitioner = self.get_partitioner_implementation(partitioner)
option_keys += tuple(spark_partitioner.param_names)
return option_keys

@public_api
@override
def build_batch_request(
@@ -206,8 +254,13 @@ def build_batch_request(
f"not a string: {value}"
)

-if options is not None and not self._valid_batch_request_options(options):
-allowed_keys = set(self.batch_request_options)
+if options is not None and not self._batch_request_options_are_valid(
+options=options,
+partitioner=partitioner,
+):
+allowed_keys = set(
+self.get_batch_request_options_keys(partitioner=partitioner)
+)
actual_keys = set(options.keys())
raise gx_exceptions.InvalidBatchRequestError(
"Batch request options should only contain keys from the following set:\n"
@@ -233,9 +286,14 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None:
if not (
batch_request.datasource_name == self.datasource.name
and batch_request.data_asset_name == self.name
-and self._valid_batch_request_options(batch_request.options)
+and self._batch_request_options_are_valid(
+options=batch_request.options, partitioner=batch_request.partitioner
+)
):
-options = {option: None for option in self.batch_request_options}
+valid_options = self.get_batch_request_options_keys(
+partitioner=batch_request.partitioner
+)
+options = {option: None for option in valid_options}
expect_batch_request_form = BatchRequest(
datasource_name=self.datasource.name,
data_asset_name=self.name,
@@ -317,11 +375,17 @@ def _get_batch_definition_list(
Returns:
List of batch definitions.
"""
-if self.partitioner:
+if batch_request.partitioner:
+spark_partitioner = self.get_partitioner_implementation(
+batch_request.partitioner
+)
# Remove the partitioner kwargs from the batch_request to retrieve the batch and add them back later to the batch_spec.options
-batch_request_options_counts = Counter(self.batch_request_options)
+valid_options = self.get_batch_request_options_keys(
+partitioner=batch_request.partitioner
+)
+batch_request_options_counts = Counter(valid_options)
batch_request_copy_without_partitioner_kwargs = copy.deepcopy(batch_request)
-for param_name in self.partitioner.param_names:
+for param_name in spark_partitioner.param_names:
# If the option appears twice (e.g. from asset regex and from partitioner) then don't remove.
if batch_request_options_counts[param_name] == 1:
batch_request_copy_without_partitioner_kwargs.options.pop(
@@ -366,12 +430,15 @@ def _batch_spec_options_from_batch_request(
),
}

-if self.partitioner:
-batch_spec_options["partitioner_method"] = self.partitioner.method_name
-partitioner_kwargs = self.partitioner.partitioner_method_kwargs()
+if batch_request.partitioner:
+spark_partitioner = self.get_partitioner_implementation(
+batch_request.partitioner
+)
+batch_spec_options["partitioner_method"] = spark_partitioner.method_name
+partitioner_kwargs = spark_partitioner.partitioner_method_kwargs()
partitioner_kwargs[
"batch_identifiers"
-] = self.partitioner.batch_request_options_to_batch_spec_kwarg_identifiers(
+] = spark_partitioner.batch_request_options_to_batch_spec_kwarg_identifiers(
batch_request.options
)
batch_spec_options["partitioner_kwargs"] = partitioner_kwargs
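One subtlety in get_batch_request_options_keys above: the keys are concatenated rather than deduplicated, so the same name can come from both the asset's batching regex and the partitioner. That is why _get_batch_definition_list counts occurrences before popping partitioner kwargs, as its inline comment notes. A sketch of that counting logic in isolation (the key names and the "path" spec key are assumptions for illustration):

from collections import Counter

# Assume "year" appears twice, once as a regex group and once as a
# partitioner parameter, while "month" comes from the partitioner alone.
valid_options = ("year", "path", "year", "month")
counts = Counter(valid_options)

options = {"year": 2019, "month": 1, "path": "data_2019.csv"}
for param_name in ("year", "month"):  # the partitioner's param_names
    if counts[param_name] == 1:
        # Only the partitioner supplied this key; safe to strip it before
        # resolving the batch, then re-add it to the batch spec later.
        options.pop(param_name, None)

assert options == {"year": 2019, "path": "data_2019.csv"}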
14 changes: 12 additions & 2 deletions great_expectations/datasource/fluent/interfaces.py
@@ -228,6 +228,13 @@ def batch_request_options(self) -> tuple[str, ...]:
"""One needs to implement "batch_request_options" on a DataAsset subclass."""
)

def get_batch_request_options_keys(
self, partitioner: Optional[Partitioner]
) -> tuple[str, ...]:
raise NotImplementedError(
"""One needs to implement "get_batch_request_options_keys" on a DataAsset subclass."""
)

def build_batch_request(
self,
options: Optional[BatchRequestOptions] = None,
@@ -353,8 +360,11 @@ def get_batch_config(self, batch_config_name: str) -> BatchConfig:
raise KeyError(f"Multiple keys for {batch_config_name} found")
return batch_configs[0]

-def _valid_batch_request_options(self, options: BatchRequestOptions) -> bool:
-return set(options.keys()).issubset(set(self.batch_request_options))
+def _batch_request_options_are_valid(
+self, options: BatchRequestOptions, partitioner: Optional[Partitioner]
+) -> bool:
+valid_options = self.get_batch_request_options_keys(partitioner=partitioner)
+return set(options.keys()).issubset(set(valid_options))

def _get_batch_metadata_from_batch_request(
self, batch_request: BatchRequest
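The renamed _batch_request_options_are_valid helper is a pure key-subset test: every requested option key must appear among the keys reported for the given partitioner, and option values are never inspected here. The rule in isolation:

valid_options = ("year", "month")

def options_are_valid(options: dict) -> bool:
    return set(options.keys()).issubset(set(valid_options))

assert options_are_valid({"year": 2019})   # known key: valid
assert options_are_valid({})               # an empty request is always valid
assert not options_are_valid({"day": 1})   # unknown key: invalid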
4 changes: 4 additions & 0 deletions great_expectations/datasource/fluent/pandas_datasource.py
@@ -119,6 +119,10 @@ def test_connection(self) -> None:
def batch_request_options(self) -> tuple[str, ...]:
return tuple()

@override
def get_batch_request_options_keys(self, partitioner):
return tuple()

@override
def get_batch_list_from_batch_request(
self, batch_request: BatchRequest
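Pandas assets read an entire table or file as a single batch, so they report no valid option keys whether or not a partitioner is supplied; any options passed to build_batch_request on such an asset would fail the subset check above. For example, with a hypothetical pandas asset:

assert pandas_asset.get_batch_request_options_keys(partitioner=None) == ()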