Check call count #9954

Draft · wants to merge 18 commits into 0.18.x
@@ -4766,30 +4766,46 @@ def _compile_evaluation_parameter_dependencies(self) -> None:
    self._evaluation_parameter_dependencies = {}
    # we have to iterate through all expectation suites because evaluation parameters
    # can reference metric values from other suites
    if self.expectations_store.cloud_mode:
        # use get_all to prevent round trips to GX Cloud.
        for exp_suite_dict in self.expectations_store.get_all():
            if not exp_suite_dict:
                continue
            expectation_suite = ExpectationSuite(
                **exp_suite_dict, data_context=self
            )

            dependencies: dict = (
                expectation_suite.get_evaluation_parameter_dependencies()
            )
            if len(dependencies) > 0:
                nested_update(self._evaluation_parameter_dependencies, dependencies)
    else:
        for key in self.expectations_store.list_keys():
            try:
                expectation_suite_dict: dict = cast(
                    dict, self.expectations_store.get(key)
                )
            except ValidationError as e:
                # if a suite that isn't associated with the checkpoint compiling eval params is misconfigured
                # we should ignore that instead of breaking all checkpoints in the entire context
                logger.info(
                    f"Suite with identifier {key} was not considered when compiling evaluation parameter dependencies "
                    f"because it failed to load with message: {e}"
                )
                continue

            if not expectation_suite_dict:
                continue
            expectation_suite = ExpectationSuite(
                **expectation_suite_dict, data_context=self
            )

            dependencies = expectation_suite.get_evaluation_parameter_dependencies()
            if len(dependencies) > 0:
                nested_update(self._evaluation_parameter_dependencies, dependencies)

    self._evaluation_parameter_dependencies_compiled = True

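The point of the branch above is to collapse N per-suite `get()` round trips into a single `get_all()` batch when talking to GX Cloud, which is what the PR title is asserting. A minimal sketch of how a test might check that call count, using a `unittest.mock` stub; the helper function below is ours, a condensation of the branch, not the real context method:

```python
from unittest import mock

# Stub store; attribute names mirror the ones used above.
store = mock.Mock()
store.cloud_mode = True
store.get_all.return_value = [{"expectation_suite_name": "suite_a"}]

def compile_dependencies(expectations_store):
    # Hypothetical condensation of the branch above.
    if expectations_store.cloud_mode:
        return expectations_store.get_all()  # one backend round trip
    return [expectations_store.get(k) for k in expectations_store.list_keys()]

compile_dependencies(store)
assert store.get_all.call_count == 1  # batched
assert store.get.call_count == 0  # no per-suite round trips in cloud mode
```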
1 change: 1 addition & 0 deletions great_expectations/data_context/data_context_variables.py
@@ -43,6 +43,7 @@ class DataContextVariableSchema(str, enum.Enum):
    )
    CONFIG_VERSION = "config_version"
    DATASOURCES = "datasources"
    FLUENT_DATASOURCES = "fluent_datasources"
    EXPECTATIONS_STORE_NAME = "expectations_store_name"
    VALIDATIONS_STORE_NAME = "validations_store_name"
    EVALUATION_PARAMETER_STORE_NAME = "evaluation_parameter_store_name"
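`DataContextVariableSchema` subclasses `str`, so the new member compares equal to the raw config key it names. A quick check, assuming the import path shown in the file header:

```python
from great_expectations.data_context.data_context_variables import (
    DataContextVariableSchema,
)

assert DataContextVariableSchema.FLUENT_DATASOURCES.value == "fluent_datasources"
# str-enum members are usable wherever a plain config-key string is expected
assert DataContextVariableSchema.FLUENT_DATASOURCES == "fluent_datasources"
```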
25 changes: 24 additions & 1 deletion great_expectations/data_context/store/database_store_backend.py
@@ -257,7 +257,30 @@ def _get(self, key):

@override
def _get_all(self) -> list[Any]:
    keys = self.list_keys()
    sel = (
        sa.select(sa.column("value"))
        .select_from(self._table)
        .where(
            sa.or_(
                sa.and_(
                    *(
                        getattr(self._table.columns, key_col) == val
                        for key_col, val in zip(self.key_columns, key)
                    )
                )
                for key in keys
            )
        )
    )

    try:
        with self.engine.begin() as connection:
            rows = connection.execute(sel).fetchall()
        return [r[0] for r in rows]
    except (IndexError, SQLAlchemyError) as e:
        logger.debug(f"Error fetching value: {e!s}")
        raise gx_exceptions.StoreError("Unable to fetch values for keys")

@override
def _set(self, key, value, allow_update=True, **kwargs) -> None:
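The `select` above fetches every key's value in one query: an `OR` across one `AND` clause per key tuple. A standalone sketch (toy table and key values are ours, standing in for `self._table` and `self.key_columns`) showing the SQL this shape compiles to:

```python
import sqlalchemy as sa

# Toy two-column key layout; names are illustrative.
table = sa.table("ge_store", sa.column("value"), sa.column("k1"), sa.column("k2"))
keys = [("1", "2"), ("aaa", "bbb")]

sel = (
    sa.select(sa.column("value"))
    .select_from(table)
    .where(
        sa.or_(
            *(
                sa.and_(table.c.k1 == k1, table.c.k2 == k2)
                for k1, k2 in keys
            )
        )
    )
)

# SELECT value FROM ge_store
# WHERE k1 = '1' AND k2 = '2' OR k1 = 'aaa' AND k2 = 'bbb'
print(sel.compile(compile_kwargs={"literal_binds": True}))
```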
37 changes: 36 additions & 1 deletion great_expectations/data_context/store/expectations_store.py
@@ -2,7 +2,7 @@

import random
import uuid
from typing import TYPE_CHECKING, Dict

import great_expectations.exceptions as gx_exceptions
from great_expectations.compatibility.typing_extensions import override
@@ -24,6 +24,19 @@
verify_dynamic_loading_support,
)

if TYPE_CHECKING:
    from typing import Literal

    from typing_extensions import TypedDict

    class DataPayload(TypedDict):
        id: str
        attributes: dict
        type: Literal["expectation_suite"]

    class CloudResponsePayloadTD(TypedDict):
        data: DataPayload | list[DataPayload]


class ExpectationsStore(Store):
"""
@@ -300,3 +313,25 @@ def self_check(self, pretty_print):  # noqa: PLR0912
    print()

    return return_obj

@override
@staticmethod
def gx_cloud_response_json_to_object_collection(
    response_json: CloudResponsePayloadTD,  # type: ignore[override]
) -> list[dict]:
    """
    This method takes a full JSON response from GX Cloud and outputs a list of dicts appropriate for
    deserialization into a collection of GX objects.
    """
    data = response_json["data"]
    if not isinstance(data, list):
        raise TypeError(
            "GX Cloud did not return a list of Expectation Suites when expected"
        )

    return [ExpectationsStore._convert_raw_json_to_object_dict(d) for d in data]

@staticmethod
def _convert_raw_json_to_object_dict(data: DataPayload) -> dict:
    data["attributes"]["suite"]["ge_cloud_id"] = data["id"]
    return data["attributes"]["suite"]
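Taken together, the two static methods unwrap each suite body from `attributes.suite` and stamp it with its cloud id. A minimal sketch with an invented payload in the `CloudResponsePayloadTD` shape:

```python
# Invented example payload; field names follow the TypedDicts above.
response_json = {
    "data": [
        {
            "id": "suite-id-123",  # hypothetical GX Cloud id
            "type": "expectation_suite",
            "attributes": {"suite": {"expectation_suite_name": "suite_a"}},
        }
    ]
}

# Equivalent of gx_cloud_response_json_to_object_collection on this payload:
suites = [
    {**d["attributes"]["suite"], "ge_cloud_id": d["id"]}
    for d in response_json["data"]
]
assert suites == [
    {"expectation_suite_name": "suite_a", "ge_cloud_id": "suite-id-123"}
]
```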
12 changes: 10 additions & 2 deletions great_expectations/data_context/store/inline_store_backend.py
@@ -12,7 +12,10 @@
)
from great_expectations.data_context.store.store_backend import StoreBackend
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.exceptions.exceptions import (
    StoreBackendError,
    StoreBackendUnsupportedResourceTypeError,
)
from great_expectations.util import filter_properties_dict

if TYPE_CHECKING:
@@ -98,7 +101,12 @@ def _get(self, key: tuple[str, ...]) -> Any:

@override
def _get_all(self) -> list[Any]:
    project_config = self._data_context.config
    variable_config = project_config.get(self._resource_type)
    if isinstance(variable_config, dict):
        return list(variable_config.values())
    else:
        raise StoreBackendUnsupportedResourceTypeError(self._resource_type.value)

@override
def _set(self, key: tuple[str, ...], value: Any, **kwargs: dict) -> None:
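Because the inline backend is backed by the in-memory project config, `_get_all` reduces to reading one block of that config. A toy sketch (config contents invented) of the same lookup:

```python
# Toy stand-in for a DataContextConfig; contents are invented.
project_config = {
    "datasources": {
        "my_postgres": {"class_name": "Datasource"},
        "my_pandas": {"class_name": "Datasource"},
    }
}

resource_type = "datasources"
variable_config = project_config.get(resource_type)
if isinstance(variable_config, dict):
    configs = list(variable_config.values())
else:
    raise TypeError(f"Unsupported resource type: {resource_type}")

assert len(configs) == 2
```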
54 changes: 40 additions & 14 deletions great_expectations/data_context/store/tuple_store_backend.py
@@ -328,7 +328,10 @@ def _get(self, key):

@override
def _get_all(self) -> list[Any]:
    keys = [
        key for key in self.list_keys() if key != StoreBackend.STORE_BACKEND_ID_KEY
    ]
    return [self._get(key) for key in keys]

def _set(self, key, value, **kwargs):
if not isinstance(key, tuple):
@@ -561,13 +564,29 @@ def _build_s3_object_key(self, key):
    return s3_object_key

def _get(self, key):
    client = self._create_client()
    s3_object_key = self._build_s3_object_key(key)
    return self._get_by_s3_object_key(client, s3_object_key)

@override
def _get_all(self) -> list[Any]:
    """Get all objects from the store.

    NOTE: This is non-performant because we download each object separately.
    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/bucket/objects.html#objects
    for the docs.
    """
    client = self._create_client()
    keys = self.list_keys()
    keys = [k for k in keys if k != StoreBackend.STORE_BACKEND_ID_KEY]
    s3_object_keys = [self._build_s3_object_key(key) for key in keys]
    return [self._get_by_s3_object_key(client, key) for key in s3_object_keys]

def _get_by_s3_object_key(self, s3_client, s3_object_key):
    try:
        s3_response_object = s3_client.get_object(
            Bucket=self.bucket, Key=s3_object_key
        )
    except (s3_client.exceptions.NoSuchKey, s3_client.exceptions.NoSuchBucket):
        raise InvalidKeyError(
            f"Unable to retrieve object from TupleS3StoreBackend with the following Key: {s3_object_key!s}"
        )
    return (
        s3_response_object["Body"]
        .read()
        .decode(s3_response_object.get("ContentEncoding", "utf-8"))
    )

def _set(
self,
key,
Expand Down Expand Up @@ -852,12 +867,26 @@ def _build_gcs_object_key(self, key):
    return gcs_object_key

def _get(self, key):
    from great_expectations.compatibility import google

    gcs = google.storage.Client(project=self.project)
    bucket = gcs.bucket(self.bucket)
    return self._get_by_gcs_object_key(bucket, key)

@override
def _get_all(self) -> list[Any]:
    from great_expectations.compatibility import google

    gcs = google.storage.Client(project=self.project)
    bucket = gcs.bucket(self.bucket)

    keys = self.list_keys()
    keys = [k for k in keys if k != StoreBackend.STORE_BACKEND_ID_KEY]

    return [self._get_by_gcs_object_key(bucket, key) for key in keys]

def _get_by_gcs_object_key(self, bucket, key):
    gcs_object_key = self._build_gcs_object_key(key)
    gcs_response_object = bucket.get_blob(gcs_object_key)
    if not gcs_response_object:
        raise InvalidKeyError(
            f"Unable to retrieve object from TupleGCSStoreBackend with the following Key: {gcs_object_key!s}"
        )
    else:
        return gcs_response_object.download_as_bytes().decode("utf-8")

def _set(
self,
key,
@@ -1095,7 +1120,8 @@ def _get(self, key):

@override
def _get_all(self) -> list[Any]:
    keys = self.list_keys()
    return [self._get(key) for key in keys]

def _set(self, key, value, content_encoding="utf-8", **kwargs):
from great_expectations.compatibility.azure import ContentSettings
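All the tuple backends (filesystem, S3, GCS, Azure) now share the same composition: `list_keys()`, drop the store-backend-id marker key, then one fetch per remaining key, which is exactly why the S3 docstring flags `_get_all` as non-performant. A hypothetical stub (not the real `StoreBackend`) of that pattern, assuming the marker key is `(".ge_store_backend_id",)`:

```python
from typing import Any

class ToyTupleBackend:
    # Assumed marker-key value; mirrors StoreBackend.STORE_BACKEND_ID_KEY.
    STORE_BACKEND_ID_KEY = (".ge_store_backend_id",)

    def __init__(self, objects: dict) -> None:
        self._objects = objects  # key tuple -> serialized object

    def list_keys(self) -> list:
        return list(self._objects)

    def _get(self, key: tuple) -> str:
        return self._objects[key]

    def _get_all(self) -> list[Any]:
        # Skip the backend-id marker, then fetch each object individually.
        keys = [k for k in self.list_keys() if k != self.STORE_BACKEND_ID_KEY]
        return [self._get(k) for k in keys]

backend = ToyTupleBackend(
    {
        (".ge_store_backend_id",): "store_backend_id = 1234",
        ("suite_a.json",): '{"expectation_suite_name": "suite_a"}',
    }
)
assert backend._get_all() == ['{"expectation_suite_name": "suite_a"}']
```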
2 changes: 1 addition & 1 deletion great_expectations/datasource/fluent/fluent_base_model.py
@@ -273,7 +273,7 @@ def dict(  # noqa: PLR0913
f" {class_name} may be missing a context."
)
else:
logger.debug(
f"{class_name}.dict() - missing `config_provider`, skipping config substitution"
)

5 changes: 5 additions & 0 deletions great_expectations/exceptions/exceptions.py
@@ -64,6 +64,11 @@ class GitIgnoreScaffoldingError(GreatExpectationsError):
pass


class StoreBackendUnsupportedResourceTypeError(StoreBackendError):
    def __init__(self, resource_type: str) -> None:
        super().__init__(f"Unsupported resource type: {resource_type}")


class StoreBackendTransientError(StoreBackendError):
    """The result of a timeout or other networking issues"""

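A quick usage check of the new exception (the resource name here is invented), assuming this branch is installed and `GreatExpectationsError` carries its message through to `str(e)`:

```python
from great_expectations.exceptions.exceptions import (
    StoreBackendUnsupportedResourceTypeError,
)

try:
    raise StoreBackendUnsupportedResourceTypeError("checkpoints")
except StoreBackendUnsupportedResourceTypeError as e:
    assert "Unsupported resource type: checkpoints" in str(e)
```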
41 changes: 41 additions & 0 deletions tests/data_context/store/test_database_store_backend.py
@@ -44,6 +44,47 @@ def test_database_store_backend_schema_spec(caplog, sa, test_backends):
connection.execute(sa.text(f"DROP TABLE {store_backend._table};"))


def test_database_store_backend_get_all(caplog, sa, test_backends):
    if "postgresql" not in test_backends:
        pytest.skip("test_database_store_backend_get_all requires postgresql")

    store_backend = DatabaseStoreBackend(
        credentials={
            "drivername": "postgresql",
            "username": "postgres",
            "password": "",
            "host": os.getenv("GE_TEST_LOCAL_DB_HOSTNAME", "localhost"),
            "port": "5432",
            "schema": "special",
            "database": "test_ci",
        },
        table_name="test_database_store_backend_url_key",
        key_columns=["k1", "k2"],
    )

    FOO = "foo"
    BAR = "bar"
    store_backend.set(("1", "2"), FOO)
    store_backend.set(("aaa", "bbb"), BAR)
    assert store_backend.get_all() == [FOO, BAR]

    # clean up values
    with store_backend.engine.begin() as connection:
        connection.execute(sa.text(f"DROP TABLE {store_backend._table};"))


def test_database_store_backend_get_url_for_key(caplog, sa, test_backends):
    if "postgresql" not in test_backends:
        pytest.skip("test_database_store_backend_get_url_for_key requires postgresql")