Skip to content

Commit

Permalink
Merge branch 'develop' into m/_/ignore-revs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kilo59 committed Mar 6, 2024
2 parents 62f4d7b + f5d4500 commit 2016db7
Show file tree
Hide file tree
Showing 8 changed files with 675 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/sphinx_api_docs_source/public_api_missing_threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
"File: great_expectations/datasource/fluent/sources.py Name: delete_datasource",
"File: great_expectations/datasource/fluent/spark_datasource.py Name: get_batch_list_from_batch_request",
"File: great_expectations/datasource/fluent/sql_datasource.py Name: get_batch_list_from_batch_request",
"File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_asset",
"File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_batch_list_from_batch_request",
"File: great_expectations/datasource/fluent/invalid_datasource.py Name: add_sorters",
"File: great_expectations/datasource/fluent/invalid_datasource.py Name: build_batch_request",
"File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_batch_request_options_keys",
"File: great_expectations/datasource/new_datasource.py Name: get_batch_list_from_batch_request",
"File: great_expectations/exceptions/exceptions.py Name: InvalidExpectationConfigurationError",
"File: great_expectations/expectations/expectation.py Name: validate_configuration",
Expand Down
25 changes: 20 additions & 5 deletions great_expectations/data_context/store/datasource_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import copy
import logging
import warnings
from pprint import pformat as pf
from typing import TYPE_CHECKING, Optional, Union, overload

import great_expectations.exceptions as gx_exceptions
from great_expectations.compatibility.pydantic import (
ValidationError as PydanticValidationError,
)
from great_expectations.compatibility.typing_extensions import override
from great_expectations.core.data_context_key import (
DataContextKey,
Expand All @@ -18,6 +22,10 @@
)
from great_expectations.data_context.types.refs import GXCloudResourceRef
from great_expectations.datasource.fluent import Datasource as FluentDatasource
from great_expectations.datasource.fluent import (
GxInvalidDatasourceWarning,
InvalidDatasource,
)
from great_expectations.datasource.fluent.sources import _SourceFactories
from great_expectations.util import filter_properties_dict

Expand Down Expand Up @@ -111,12 +119,19 @@ def deserialize(
return value
elif isinstance(value, dict):
# presence of a 'type' field means it's a fluent datasource
type_ = value.get("type")
type_: str | None = value.get("type")
if type_:
datasource_model = _SourceFactories.type_lookup.get(type_)
if not datasource_model:
raise LookupError(f"Unknown Datasource 'type': '{type_}'")
return datasource_model(**value)
try:
datasource_model = _SourceFactories.type_lookup[type_]
return datasource_model(**value)
except (PydanticValidationError, LookupError) as config_error:
warnings.warn(
f"Datasource {value.get('name', '')} configuration is invalid."
" Check `my_datasource.config_error` attribute for more details.",
GxInvalidDatasourceWarning,
)
# Any fields that are not part of the schema are ignored
return InvalidDatasource(config_error=config_error, **value)
return self._schema.load(value)
else:
return self._schema.loads(value)
Expand Down
5 changes: 5 additions & 0 deletions great_expectations/datasource/fluent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
GxDatasourceWarning,
TestConnectionError,
)
from great_expectations.datasource.fluent.invalid_datasource import (
InvalidAsset,
InvalidDatasource,
GxInvalidDatasourceWarning,
)

# Now that DataAsset has both been defined, we need to
# provide it to the BatchConfig pydantic model.
Expand Down
207 changes: 207 additions & 0 deletions great_expectations/datasource/fluent/invalid_datasource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
from __future__ import annotations

import warnings
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Final,
List,
NoReturn,
Type,
Union,
overload,
)

from great_expectations.compatibility import pydantic
from great_expectations.compatibility.pydantic import Field
from great_expectations.compatibility.typing_extensions import override
from great_expectations.datasource.fluent import (
DataAsset,
Datasource,
GxDatasourceWarning,
TestConnectionError,
)
from great_expectations.datasource.fluent.type_lookup import TypeLookup, ValidTypes

if TYPE_CHECKING:
from great_expectations.core.partitioners import Partitioner
from great_expectations.datasource.fluent.batch_request import BatchRequest
from great_expectations.datasource.fluent.interfaces import Batch


