[FEATURE] Implement LegacyBatchDefinition.batching_regex (#9717)
joshua-stauffer committed Apr 5, 2024
1 parent 8832d77 commit b3d4769
Showing 6 changed files with 131 additions and 241 deletions.
9 changes: 9 additions & 0 deletions great_expectations/core/batch.py
@@ -3,6 +3,7 @@
import datetime
import json
import logging
import re
from typing import (
TYPE_CHECKING,
Any,
@@ -109,6 +110,7 @@ def __init__( # noqa: PLR0913
data_asset_name: str,
batch_identifiers: IDDict,
batch_spec_passthrough: dict | None = None,
batching_regex: re.Pattern | None = None,
) -> None:
self._validate_batch_definition(
datasource_name=datasource_name,
Expand All @@ -124,6 +126,7 @@ def __init__( # noqa: PLR0913
self._data_asset_name = data_asset_name
self._batch_identifiers = batch_identifiers
self._batch_spec_passthrough = batch_spec_passthrough
self._batching_regex = batching_regex

@public_api
@override
@@ -141,6 +144,8 @@ def to_json_dict(self) -> dict[str, JSONValues]:
}
if self._batch_spec_passthrough:
fields_dict["batch_spec_passthrough"] = self._batch_spec_passthrough
if self._batching_regex:
fields_dict["batching_regex"] = self._batching_regex

return convert_to_json_serializable(data=fields_dict)

@@ -220,6 +225,10 @@ def batch_spec_passthrough(self, batch_spec_passthrough: dict | None) -> None:
def id(self) -> str:
return IDDict(self.to_json_dict()).to_id()

@property
def batching_regex(self) -> re.Pattern | None:
return self._batching_regex

def __eq__(self, other):
if not isinstance(other, self.__class__):
# Delegate comparison to the other instance's __eq__.
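The `batching_regex` passed to the constructor is stored on the instance, exposed through a read-only property, and included in `to_json_dict()` when set, which means it also feeds into `id` (computed as `IDDict(self.to_json_dict()).to_id()`). A minimal sketch of the resulting behavior, with illustrative values and assumed import paths:

```python
import re

from great_expectations.core.batch import LegacyBatchDefinition
from great_expectations.core.id_dict import IDDict  # assumed import path

# Values are illustrative; the argument names follow the constructor shown in this diff.
batch_definition = LegacyBatchDefinition(
    datasource_name="my_file_path_datasource",
    data_connector_name="my_data_connector",
    data_asset_name="my_filesystem_data_asset",
    batch_identifiers=IDDict({"name": "alex", "timestamp": "20200809", "price": "1000"}),
    batching_regex=re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv"),
)

print(batch_definition.batching_regex.pattern)            # the compiled pattern is kept as-is
print(batch_definition.to_json_dict()["batching_regex"])  # serialized as the pattern string
print(batch_definition.id)  # the id now reflects the regex, since it hashes the JSON dict
```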
3 changes: 3 additions & 0 deletions great_expectations/core/util.py
@@ -394,6 +394,9 @@ def convert_to_json_serializable( # noqa: C901, PLR0911, PLR0912
if isinstance(data, RenderedContent):
return data.to_json_dict()

if isinstance(data, re.Pattern):
return data.pattern

# Unable to serialize (unrecognized data type).
raise TypeError(f"{data!s} is of type {type(data).__name__} which cannot be serialized.") # noqa: TRY003

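This new branch lets a compiled pattern pass through serialization instead of falling into the `TypeError` immediately below, which `LegacyBatchDefinition.to_json_dict()` now relies on. A one-line check mirroring the test added later in this commit:

```python
import re

from great_expectations.core.util import convert_to_json_serializable

# A compiled regex is serialized as its pattern string rather than raising TypeError.
pattern = r"data_(?P<year>\d{4})-(?P<month>\d{2}).csv"
assert convert_to_json_serializable(re.compile(pattern)) == pattern
```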
Original file line number Diff line number Diff line change
@@ -287,10 +287,18 @@ def _generate_batch_spec_parameters_from_batch_definition(
Returns:
dict -- dictionary of "BatchSpec" properties
""" # noqa: E501
group_names: List[str] = self._regex_parser.get_all_group_names()
# todo: remove fallback self._batching_regex
batching_regex = batch_definition.batching_regex or self._batching_regex
if not batching_regex:
raise RuntimeError("BatchDefinition must contain a batching_regex.") # noqa: TRY003
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
group_names: List[str] = regex_parser.get_all_group_names()
path: str = map_batch_definition_to_data_reference_string_using_regex(
batch_definition=batch_definition,
regex_pattern=self._batching_regex,
regex_pattern=batching_regex,
group_names=group_names,
)
if not path:
@@ -328,13 +336,9 @@ def _get_data_references_cache(
return batch_definitions

# Cache was empty so we need to calculate BatchDefinitions
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
for data_reference in self.get_data_references():
batch_definition = self._build_batch_definition(
data_reference=data_reference, regex_parser=regex_parser
data_reference=data_reference, batching_regex=batching_regex
)
if batch_definition:
# storing these as a list seems unnecessary; in this implementation
@@ -355,10 +359,10 @@ def _get_batch_definitions(self, batching_regex: re.Pattern) -> List[LegacyBatch
return batch_definitions

def _build_batch_definition(
self, data_reference: str, regex_parser: RegExParser
self, data_reference: str, batching_regex: re.Pattern
) -> LegacyBatchDefinition | None:
batch_identifiers = self._build_batch_identifiers(
data_reference=data_reference, regex_parser=regex_parser
data_reference=data_reference, batching_regex=batching_regex
)
if batch_identifiers is None:
return None
@@ -370,11 +374,16 @@ def _build_batch_definition(
data_connector_name=_DATA_CONNECTOR_NAME,
data_asset_name=self._data_asset_name,
batch_identifiers=batch_identifiers,
batching_regex=batching_regex,
)

def _build_batch_identifiers(
self, data_reference: str, regex_parser: RegExParser
self, data_reference: str, batching_regex: re.Pattern
) -> Optional[IDDict]:
regex_parser = RegExParser(
regex_pattern=batching_regex,
unnamed_regex_group_prefix=self._unnamed_regex_group_prefix,
)
matches: Optional[re.Match] = regex_parser.get_matches(target=data_reference)
if matches is None:
return None
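Within the data connector, the regex is now resolved per batch definition, with a fallback to `self._batching_regex` that the TODO above marks for removal, and `_build_batch_identifiers` constructs its own `RegExParser` from that regex. The identifier extraction itself is ordinary named-group matching; a rough stdlib-only approximation of that step (not the real `RegExParser`), using a hypothetical helper name:

```python
import re
from typing import Optional


def build_batch_identifiers(data_reference: str, batching_regex: re.Pattern) -> Optional[dict]:
    """Map a data reference (e.g. a file name) onto the regex's named groups."""
    match = batching_regex.match(data_reference)
    if match is None:
        # Unmatched data references yield no batch definition.
        return None
    return match.groupdict()


batching_regex = re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv")
print(build_batch_identifiers("alex_20200809_1000.csv", batching_regex))
# {'name': 'alex', 'timestamp': '20200809', 'price': '1000'}
```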
9 changes: 9 additions & 0 deletions tests/core/test_convert_to_json_serializable.py
@@ -1,3 +1,5 @@
import re

import numpy as np
import pytest

@@ -54,3 +56,10 @@ def test_serialization_numpy_datetime():
datetime_to_test = "2022-12-08T12:56:23.423"
data = np.datetime64(datetime_to_test)
assert convert_to_json_serializable(data) == datetime_to_test


@pytest.mark.unit
def test_serialization_of_pattern():
pattern_to_test = r"data_(?P<year>\d{4})-(?P<month>\d{2}).csv"
data = re.compile(pattern_to_test)
assert convert_to_json_serializable(data) == pattern_to_test
Original file line number Diff line number Diff line change
@@ -125,11 +125,11 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"alex_20200819_1300.csv",
],
)

batching_regex = re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv")
my_data_connector: DataConnector = FilesystemDataConnector(
datasource_name="my_file_path_datasource",
data_asset_name="my_filesystem_data_asset",
batching_regex=re.compile(r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv"),
batching_regex=batching_regex,
base_directory=pathlib.Path(base_directory),
glob_directive="*.csv",
)
@@ -148,6 +148,9 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
)
)
)
processed_batching_regex = re.compile(
"(?P<path>(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\\.csv)"
)
expected: List[LegacyBatchDefinition] = [
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -161,6 +164,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1040",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -174,6 +178,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1000",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -187,6 +192,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1300",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -200,6 +206,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1500",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -213,6 +220,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1900",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -226,6 +234,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1567",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -239,6 +248,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1003",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -252,6 +262,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1009",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -265,6 +276,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1002",
}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -278,6 +290,7 @@ def test_return_all_batch_definitions_unsorted(tmp_path_factory):
"price": "1001",
}
),
batching_regex=processed_batching_regex,
),
]
assert expected == unsorted_batch_definition_list
@@ -335,6 +348,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
]
assert my_data_connector.get_unmatched_data_reference_count() == 2

processed_batching_regex = re.compile("(?P<path>(?P<directory>.+)/(?P<filename>.+\\.csv))")
expected: List[LegacyBatchDefinition] = [
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -343,6 +357,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_1.csv", "directory": "A", "filename": "file_1.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -351,6 +366,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_2.csv", "directory": "A", "filename": "file_2.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -359,6 +375,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "A/file_3.csv", "directory": "A", "filename": "file_3.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -367,6 +384,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "B/file_1.csv", "directory": "B", "filename": "file_1.csv"}
),
batching_regex=processed_batching_regex,
),
LegacyBatchDefinition(
datasource_name="my_file_path_datasource",
@@ -375,6 +393,7 @@ def test_return_only_unique_batch_definitions(tmp_path_factory):
batch_identifiers=IDDict(
{"path": "B/file_2.csv", "directory": "B", "filename": "file_2.csv"}
),
batching_regex=processed_batching_regex,
),
]

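Note the `processed_batching_regex` values asserted above: the connector is configured with the plain user-supplied pattern, but the batch definitions it produces carry that pattern wrapped in an outer `(?P<path>...)` group, so the full match is also available as a `path` identifier. A small illustration of what that wrapping does (the wrapping itself is inferred from these test expectations):

```python
import re

user_regex = r"(?P<name>.+)_(?P<timestamp>.+)_(?P<price>.+)\.csv"
processed = re.compile(f"(?P<path>{user_regex})")  # the "processed" form seen in the tests

print(processed.match("alex_20200809_1000.csv").groupdict())
# {'path': 'alex_20200809_1000.csv', 'name': 'alex', 'timestamp': '20200809', 'price': '1000'}
```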
