Feature/custom types #626

Merged
merged 8 commits, Aug 17, 2019
10 changes: 10 additions & 0 deletions docs/roadmap_and_changelog/changelog.rst
@@ -2,6 +2,16 @@

v.0.7.7
-----------------
* Standardize the way plugin module loading works. DataContext will use the new-style class and plugin
identification moving forward; yml configs should specify class_name and module_name (module_name is
optional for GE types). For now, the legacy "type" parameter is still accepted in configuration, as
before (see the sketch after this list).
* Add support for custom data_asset_type to all datasources
* Fix several memory and performance issues in SparkDFDataset.
- Use only the distinct value count instead of bringing all values to the driver
- Migrate away from UDFs for set membership, nullity, and regex expectations
* Fix several UI issues in the data_documentation
- Fix the broken link on Home
- Make scrolling follow navigation properly
* Add support for strict_min and strict_max to inequality-based expectations to allow strict inequality checks
(thanks @RoyalTS!)
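
A minimal sketch of the new-style identification described in the first bullet, shown as the
Python-dict equivalent of the yml keys ("my_plugin_module" is a hypothetical module path):

    # Python-dict equivalent of the new-style yml keys; coerced to ClassConfig internally.
    data_asset_type_config = {
        "class_name": "PandasDataset",        # required
        # "module_name": "my_plugin_module",  # optional; omit for GE-provided types
    }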

1 change: 1 addition & 0 deletions great_expectations/data_context/types/__init__.py
@@ -0,0 +1 @@
from .configurations import ClassConfig
18 changes: 18 additions & 0 deletions great_expectations/data_context/types/configurations.py
@@ -0,0 +1,18 @@
from ruamel.yaml import YAML, yaml_object
from great_expectations.types import LooselyTypedDotDict
yaml = YAML()


@yaml_object(yaml)
class ClassConfig(LooselyTypedDotDict):
_allowed_keys = {
"module_name",
"class_name"
}
_required_keys = {
"class_name"
}
_key_types = {
"module_name": str,
"class_name": str
}
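
A hedged usage sketch for ClassConfig (not part of the diff); it relies only on the keys declared
above plus dot access inherited from LooselyTypedDotDict:

    from great_expectations.data_context.types import ClassConfig

    config = ClassConfig(class_name="PandasDataset")    # class_name is required
    config.module_name = "great_expectations.dataset"   # module_name is optional
    assert config.class_name == "PandasDataset"         # dot access via LooselyTypedDotDict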
2 changes: 1 addition & 1 deletion great_expectations/datasource/__init__.py
@@ -1,4 +1,4 @@
from .datasource import Datasource
from .datasource import Datasource
from .pandas_source import PandasDatasource
from .sqlalchemy_source import SqlAlchemyDatasource
from .spark_source import SparkDFDatasource
62 changes: 61 additions & 1 deletion great_expectations/datasource/datasource.py
@@ -10,6 +10,10 @@

from ..data_context.util import NormalizedDataAssetName
from great_expectations.exceptions import BatchKwargsError
from great_expectations.data_context.types import ClassConfig
from great_expectations.exceptions import InvalidConfigError
import warnings
from importlib import import_module

logger = logging.getLogger(__name__)
yaml = YAML()
@@ -75,13 +79,18 @@ def __init__(self, name, type_, data_context=None, data_asset_type=None, generat
"""
self._data_context = data_context
self._name = name
if isinstance(data_asset_type, string_types):
warnings.warn(
"String-only configuration for data_asset_type is deprecated. Use module_name and class_name instead.",
DeprecationWarning)
self._data_asset_type = data_asset_type
self._generators = {}
if generators is None:
generators = {}
self._datasource_config = {
"type": type_,
"generators": generators
"generators": generators,
"data_asset_type": data_asset_type
}

# extra_config = self._load_datasource_config()
@@ -413,3 +422,54 @@ def _guess_reader_method_from_path(path):
return ReaderMethods.JSON
else:
return None

def _get_data_asset_class(self, data_asset_type):
"""Returns the class to be used to generate a data_asset from this datasource"""
if isinstance(data_asset_type, string_types):
# We have a custom type, but it is defined with only a string
try:
logger.warning("Use of custom_data_assets module is deprecated. Please define data_asset_type"
"using a module_name and class_name.")
# FOR LEGACY REASONS support the fixed "custom_data_assets" name
# FIXME: this option should be removed in a future release
custom_data_assets_module = __import__("custom_data_assets", fromlist=["custom_data_assets"])
data_asset_type_class = getattr(custom_data_assets_module, data_asset_type)
return data_asset_type_class
except ImportError:
logger.error(
"Unable to import custom_data_assets module. "
"Check the plugins directory for 'custom_data_assets'."
)
raise InvalidConfigError(
"Unable to import custom_data_assets module. "
"Check the plugins directory for 'custom_data_assets'."
)
except AttributeError:
logger.error(
"Unable to find data_asset_type: '%s'." % data_asset_type
)
raise InvalidConfigError("Unable to find data_asset_type: '%s'." % data_asset_type)
elif isinstance(data_asset_type, ClassConfig):
try:
if data_asset_type.module_name is None:
data_asset_type.module_name = "great_expectations.dataset"

loaded_module = import_module(data_asset_type.module_name)
data_asset_type_class = getattr(loaded_module, data_asset_type.class_name)
return data_asset_type_class
except ImportError:
logger.error(
"Unable to find module '%s'." % data_asset_type.module_name
)
raise InvalidConfigError("Unable to find module '%s'." % data_asset_type.module_name)
except AttributeError:
logger.error(
"Unable to find data_asset_type: '%s' in module '%s'."
% (data_asset_type.class_name, data_asset_type.module_name)
)
raise InvalidConfigError(
"Unable to find data_asset_type: '%s' in module '%s'."
% (data_asset_type.class_name, data_asset_type.module_name)
)
else:
raise InvalidConfigError("Invalid configuration for data_asset_type")
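
A hedged, self-contained sketch of the ClassConfig branch above; it uses .get() for the optional
key so an unset module_name needs no special handling:

    from importlib import import_module
    from great_expectations.data_context.types import ClassConfig

    # Resolve a GE-provided type: module_name falls back to great_expectations.dataset,
    # mirroring _get_data_asset_class.
    config = ClassConfig(class_name="PandasDataset")
    module_name = config.get("module_name") or "great_expectations.dataset"
    data_asset_class = getattr(import_module(module_name), config.class_name)
    assert data_asset_class.__name__ == "PandasDataset"

A bare string still resolves through the fixed custom_data_assets plugin module, but that path is
deprecated; anything that is neither a string nor a ClassConfig raises InvalidConfigError.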
41 changes: 34 additions & 7 deletions great_expectations/datasource/pandas_source.py
@@ -7,7 +7,7 @@
from great_expectations.datasource.generator.filesystem_path_generator import SubdirReaderGenerator, GlobReaderGenerator
from great_expectations.datasource.generator.in_memory_generator import InMemoryGenerator
from great_expectations.dataset.pandas_dataset import PandasDataset

from great_expectations.data_context.types import ClassConfig
from great_expectations.exceptions import BatchKwargsError


@@ -17,7 +17,7 @@ class PandasDatasource(Datasource):
existing in-memory dataframes.
"""

def __init__(self, name="pandas", data_context=None, data_asset_type="SqlAlchemyDataset", generators=None, **kwargs):
def __init__(self, name="pandas", data_context=None, data_asset_type=None, generators=None, **kwargs):
if generators is None:
# Provide a gentle way to build a datasource with a sane default,
# including ability to specify the base_directory and reader_options
@@ -34,6 +34,16 @@ def __init__(self, name="pandas", data_context=None, data_asset_type="SqlAlchemy
"reader_options": reader_options
}
}
if data_asset_type is None:
data_asset_type = ClassConfig(
class_name="PandasDataset")
else:
try:
data_asset_type = ClassConfig(**data_asset_type)
except TypeError:
# In this case, we allow the passed config, for now, in case they're using a legacy string-only config
pass

super(PandasDatasource, self).__init__(name, type_="pandas",
data_context=data_context,
data_asset_type=data_asset_type,
@@ -52,8 +62,25 @@ def _get_generator_class(self, type_):

def _get_data_asset(self, batch_kwargs, expectation_suite, **kwargs):
batch_kwargs.update(kwargs)
reader_options = batch_kwargs.copy()

if "data_asset_type" in reader_options:
data_asset_type_config = reader_options.pop("data_asset_type") # Get and remove the config
try:
data_asset_type_config = ClassConfig(**data_asset_type_config)
except TypeError:
# We tried; we'll pass the config downstream, probably as a string, and handle an error later
pass
else:
data_asset_type_config = self._data_asset_type

data_asset_type = self._get_data_asset_class(data_asset_type_config)

if not issubclass(data_asset_type, PandasDataset):
raise ValueError("PandasDatasource cannot instantiate batch with data_asset_type: '%s'. It "
"must be a subclass of PandasDataset." % data_asset_type.__name__)

if "path" in batch_kwargs:
reader_options = batch_kwargs.copy()
path = reader_options.pop("path") # We need to remove from the reader
reader_options.pop("timestamp", "") # ditto timestamp (but missing ok)

@@ -86,10 +113,10 @@ def _get_data_asset(self, batch_kwargs, expectation_suite, **kwargs):
raise BatchKwargsError("Invalid batch_kwargs: path or df is required for a PandasDatasource",
batch_kwargs)

return PandasDataset(df,
expectation_suite=expectation_suite,
data_context=self._data_context,
batch_kwargs=batch_kwargs)
return data_asset_type(df,
expectation_suite=expectation_suite,
data_context=self._data_context,
batch_kwargs=batch_kwargs)
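
A hedged construction sketch for the new hook (my_plugins.custom_data_assets and MyPandasDataset
are hypothetical names; the resolved class must subclass PandasDataset or batch creation raises
ValueError as shown above):

    from great_expectations.datasource import PandasDatasource

    datasource = PandasDatasource(
        name="pandas",
        data_asset_type={
            "module_name": "my_plugins.custom_data_assets",  # hypothetical plugin module
            "class_name": "MyPandasDataset",                 # must subclass PandasDataset
        },
    )

The dict is coerced to ClassConfig in __init__; resolution to an actual class is deferred until a
batch is requested.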

def build_batch_kwargs(self, *args, **kwargs):
if len(args) > 0:
42 changes: 36 additions & 6 deletions great_expectations/datasource/spark_source.py
@@ -9,6 +9,8 @@
from great_expectations.datasource.generator.databricks_generator import DatabricksTableGenerator
from great_expectations.datasource.generator.in_memory_generator import InMemoryGenerator

from great_expectations.data_context.types import ClassConfig

logger = logging.getLogger(__name__)

try:
@@ -24,7 +26,7 @@ class SparkDFDatasource(Datasource):
filesystem (the default subdir_reader generator) and databricks notebooks.
"""

def __init__(self, name="default", data_context=None, data_asset_type="SparkDFDataset", generators=None, **kwargs):
def __init__(self, name="default", data_context=None, data_asset_type=None, generators=None, **kwargs):
if generators is None:
# Provide a gentle way to build a datasource with a sane default,
# including ability to specify the base_directory
@@ -37,6 +39,18 @@ def __init__(self, name="default", data_context=None, data_asset_type="SparkDFDa
"reader_options": reader_options
}
}

if data_asset_type is None:
data_asset_type = ClassConfig(
class_name="SparkDFDataset"
)
else:
try:
data_asset_type = ClassConfig(**data_asset_type)
except TypeError:
# In this case, we allow the passed config, for now, in case they're using a legacy string-only config
pass

super(SparkDFDatasource, self).__init__(name, type_="spark",
data_context=data_context,
data_asset_type=data_asset_type,
@@ -67,6 +81,22 @@ def _get_data_asset(self, batch_kwargs, expectation_suite, caching=True, **kwarg

batch_kwargs.update(kwargs)
reader_options = batch_kwargs.copy()

if "data_asset_type" in reader_options:
data_asset_type_config = reader_options.pop("data_asset_type") # Get and remove the config
try:
data_asset_type_config = ClassConfig(**data_asset_type_config)
except TypeError:
# We tried; we'll pass the config downstream, probably as a string, and handle an error later
pass
else:
data_asset_type_config = self._data_asset_type

data_asset_type = self._get_data_asset_class(data_asset_type_config)
if not issubclass(data_asset_type, SparkDFDataset):
raise ValueError("SparkDFDatasource cannot instantiate batch with data_asset_type: '%s'. It "
"must be a subclass of SparkDFDataset." % data_asset_type.__name__)

if "path" in batch_kwargs:
path = reader_options.pop("path") # We remove this so it is not used as a reader option
reader_options.pop("timestamp", "") # ditto timestamp (but missing ok)
@@ -105,11 +135,11 @@ def _get_data_asset(self, batch_kwargs, expectation_suite, caching=True, **kwarg
else:
raise BatchKwargsError("Unrecognized batch_kwargs for spark_source", batch_kwargs)

return SparkDFDataset(df,
expectation_suite=expectation_suite,
data_context=self._data_context,
batch_kwargs=batch_kwargs,
caching=caching)
return data_asset_type(df,
expectation_suite=expectation_suite,
data_context=self._data_context,
batch_kwargs=batch_kwargs,
caching=caching)
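
A hedged sketch of batch_kwargs carrying the per-batch data_asset_type override handled above
(the path is illustrative; in normal use these kwargs arrive via the DataContext):

    # The nested dict is coerced to ClassConfig before resolution; the resulting
    # class must subclass SparkDFDataset or the check above raises ValueError.
    batch_kwargs = {
        "path": "/data/events.parquet",  # illustrative path
        "data_asset_type": {"class_name": "SparkDFDataset"},
    }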

def build_batch_kwargs(self, *args, **kwargs):
if len(args) > 0:
50 changes: 25 additions & 25 deletions great_expectations/datasource/sqlalchemy_source.py
@@ -6,6 +6,7 @@
from great_expectations.dataset.sqlalchemy_dataset import SqlAlchemyDataset
from .generator.query_generator import QueryGenerator
from great_expectations.exceptions import DatasourceInitializationError
from great_expectations.data_context.types import ClassConfig

logger = logging.getLogger(__name__)

@@ -28,14 +29,25 @@ class SqlAlchemyDatasource(Datasource):
uses $parameter, with additional kwargs passed to the get_batch method.
"""

def __init__(self, name="default", data_context=None, data_asset_type="SqlAlchemyDataset", profile=None, generators=None, **kwargs):
def __init__(self, name="default", data_context=None, data_asset_type=None, profile=None, generators=None, **kwargs):
if not sqlalchemy:
raise DatasourceInitializationError(name, "ModuleNotFoundError: No module named 'sqlalchemy'")

if generators is None:
generators = {
"default": {"type": "queries"}
}

if data_asset_type is None:
data_asset_type = ClassConfig(
class_name="SqlAlchemyDataset")
else:
try:
data_asset_type = ClassConfig(**data_asset_type)
except TypeError:
# In this case, we allow the passed config, for now, in case they're using a legacy string-only config
pass

super(SqlAlchemyDatasource, self).__init__(name,
type_="sqlalchemy",
data_context=data_context,
@@ -98,33 +110,21 @@ def _get_generator_class(self, type_):
raise ValueError("Unrecognized DataAssetGenerator type %s" % type_)

def _get_data_asset(self, batch_kwargs, expectation_suite, **kwargs):
if "data_asset_type" in batch_kwargs and batch_kwargs["data_asset_type"] != self._data_asset_type:
data_asset_type_name = batch_kwargs["data_asset_type"]
elif self._data_asset_type is None:
# Default fallback
data_asset_type_name = "SqlAlchemyDataset"
else:
data_asset_type_name = self._data_asset_type
batch_kwargs.update(kwargs)

if data_asset_type_name != "SqlAlchemyDataset":
if "data_asset_type" in batch_kwargs:
# SqlAlchemy does not use reader_options and does not need to strip keys from batch_kwargs,
# since it does not pass options through to a later reader
data_asset_type_config = batch_kwargs["data_asset_type"]
try:
custom_data_assets_module = __import__("custom_data_assets", fromlist=["custom_data_assets"])
data_asset_type = getattr(custom_data_assets_module, data_asset_type_name)
except ImportError:
logger.error(
"Unable to import custom_data_asset module. Check the plugins directory for 'custom_data_assets'. "
"Falling back to 'SqlAlchemyDataset' class."
)
data_asset_type = SqlAlchemyDataset
except AttributeError:
logger.error(
"Unable to find data_asset_type: %s. Falling back to 'SqlAlchemyDataset' class."
% data_asset_type_name
)
data_asset_type = SqlAlchemyDataset

data_asset_type_config = ClassConfig(**data_asset_type_config)
except TypeError:
# We tried; we'll pass the config downstream, probably as a string, and handle an error later
pass
else:
data_asset_type = SqlAlchemyDataset
data_asset_type_config = self._data_asset_type

data_asset_type = self._get_data_asset_class(data_asset_type_config)

if not issubclass(data_asset_type, SqlAlchemyDataset):
raise ValueError("SqlAlchemyDatasource cannot instantiate batch with data_asset_type: '%s'. It "
5 changes: 4 additions & 1 deletion great_expectations/exceptions.py
@@ -9,6 +9,10 @@ class DataContextError(GreatExpectationsError):

class ProfilerError(GreatExpectationsError):
pass

class InvalidConfigError(DataContextError):
def __init__(self, message):
self.message = message

class ConfigNotFoundError(DataContextError):
def __init__(self, context_root_directory):
@@ -27,4 +31,3 @@ def __init__(self, message, batch_kwargs):
class DatasourceInitializationError(GreatExpectationsError):
def __init__(self, datasource_name, message):
self.message = "Cannot initialize datasource %s, error: %s" % (datasource_name, message)