# Controls which methods should raise an error when called on an InvalidDatasource.
# InvalidDatasource.__getattribute__ checks this set and raises AttributeError for
# these names, forcing dispatch through __getattr__ (which raises a TypeError
# chained from the original config error).
METHOD_SHOULD_RAISE_ERROR: Final[set[str]] = {
    "get_batch_list_from_batch_request",
    "add_batch_config",
}


class GxInvalidDatasourceWarning(GxDatasourceWarning):
    """
    A warning that the Datasource configuration is invalid and must be updated
    before it can be used.
    """


class InvalidAsset(DataAsset):
    """
    A DataAsset that is invalid.

    The DataAsset itself may be valid, but it is classified as invalid because
    its parent Datasource or sibling assets are invalid.

    Every operational method raises a TypeError via `_raise_type_error`,
    chained from the parent Datasource's original config error when available.
    """

    # Sentinel values; a real asset would carry its registered type and name.
    type: str = "invalid"
    name: str = "invalid"

    class Config:
        # The broken config may carry arbitrary fields that no longer validate
        # against any real asset schema; ignore them rather than error.
        extra = "ignore"

    def _raise_type_error(self) -> NoReturn:
        """
        Raise a TypeError indicating that the Asset is invalid.
        If available, raise from the original config error that caused the Datasource to be invalid.
        """
        error = TypeError(f"{self.name} Asset is invalid")
        # NOTE(review): `datasource` is presumably set by the parent
        # InvalidDatasource; guarded with getattr in case it is absent.
        if datasource := getattr(self, "datasource", None):
            raise error from datasource.config_error
        raise error

    @override
    def test_connection(self) -> None:
        # Always raises TestConnectionError; chains the original config error
        # when the parent datasource is available.
        if datasource := getattr(self, "datasource", None):
            raise TestConnectionError(
                f"The Datasource configuration for {self.name} is invalid and cannot be used. Please fix the error and try again"
            ) from datasource.config_error
        # the asset should always have a datasource, but if it doesn't, we should still raise an error
        raise TestConnectionError(
            "This Asset configuration is invalid and cannot be used. Please fix the error and try again"
        )

    @override
    def add_batch_config(self, name: str, partitioner: Any | None = None) -> NoReturn:
        self._raise_type_error()

    @override
    def add_sorters(self, sorters: List[Any]) -> NoReturn:
        self._raise_type_error()

    @override
    def build_batch_request(
        self,
        options: dict | None = None,
        batch_slice: Any = None,
        partitioner: Any = None,
    ) -> NoReturn:
        self._raise_type_error()

    @override
    def get_batch_list_from_batch_request(
        self, batch_request: BatchRequest
    ) -> NoReturn:
        self._raise_type_error()

    @override
    def sort_batches(self, batch_list: List[Batch]) -> NoReturn:
        # Annotated NoReturn (was None) for consistency with the sibling
        # overrides: this always raises via _raise_type_error.
        self._raise_type_error()

    @override
    def get_batch_request_options_keys(
        self, partitioner: Partitioner | None = None
    ) -> NoReturn:
        self._raise_type_error()


class InvalidAssetTypeLookup(TypeLookup):
    """A TypeLookup that always returns InvalidAsset for any type."""

    @overload
    def __getitem__(self, key: str) -> Type: ...

    @overload
    def __getitem__(self, key: Type) -> str: ...

    @override
    def __getitem__(self, key: ValidTypes) -> ValidTypes:
        # if a type is passed, normally we would return the type name but that doesn't make sense here
        # for an InvalidAsset
        if not isinstance(key, str):
            raise NotImplementedError(
                f"Looking up the `type` name for {InvalidAsset.__name__} is not supported"
            )
        # Every string type name resolves to the single InvalidAsset class.
        return InvalidAsset


