[MAINTENANCE] Remove fluent partitioner methods from DataAssets (#9517)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
joshua-stauffer and pre-commit-ci[bot] committed Feb 28, 2024
1 parent 959fd51 commit c882919
Showing 179 changed files with 195 additions and 39,137 deletions.
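
Taken together, the changes below delete the stateful `asset.partitioner` field, the `batch_request_options` property, and the fluent `add_partitioner_*` builders; partitioning now travels with the batch request. A minimal before/after sketch of the migration — the concrete `PartitionerYear` class name and its signature are assumptions, since only the abstract `Partitioner` type appears in this diff:

```python
# Hedged sketch; `my_asset` is a placeholder and `PartitionerYear` is an
# assumed concrete class (only `Partitioner` is visible in this diff).
from great_expectations.core.partitioners import PartitionerYear  # assumed import

# Before this commit (fluent, stateful; removed here):
# my_asset = my_asset.add_partitioner_year(column_name="pickup_datetime")
# print(my_asset.batch_request_options)

# After this commit (stateless; the partitioner rides on the request):
partitioner = PartitionerYear(column_name="pickup_datetime")  # assumed signature
print(my_asset.get_batch_request_options_keys(partitioner=partitioner))
batch_request = my_asset.build_batch_request(
    options={"year": "2023"},
    partitioner=partitioner,
)
```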
@@ -46,7 +46,7 @@
 my_asset = my_asset.add_sorters(["+year", "-month"])
 # </snippet>

-assert my_asset.batch_request_options == ("year", "month", "path")
+assert my_asset.get_batch_request_options_keys() == ("year", "month", "path")

 # Python
 # <snippet name="docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/data_assets/organize_batches_in_pandas_filesystem_datasource.py my_batch_list">
2 changes: 1 addition & 1 deletion docs/docusaurus/docs/snippets/batch_request.py
@@ -39,7 +39,7 @@
 assert batch_request.options == {"year": "2019", "month": "02"}

 # <snippet name="docs/docusaurus/docs/snippets/batch_request options">
-options = asset.batch_request_options
+options = asset.get_batch_request_options_keys()
 print(options)
 # </snippet>

@@ -51,10 +51,10 @@

 # Python
 # <snippet name="docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py my_batch_request_options">
-print(my_asset.batch_request_options)
+print(my_asset.get_batch_request_options_keys())
 # </snippet>

-assert my_asset.batch_request_options == ("year", "month", "path")
+assert my_asset.get_batch_request_options_keys() == ("year", "month", "path")

 # Python
 # <snippet name="docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py my_batch_request">
2 changes: 1 addition & 1 deletion great_expectations/datasource/fluent/batch_request.py
@@ -57,7 +57,7 @@ class BatchRequest(pydantic.BaseModel):
         data_asset_name: The name of the Data Asset used to connect to the data.
         options: A dict that can be used to filter the batch groups associated with the Data Asset.
             The dict structure depends on the asset type. The available keys for dict can be obtained by
-            calling DataAsset.batch_request_options.
+            calling DataAsset.get_batch_request_options_keys(...).
         batch_slice: A python slice that can be used to filter the sorted batches by index.
             e.g. `batch_slice = "[-5:]"` will request only the last 5 batches after the options filter is applied.
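
For orientation, a hedged sketch of how the documented fields combine when constructing a request directly; the datasource and asset names are placeholders:

```python
from great_expectations.datasource.fluent import BatchRequest

batch_request = BatchRequest(
    datasource_name="my_datasource",  # placeholder
    data_asset_name="my_asset",       # placeholder
    options={"year": "2019", "month": "02"},  # keys per get_batch_request_options_keys(...)
    batch_slice="[-5:]",  # keep only the last 5 batches after the options filter
)
```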
@@ -41,7 +41,7 @@ def _get_batch_definition_list(
         Returns:
             List of batch definitions, in the case of a _DirectoryDataAssetMixin the list contains a single item.
         """
-        if self.partitioner:
+        if batch_request.partitioner:
             # Currently non-sql asset partitioners do not introspect the datasource for available
             # batches and only return a single batch based on specified batch_identifiers.
             batch_identifiers = batch_request.options
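
Since the partitioner is now read off the request rather than off the asset, the same directory asset can serve partitioned and unpartitioned requests. A hedged usage sketch; all names are placeholders:

```python
# With a partitioner on the request, the mixin above skips filesystem
# introspection and yields a single batch definition whose identifiers are
# the request's options, verbatim.
batch_request = directory_asset.build_batch_request(
    options={"year": "2019", "month": "02"},  # becomes batch_identifiers
    partitioner=my_partitioner,  # read from the request, not from asset state
)
```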
178 changes: 3 additions & 175 deletions great_expectations/datasource/fluent/file_path_data_asset.py
@@ -65,8 +65,6 @@
 )

 if TYPE_CHECKING:
-    from typing_extensions import Self
-
     from great_expectations.core.batch import BatchDefinition, BatchMarkers
     from great_expectations.core.id_dict import BatchSpec
     from great_expectations.core.partitioners import Partitioner
@@ -106,7 +104,6 @@ class _FilePathDataAsset(DataAsset):
         default_factory=dict,
         description="Optional filesystem specific advanced parameters for connecting to data assets",
     )
-    partitioner: Optional[SparkPartitioner] = None

     _unnamed_regex_param_prefix: str = pydantic.PrivateAttr(
         default="batch_request_param_"
@@ -176,36 +173,10 @@ def get_partitioner_implementation(
         )
         return PartitionerClass(**abstract_partitioner.dict())

-    @property
-    @override
-    def batch_request_options(
-        self,
-    ) -> tuple[str, ...]:
-        """The potential keys for BatchRequestOptions.
-        Example:
-        ```python
-        >>> print(asset.batch_request_options)
-        ("day", "month", "year", "path")
-        >>> options = {"year": "2023"}
-        >>> batch_request = asset.build_batch_request(options=options)
-        ```
-        Returns:
-            A tuple of keys that can be used in a BatchRequestOptions dictionary.
-        """
-        partitioner_options: tuple[str, ...] = tuple()
-        if self.partitioner:
-            partitioner_options = tuple(self.partitioner.param_names)
-        return (
-            tuple(self._all_group_names)
-            + (FILE_PATH_BATCH_SPEC_KEY,)
-            + partitioner_options
-        )

     @override
     def get_batch_request_options_keys(
-        self, partitioner: Optional[Partitioner]
+        self,
+        partitioner: Optional[Partitioner] = None,
     ) -> tuple[str, ...]:
         option_keys: tuple[str, ...] = tuple(self._all_group_names) + (
             FILE_PATH_BATCH_SPEC_KEY,
@@ -228,7 +199,7 @@ def build_batch_request(
         Args:
             options: A dict that can be used to filter the batch groups returned from the asset.
                 The dict structure depends on the asset type. The available keys for dict can be obtained by
-                calling batch_request_options.
+                calling get_batch_request_options_keys(...).
             batch_slice: A python slice that can be used to limit the sorted batches by index.
                 e.g. `batch_slice = "[-5:]"` will request only the last 5 batches after the options filter is applied.
             partitioner: A Partitioner used to narrow the data returned from the asset.
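
Together, these two signatures define the new lookup flow: ask the asset which option keys are legal (optionally for a given partitioner), then build the request. A hedged sketch with illustrative values; `param_names` is taken from the property removed above:

```python
# Assuming the asset's batching regex defines (?P<year>...) and (?P<month>...).
print(asset.get_batch_request_options_keys())
# ("year", "month", "path")  # "path" is FILE_PATH_BATCH_SPEC_KEY

print(asset.get_batch_request_options_keys(partitioner=my_partitioner))
# ("year", "month", "path") plus my_partitioner.param_names

batch_request = asset.build_batch_request(
    options={"year": "2023"},
    batch_slice="[-5:]",
    partitioner=my_partitioner,
)
```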
@@ -478,146 +449,3 @@ def _get_reader_options_include(self) -> set[str]:
             """One needs to explicitly provide set(str)-valued reader options for "pydantic.BaseModel.dict()" method \
 to use as its "include" directive for File-Path style DataAsset processing."""
         )
-
-    def _add_partitioner(self: Self, partitioner: SparkPartitioner) -> Self:
-        self.partitioner = partitioner
-        return self
-
-    @public_api
-    def add_partitioner_year(
-        self: Self,
-        column_name: str,
-    ) -> Self:
-        """Associates a year partitioner with this data asset.
-        Args:
-            column_name: A column name of the date column where year will be parsed out.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerYear(
-                method_name="partition_on_year", column_name=column_name
-            )
-        )
-
-    @public_api
-    def add_partitioner_year_and_month(
-        self: Self,
-        column_name: str,
-    ) -> Self:
-        """Associates a year, month partitioner with this asset.
-        Args:
-            column_name: A column name of the date column where year and month will be parsed out.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerYearAndMonth(
-                method_name="partition_on_year_and_month", column_name=column_name
-            )
-        )
-
-    @public_api
-    def add_partitioner_year_and_month_and_day(
-        self: Self,
-        column_name: str,
-    ) -> Self:
-        """Associates a year, month, day partitioner with this asset.
-        Args:
-            column_name: A column name of the date column where year and month will be parsed out.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerYearAndMonthAndDay(
-                method_name="partition_on_year_and_month_and_day",
-                column_name=column_name,
-            )
-        )
-
-    @public_api
-    def add_partitioner_datetime_part(
-        self: Self, column_name: str, datetime_parts: List[str]
-    ) -> Self:
-        """Associates a datetime part partitioner with this asset.
-        Args:
-            column_name: Name of the date column where parts will be parsed out.
-            datetime_parts: A list of datetime parts to partition on, specified as DatePart objects or as their string equivalent e.g. "year", "month", "week", "day", "hour", "minute", or "second"
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerDatetimePart(
-                method_name="partition_on_date_parts",
-                column_name=column_name,
-                datetime_parts=datetime_parts,
-            )
-        )
-
-    @public_api
-    def add_partitioner_column_value(self: Self, column_name: str) -> Self:
-        """Associates a column value partitioner with this asset.
-        Args:
-            column_name: A column name of the column to partition on.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerColumnValue(
-                method_name="partition_on_column_value",
-                column_name=column_name,
-            )
-        )
-
-    @public_api
-    def add_partitioner_divided_integer(
-        self: Self, column_name: str, divisor: int
-    ) -> Self:
-        """Associates a divided integer partitioner with this asset.
-        Args:
-            column_name: A column name of the column to partition on.
-            divisor: The divisor to use when partitioning.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerDividedInteger(
-                method_name="partition_on_divided_integer",
-                column_name=column_name,
-                divisor=divisor,
-            )
-        )
-
-    @public_api
-    def add_partitioner_mod_integer(self: Self, column_name: str, mod: int) -> Self:
-        """Associates a mod integer partitioner with this asset.
-        Args:
-            column_name: A column name of the column to partition on.
-            mod: The mod to use when partitioning.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerModInteger(
-                method_name="partition_on_mod_integer",
-                column_name=column_name,
-                mod=mod,
-            )
-        )
-
-    @public_api
-    def add_partitioner_multi_column_values(
-        self: Self, column_names: list[str]
-    ) -> Self:
-        """Associates a multi-column value partitioner with this asset.
-        Args:
-            column_names: A list of column names to partition on.
-        Returns:
-            This asset so we can use this method fluently.
-        """
-        return self._add_partitioner(
-            SparkPartitionerMultiColumnValue(
-                column_names=column_names,
-                method_name="partition_on_multi_column_values",
-            )
-        )
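
Each builder deleted above paired a fixed `method_name` with column parameters; the equivalent partitioning is now expressed per request. A hedged sketch — the concrete partitioner class name is an assumption, as the diff confirms only the abstract `Partitioner` type and the `method_name` strings:

```python
# Before (removed above):
# asset = asset.add_partitioner_multi_column_values(["region", "store_id"])

# After: build the partitioner directly and pass it at request time.
from great_expectations.core.partitioners import PartitionerMultiColumnValue  # assumed name

batch_request = asset.build_batch_request(
    options={"region": "eu", "store_id": "42"},  # illustrative values
    partitioner=PartitionerMultiColumnValue(column_names=["region", "store_id"]),  # assumed signature
)
```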
26 changes: 4 additions & 22 deletions great_expectations/datasource/fluent/interfaces.py
@@ -208,28 +208,8 @@ def test_connection(self) -> None:
             """One needs to implement "test_connection" on a DataAsset subclass."""
         )

-    # Abstract Methods
-    @property
-    def batch_request_options(self) -> tuple[str, ...]:
-        """The potential keys for BatchRequestOptions.
-        Example:
-        ```python
-        >>> print(asset.batch_request_options)
-        ("day", "month", "year")
-        >>> options = {"year": "2023"}
-        >>> batch_request = asset.build_batch_request(options=options)
-        ```
-        Returns:
-            A tuple of keys that can be used in a BatchRequestOptions dictionary.
-        """
-        raise NotImplementedError(
-            """One needs to implement "batch_request_options" on a DataAsset subclass."""
-        )
-
     def get_batch_request_options_keys(
-        self, partitioner: Optional[Partitioner]
+        self, partitioner: Optional[Partitioner] = None
     ) -> tuple[str, ...]:
         raise NotImplementedError(
             """One needs to implement "get_batch_request_options_keys" on a DataAsset subclass."""
@@ -246,7 +226,7 @@ def build_batch_request(
         Args:
             options: A dict that can be used to filter the batch groups returned from the asset.
                 The dict structure depends on the asset type. The available keys for dict can be obtained by
-                calling batch_request_options.
+                calling get_batch_request_options_keys(...).
             batch_slice: A python slice that can be used to limit the sorted batches by index.
                 e.g. `batch_slice = "[-5:]"` will request only the last 5 batches after the options filter is applied.
             partitioner: A Partitioner used to narrow the data returned from the asset.
@@ -304,11 +284,13 @@ def add_batch_config(
         batch_config = BatchConfig(name=name, partitioner=partitioner)
         batch_config.set_data_asset(self)
         self.batch_configs.append(batch_config)
+        self.update_batch_config_field_set()
         if self.datasource.data_context:
             try:
                 batch_config = self.datasource.add_batch_config(batch_config)
             except Exception:
                 self.batch_configs.remove(batch_config)
+                self.update_batch_config_field_set()
                 raise
         self.update_batch_config_field_set()
         return batch_config
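
The two added calls keep the Pydantic field-set bookkeeping aligned with `batch_configs` on every exit path: after the optimistic append, and again after the rollback `remove` when persistence to the data context fails. A hedged usage sketch; `my_partitioner` is a placeholder:

```python
batch_config = asset.add_batch_config(name="monthly", partitioner=my_partitioner)
# On success the config is appended and the field set updated; if persistence
# fails, the append is rolled back and the field set updated again before the
# exception re-raises, so the in-memory asset never drifts from the store.
assert batch_config in asset.batch_configs
```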
10 changes: 4 additions & 6 deletions great_expectations/datasource/fluent/pandas_datasource.py
@@ -18,6 +18,7 @@
     Optional,
     Sequence,
     Set,
+    Tuple,
     Type,
     TypeVar,
     Union,
@@ -114,13 +115,10 @@ def _get_reader_method(self) -> str:
     def test_connection(self) -> None:
         ...

-    @property
-    @override
-    def batch_request_options(self) -> tuple[str, ...]:
-        return tuple()
-
     @override
-    def get_batch_request_options_keys(self, partitioner):
+    def get_batch_request_options_keys(
+        self, partitioner: Optional[Partitioner] = None
+    ) -> Tuple[str, ...]:
         return tuple()

     @override
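
In-memory pandas assets have no batching regex or path key, so the method returns an empty tuple whether or not a partitioner is passed. Illustrative sketch; the asset-creation helper shown is assumed:

```python
df_asset = pandas_datasource.add_dataframe_asset(name="taxi")  # assumed helper
assert df_asset.get_batch_request_options_keys() == ()
assert df_asset.get_batch_request_options_keys(partitioner=None) == ()
```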
@@ -1,6 +1,6 @@
 {
     "title": "BatchRequest",
-    "description": "--Public API--A BatchRequest is the way to specify which data Great Expectations will validate.\n\nA Batch Request is provided to a Data Asset in order to create one or more Batches.\n\nArgs:\n    datasource_name: The name of the Datasource used to connect to the data.\n    data_asset_name: The name of the Data Asset used to connect to the data.\n    options: A dict that can be used to filter the batch groups associated with the Data Asset.\n        The dict structure depends on the asset type. The available keys for dict can be obtained by\n        calling DataAsset.batch_request_options.\n    batch_slice: A python slice that can be used to filter the sorted batches by index.\n        e.g. `batch_slice = \"[-5:]\"` will request only the last 5 batches after the options filter is applied.\n\nReturns:\n    BatchRequest",
+    "description": "--Public API--A BatchRequest is the way to specify which data Great Expectations will validate.\n\nA Batch Request is provided to a Data Asset in order to create one or more Batches.\n\nArgs:\n    datasource_name: The name of the Datasource used to connect to the data.\n    data_asset_name: The name of the Data Asset used to connect to the data.\n    options: A dict that can be used to filter the batch groups associated with the Data Asset.\n        The dict structure depends on the asset type. The available keys for dict can be obtained by\n        calling DataAsset.get_batch_request_options_keys(...).\n    batch_slice: A python slice that can be used to filter the sorted batches by index.\n        e.g. `batch_slice = \"[-5:]\"` will request only the last 5 batches after the options filter is applied.\n\nReturns:\n    BatchRequest",
     "type": "object",
     "properties": {
         "datasource_name": {
