[FEATURE] Implement LegacyBatchDefinition.batching_regex #9717

Merged
Commits (27)
c1163c4
move file function into class
joshua-stauffer Apr 4, 2024
fe8839a
Merge branch 'develop' into f/v1-274/refactor_loose_functions_to_class
joshua-stauffer Apr 4, 2024
18aacc3
replace floating funcs with a top level override
joshua-stauffer Apr 4, 2024
9320f8c
remove unnecessary fork
joshua-stauffer Apr 4, 2024
349741e
Merge branch 'develop' into f/v1-274/refactor_loose_functions_to_class
joshua-stauffer Apr 4, 2024
54e5eb7
Add batching_regex to batch config and plugin to FilePathDataconnector
joshua-stauffer Apr 4, 2024
9fa22da
Merge branch 'develop' into f/v1-274/add_batching_regex_to_batch_request
joshua-stauffer Apr 4, 2024
530edf2
cleanup dead code and add test
joshua-stauffer Apr 4, 2024
5cb3ff7
improve test
joshua-stauffer Apr 4, 2024
a547fb2
Merge branch 'develop' into f/v1-274/add_batching_regex_to_batch_request
joshua-stauffer Apr 4, 2024
a6ed68f
update type file
joshua-stauffer Apr 4, 2024
73ea68c
update type to be 3.8 compatible
joshua-stauffer Apr 4, 2024
4fcbaad
update test to include batching_regex
joshua-stauffer Apr 5, 2024
e1be534
parametrize test
joshua-stauffer Apr 5, 2024
cfc395d
add batching_regex to LegacyBatchDefinition
joshua-stauffer Apr 5, 2024
39de135
update test
joshua-stauffer Apr 5, 2024
4a6db0e
update type
joshua-stauffer Apr 5, 2024
fcd1a89
update type of batching_regex
joshua-stauffer Apr 5, 2024
7fc8473
make re.Pattern serializable and test
joshua-stauffer Apr 5, 2024
1de0b4d
_generate_batch_spec_parameters_from_batch_definition uses batching_r…
joshua-stauffer Apr 5, 2024
ab7661f
Merge branch 'develop' into f/v1-274/add_batching_regex_to_legacy_bat…
joshua-stauffer Apr 5, 2024
dfe3d41
allow DataConnector to continue using init batching_regex
joshua-stauffer Apr 5, 2024
cc8d958
update test
joshua-stauffer Apr 5, 2024
8982b1e
fix another test
joshua-stauffer Apr 5, 2024
7cad8d9
fix test
joshua-stauffer Apr 5, 2024
cc43376
remove dead code
joshua-stauffer Apr 5, 2024
6c20d34
break into two tests; update
joshua-stauffer Apr 5, 2024
9 changes: 9 additions & 0 deletions great_expectations/core/batch.py
@@ -3,6 +3,7 @@
import datetime
import json
import logging
import re
from typing import (
TYPE_CHECKING,
Any,
@@ -109,6 +110,7 @@ def __init__( # noqa: PLR0913
data_asset_name: str,
batch_identifiers: IDDict,
batch_spec_passthrough: dict | None = None,
batching_regex: re.Pattern | None = None,
) -> None:
self._validate_batch_definition(
datasource_name=datasource_name,
@@ -124,6 +126,7 @@
self._data_asset_name = data_asset_name
self._batch_identifiers = batch_identifiers
self._batch_spec_passthrough = batch_spec_passthrough
self._batching_regex = batching_regex

@public_api
@override
@@ -141,6 +144,8 @@ def to_json_dict(self) -> dict[str, JSONValues]:
}
if self._batch_spec_passthrough:
fields_dict["batch_spec_passthrough"] = self._batch_spec_passthrough
if self._batching_regex:
fields_dict["batching_regex"] = self._batching_regex

return convert_to_json_serializable(data=fields_dict)

@@ -220,6 +225,10 @@ def batch_spec_passthrough(self, batch_spec_passthrough: dict | None) -> None:
def id(self) -> str:
return IDDict(self.to_json_dict()).to_id()

@property
def batching_regex(self) -> re.Pattern | None:
return self._batching_regex

def __eq__(self, other):
if not isinstance(other, self.__class__):
# Delegate comparison to the other instance's __eq__.
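
Taken together, these batch.py changes let a LegacyBatchDefinition carry the regex that produced it: the constructor accepts an optional batching_regex, a read-only property exposes it, and to_json_dict() includes it when set. A minimal usage sketch, assuming the import paths below (the datasource, connector, and asset names are hypothetical):

import re

from great_expectations.core.batch import LegacyBatchDefinition
from great_expectations.core.id_dict import IDDict

# Hypothetical names; only the keyword arguments mirror the diff above.
batch_definition = LegacyBatchDefinition(
    datasource_name="my_datasource",
    data_connector_name="fluent",
    data_asset_name="my_asset",
    batch_identifiers=IDDict({"year": "2024", "month": "04"}),
    batching_regex=re.compile(r"data_(?P<year>\d{4})-(?P<month>\d{2})\.csv"),
)

# The new read-only property exposes the compiled pattern.
assert batch_definition.batching_regex is not None

# to_json_dict() now emits the pattern string, via the re.Pattern branch
# added to convert_to_json_serializable in core/util.py below.
assert batch_definition.to_json_dict()["batching_regex"] == (
    r"data_(?P<year>\d{4})-(?P<month>\d{2})\.csv"
)
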
3 changes: 3 additions & 0 deletions great_expectations/core/util.py
@@ -394,6 +394,9 @@ def convert_to_json_serializable( # noqa: C901, PLR0911, PLR0912
if isinstance(data, RenderedContent):
return data.to_json_dict()

if isinstance(data, re.Pattern):
return data.pattern

# Unable to serialize (unrecognized data type).
raise TypeError(f"{data!s} is of type {type(data).__name__} which cannot be serialized.") # noqa: TRY003

great_expectations/datasource/fluent/data_asset/data_connector/file_path_data_connector.py
@@ -287,10 +287,18 @@
Returns:
dict -- dictionary of "BatchSpec" properties
""" # noqa: E501
group_names: List[str] = self._regex_parser.get_all_group_names()
# todo: remove fallback self._batching_regex
batching_regex = batch_definition.batching_regex or self._batching_regex
if not batching_regex:
raise RuntimeError("BatchDefinition must contain a batching_regex.") # noqa: TRY003

Codecov / codecov/patch check warning on line 293: Added line #L293 was not covered by tests.
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
group_names: List[str] = regex_parser.get_all_group_names()
path: str = map_batch_definition_to_data_reference_string_using_regex(
batch_definition=batch_definition,
regex_pattern=self._batching_regex,
regex_pattern=batching_regex,
group_names=group_names,
)
if not path:
@@ -328,13 +336,9 @@
return batch_definitions

# Cache was empty so we need to calculate BatchDefinitions
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
for data_reference in self.get_data_references():
batch_definition = self._build_batch_definition(
data_reference=data_reference, regex_parser=regex_parser
data_reference=data_reference, batching_regex=batching_regex
)
if batch_definition:
# storing these as a list seems unnecessary; in this implementation
@@ -355,10 +359,10 @@
return batch_definitions

def _build_batch_definition(
self, data_reference: str, regex_parser: RegExParser
self, data_reference: str, batching_regex: re.Pattern
) -> LegacyBatchDefinition | None:
batch_identifiers = self._build_batch_identifiers(
data_reference=data_reference, regex_parser=regex_parser
data_reference=data_reference, batching_regex=batching_regex
)
if batch_identifiers is None:
return None
@@ -370,11 +374,16 @@
data_connector_name=_DATA_CONNECTOR_NAME,
data_asset_name=self._data_asset_name,
batch_identifiers=batch_identifiers,
batching_regex=batching_regex,
)

def _build_batch_identifiers(
self, data_reference: str, regex_parser: RegExParser
self, data_reference: str, batching_regex: re.Pattern
) -> Optional[IDDict]:
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
matches: Optional[re.Match] = regex_parser.get_matches(target=data_reference)
if matches is None:
return None
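
The behavioral change in this connector is that the batching regex is now resolved per batch definition instead of per connector instance: batch_definition.batching_regex takes precedence, the connector's __init__-time regex is kept only as a fallback (flagged with a TODO for removal), and a missing regex raises a RuntimeError. A standalone sketch of just that resolution step, under a hypothetical helper name:

import re
from typing import Optional


def resolve_batching_regex(
    definition_regex: Optional[re.Pattern],
    connector_regex: Optional[re.Pattern],
) -> re.Pattern:
    # Prefer the regex carried by the LegacyBatchDefinition; fall back to
    # the connector's __init__-time regex (slated for removal per the TODO).
    batching_regex = definition_regex or connector_regex
    if not batching_regex:
        raise RuntimeError("BatchDefinition must contain a batching_regex.")
    return batching_regex


# The definition's regex wins when both are present.
assert resolve_batching_regex(
    re.compile(r"(?P<name>.+)\.csv"), re.compile(r".*")
).pattern == r"(?P<name>.+)\.csv"
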
9 changes: 9 additions & 0 deletions tests/core/test_convert_to_json_serializable.py
@@ -1,3 +1,5 @@
import re

import numpy as np
import pytest

@@ -54,3 +56,10 @@ def test_serialization_numpy_datetime():
datetime_to_test = "2022-12-08T12:56:23.423"
data = np.datetime64(datetime_to_test)
assert convert_to_json_serializable(data) == datetime_to_test


@pytest.mark.unit
def test_serialization_of_pattern():
pattern_to_test = r"data_(?P<year>\d{4})-(?P<month>\d{2}).csv"
data = re.compile(pattern_to_test)
assert convert_to_json_serializable(data) == pattern_to_test
tests/datasource/fluent/data_asset/data_connector/test_filesystem_data_connector.py
@@ -125,11 +125,11 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"alex_20200819_1300.csv",
],
)

batching_regex = re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv")
my_data_connector: DataConnector = FilesystemDataConnector(
datasource_name="my_file_path_datasource",
data_asset_name="my_filesystem_data_asset",
batching_regex=re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv"),
batching_regex=batching_regex,
base_directory=pathlib.Path(base_directory),
glob_directive="*.csv",
)
@@ -148,6 +148,9 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
)
)
)
processed_batching_regex = re.compile(
"(?P<path>(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\\.csv)"
)
expected: List[LegacyBatchDefinition] = [
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -161,6 +164,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1040",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -174,6 +178,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1000",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -187,6 +192,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1300",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -200,6 +206,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1500",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -213,6 +220,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1900",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -226,6 +234,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1567",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -239,6 +248,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1003",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -252,6 +262,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1009",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -265,6 +276,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1002",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -278,6 +290,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1001",
}
),
batching_regex=processed_batching_regex,
),
]
assert expected == unsorted_batch_definition_list
@@ -335,6 +348,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
]
assert my_data_connector.get_unmatched_data_reference_count() == 2

processed_batching_regex = re.compile("(?P<path>(?P<directory>.+)/(?P<filename>.+\\.csv))")
expected: List[LegacyBatchDefinition] = [
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -343,6 +357,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_1.csv", "directory": "A", "filename": "file_1.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -351,6 +366,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_2.csv", "directory": "A", "filename": "file_2.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -359,6 +375,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_3.csv", "directory": "A", "filename": "file_3.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -367,6 +384,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "B/file_1.csv", "directory": "B", "filename": "file_1.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -375,6 +393,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "B/file_2.csv", "directory": "B", "filename": "file_2.csv"}
),
batching_regex=processed_batching_regex,
),
]

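
One detail worth noting in the expected values above: the batching_regex stored on each emitted LegacyBatchDefinition is not the raw pattern passed to FilesystemDataConnector but a processed one wrapped in an outer (?P<path>...) group, which is also what produces the path entry in each batch's identifiers. A quick sanity check of that wrapping, inferred from the processed_batching_regex assertions rather than taken from the connector's source:

import re

user_pattern = r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv"

# Inferred preprocessing: wrap the user-supplied pattern in a named "path"
# group, matching the processed_batching_regex values asserted above.
processed = re.compile(f"(?P<path>{user_pattern})")

match = processed.match("alex_20200819_1300.csv")
assert match is not None
assert match.groupdict() == {
    "path": "alex_20200819_1300.csv",
    "name": "alex",
    "timestamp": "20200819",
    "price": "1300",
}
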