class InvalidDatasource(Datasource):
    """
    A Datasource that is invalid.

    This is used to represent a Datasource that is invalid and cannot be used.
    This class should override all methods that would commonly be called when a user
    intends to use the Datasource. The overridden methods should indicate to the user
    that the Datasource configuration is invalid and provide details about why it was
    considered invalid.
    Any errors raised should raise `from self.config_error`.
    """

    # class var definitions
    asset_types: ClassVar[List[Type[DataAsset]]] = [InvalidAsset]
    _type_lookup: ClassVar[TypeLookup] = InvalidAssetTypeLookup()

    type: str = "invalid"
    # The original validation/lookup failure; all errors raised here chain from it.
    config_error: Union[pydantic.ValidationError, LookupError] = Field(
        ..., description="The error that caused the Datasource to be invalid."
    )
    assets: List[InvalidAsset] = []

    class Config:
        extra = "ignore"
        # config_error holds plain exception instances, not pydantic models
        arbitrary_types_allowed = True
        json_encoders = {
            pydantic.ValidationError: lambda v: v.errors(),
            LookupError: lambda v: repr(v),
        }

    @override
    def test_connection(self, test_assets: bool = True) -> None:
        """Always fail, chaining the original config error for context."""
        raise TestConnectionError(
            "This Datasource configuration is invalid and cannot be used. Please fix the error and try again"
        ) from self.config_error

    @override
    def get_asset(self, asset_name: str) -> InvalidAsset:
        """
        Always raise a warning and return an InvalidAsset.
        Don't raise an error because the users may want to inspect the asset config.
        """
        warnings.warn(
            f"The {self.name} Datasource configuration is invalid and cannot be used. Please fix the error and try again",
            GxInvalidDatasourceWarning,
        )
        return super().get_asset(asset_name)

    def _raise_type_error(self, *args, **kwargs) -> NoReturn:
        """
        Raise a TypeError indicating that the Datasource is invalid.
        Raise from the original config error that caused the Datasource to be invalid.
        """
        # BUGFIX: message previously read "... Datasource is configuration is invalid ..."
        error = TypeError(
            f"{self.name} Datasource configuration is invalid and cannot be used. Please fix the error and try again"
        )
        raise error from self.config_error

    @override
    def __getattribute__(self, attr: str):
        """
        Dynamically raise a TypeError with details of the original config error for
        any methods and attributes that do not make sense for an InvalidDatasource.
        """
        if attr in METHOD_SHOULD_RAISE_ERROR:
            raise AttributeError  # this causes __getattr__ to be called
        return super().__getattribute__(attr)

    def __getattr__(self, attr: str):
        # __getattr__ is only called if the attribute is not found by __getattribute__
        if attr in ("add_dataframe_asset", "__deepcopy__"):
            # these methods are part of protocol checks and should return None
            return None
        # _raise_type_error never returns; no value to propagate
        self._raise_type_error()
2 changes: 1 addition & 1 deletion great_expectations/datasource/fluent/metadatasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __new__( # noqa: PYI034 # Self cannot be used with Metaclass

cls = super().__new__(meta_cls, cls_name, bases, cls_dict)

if cls_name == "Datasource" or cls_name.startswith("_"):
if cls_name in ("Datasource", "InvalidDatasource") or cls_name.startswith("_"):
# NOTE: the above check is brittle and must be kept in-line with the Datasource.__name__
logger.debug(f"1c. Skip factory registration of base `{cls_name}`")
return cls
Expand Down
8 changes: 7 additions & 1 deletion great_expectations/datasource/fluent/type_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,13 @@ def __getitem__(self, key: Type) -> str: ...

@override
def __getitem__(self, key: ValidTypes) -> ValidTypes:
    """
    Look up a registered type by its name (or a name by its type).

    Raises:
        LookupError: if the key is not registered, chained from the underlying
            KeyError so callers can catch a type-lookup failure distinctly.
            For string keys, the message lists the available type names.
    """
    # BUGFIX: a stray early `return super().__getitem__(key)` preceded this
    # try block, making the KeyError -> LookupError translation unreachable.
    try:
        return super().__getitem__(key)
    except KeyError as key_err:
        msg = f"{key} was not found."
        if isinstance(key, str):
            msg = f"type {msg} Available types are: {', '.join(self.type_names())}"
        raise LookupError(msg) from key_err

@override
def __delitem__(self, key: ValidTypes):
Expand Down

0 comments on commit 2016db7

Please sign in to comment.