Feature/typed val ops results 0.11.x (#1472)
* Add new ValidationOperatorResult type

* Use new ValidationOperatorResult type

* Add “name”

* Bring back typed run_id

* Use typed run_id

* Nix unnecessary convert_to_json_serializable for run_id

* Add blank line

* Generate upgrade checklist at end of init

* Sort batch_identifiers

* Update test

* Updating tests to comply with the new validation operator results class.

* Update notebook renderers to use new ValidationOperatorResult

* Update test

* Update changelog

* Update tests

* Update test

Co-authored-by: Alex Sherstinsky <alex@superconductivehealth.com>
roblim and Alex Sherstinsky committed May 22, 2020
1 parent 4f5d851 commit 2ad4870
Showing 15 changed files with 544 additions and 145 deletions.
2 changes: 2 additions & 0 deletions docs/reference/changelog.rst
@@ -12,6 +12,8 @@ Changelog
* (BREAKING) ``ValidationMetric`` and ``ValidationMetricIdentifier`` objects now have a ``data_asset_name`` attribute.
Existing projects with evaluation parameter stores that have database backends must be migrated.
See :ref:`Upgrading to 0.11.x-beta` for instructions.
* (BREAKING) ``ValidationOperator.run`` now returns an instance of the new type ``ValidationOperatorResult`` (instead of a
dictionary). If your code uses output from Validation Operators, it must be updated.
* Data Docs: redesigned index page with paginated/sortable/searchable/filterable tables
* Data Docs: searchable tables on Expectation Suite Validation Result pages
* ``data_asset_name`` is now added to batch_kwargs by batch_kwargs_generators (if available) and surfaced in Data Docs
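The breaking change above is mechanical to apply in most callers. A minimal, non-authoritative sketch of the migration, reusing the context, batch, and run_id names that appear in the notebook renderer snippets later in this commit:

results = context.run_validation_operator(
    "action_list_operator", assets_to_validate=[batch], run_id=run_id
)

# Pre-0.11.x callers read the plain dictionary, e.g. list(results["details"].keys())[0].
# With ValidationOperatorResult, use the typed accessors instead:
overall_success = results.success
validation_result_identifier = results.list_validation_result_identifiers()[0]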
@@ -2,7 +2,6 @@
import os

from dateutil.parser import ParserError, parse

from great_expectations import DataContext
from great_expectations.data_context.store import (
DatabaseStoreBackend,
@@ -61,6 +60,7 @@ def __init__(self, data_context=None, context_root_dir=None):
TupleS3StoreBackend: self._update_tuple_s3_store_backend_run_id,
TupleGCSStoreBackend: self._update_tuple_gcs_store_backend_run_id,
}
self._generate_upgrade_checklist()

def _generate_upgrade_checklist(self):
for (store_name, store) in self.data_context.stores.items():
12 changes: 7 additions & 5 deletions great_expectations/data_context/data_context.py
@@ -13,11 +13,8 @@
import webbrowser
from typing import Dict, List, Optional, Union

from dateutil.parser import ParserError, parse
from marshmallow import ValidationError
from ruamel.yaml import YAML, YAMLError

import great_expectations.exceptions as ge_exceptions
from dateutil.parser import ParserError, parse
from great_expectations.core import (
ExpectationSuite,
RunIdentifier,
@@ -63,6 +60,8 @@
from great_expectations.render.renderer.site_builder import SiteBuilder
from great_expectations.util import verify_dynamic_loading_support
from great_expectations.validator.validator import Validator
from marshmallow import ValidationError
from ruamel.yaml import YAML, YAMLError

try:
from sqlalchemy.exc import SQLAlchemyError
@@ -370,7 +369,10 @@ def add_validation_operator(
module_name = "great_expectations.validation_operators"
new_validation_operator = instantiate_class_from_config(
config=config,
runtime_environment={"data_context": self,},
runtime_environment={
"data_context": self,
"name": validation_operator_name,
},
config_defaults={"module_name": module_name},
)
if not new_validation_operator:
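Because instantiate_class_from_config forwards runtime_environment entries as constructor arguments, the new "name" entry implies that validation operator classes now receive their name at construction time. A hypothetical sketch of a custom operator accepting it (the class name and extra parameters are illustrative, not part of this commit):

from great_expectations.validation_operators import ValidationOperator


class MyValidationOperator(ValidationOperator):  # hypothetical custom operator
    def __init__(self, data_context, name=None, **kwargs):
        # "name" is now passed in via runtime_environment by add_validation_operator.
        self.data_context = data_context
        self._name = name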
@@ -2,7 +2,6 @@
from typing import Union

import nbformat

from great_expectations.core import ExpectationSuite
from great_expectations.core.id_dict import BatchKwargs
from great_expectations.render.renderer.renderer import Renderer
@@ -116,12 +115,7 @@ def add_footer(self) -> None:
}
results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch], run_id=run_id)
expectation_suite_identifier = list(results["details"].keys())[0]
validation_result_identifier = ValidationResultIdentifier(
expectation_suite_identifier=expectation_suite_identifier,
batch_identifier=batch.batch_kwargs.to_id(),
run_id=run_id
)
validation_result_identifier = results.list_validation_result_identifiers()[0]
context.build_data_docs()
context.open_data_docs(validation_result_identifier)"""
)
@@ -1,5 +1,4 @@
import nbformat

from great_expectations import DataContext
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import Dataset
@@ -77,12 +76,7 @@ def add_footer(self):
run_id = datetime.utcnow().strftime("%Y%m%dT%H%M%S.%fZ")
results = context.run_validation_operator("action_list_operator", assets_to_validate=[batch], run_id=run_id)
expectation_suite_identifier = list(results["details"].keys())[0]
validation_result_identifier = ValidationResultIdentifier(
expectation_suite_identifier=expectation_suite_identifier,
batch_identifier=batch.batch_kwargs.to_id(),
run_id=run_id
)
validation_result_identifier = results.list_validation_result_identifiers()[0]
context.build_data_docs()
context.open_data_docs(validation_result_identifier)"""
)
Empty file.
@@ -0,0 +1,298 @@
from copy import deepcopy
from typing import Dict, List, Union

from great_expectations.core import (
    ExpectationSuiteValidationResult,
    convert_to_json_serializable,
    RunIdentifierSchema,
    RunIdentifier,
)
from great_expectations.core.id_dict import BatchKwargs
from great_expectations.data_context.types.resource_identifiers import (
    ValidationResultIdentifier,
)
from great_expectations.types import DictDot
from marshmallow import Schema, fields, post_load, pre_dump


class ValidationOperatorResult(DictDot):
    """
    The run_results property forms the backbone of this type and defines the basic contract for what a validation
    operator's run method returns. It is a dictionary where the top-level keys are the ValidationResultIdentifiers of
    the validation results generated in the run. Each value is a dictionary containing, at a minimum, a
    validation_result key; this dictionary can contain other keys that are relevant for a specific validation operator
    implementation. For example, the dictionary from a WarningAndFailureExpectationSuitesValidationOperator
    would have an extra key named "expectation_suite_severity_level" to indicate whether the suite is at a
    "warning" or "failure" level, as well as an "actions_results" key.

    e.g.
    {
        ValidationResultIdentifier: {
            "validation_result": ExpectationSuiteValidationResult,
            "actions_results": {}
        }
    }
    """

    def __init__(
        self,
        run_id: RunIdentifier,
        run_results: Dict[
            ValidationResultIdentifier,
            Dict[str, Union[ExpectationSuiteValidationResult, dict, str]],
        ],
        validation_operator_config,
        evaluation_parameters: dict = None,
        success: bool = None,
    ) -> None:
        self._run_id = run_id
        self._run_results = run_results
        self._evaluation_parameters = evaluation_parameters
        self._validation_operator_config = validation_operator_config
        self._success = success

        # Lazily computed views over run_results; populated on first access.
        self._validation_results = None
        self._data_assets_validated = None
        self._data_assets_validated_by_batch_id = None
        self._validation_result_identifiers = None
        self._expectation_suite_names = None
        self._data_asset_names = None
        self._validation_results_by_expectation_suite_name = None
        self._validation_results_by_data_asset_name = None
        self._batch_identifiers = None
        self._statistics = None
        self._validation_statistics = None
        self._validation_results_by_validation_result_identifier = None

    @property
    def validation_operator_config(self) -> dict:
        return self._validation_operator_config

    @property
    def run_results(
        self,
    ) -> Dict[
        ValidationResultIdentifier,
        Dict[str, Union[ExpectationSuiteValidationResult, dict]],
    ]:
        return self._run_results

    @property
    def run_id(self) -> RunIdentifier:
        return self._run_id

    @property
    def evaluation_parameters(self) -> Union[dict, None]:
        return self._evaluation_parameters

    @property
    def success(self) -> bool:
        if self._success is None:
            self._success = all(
                [
                    run_result["validation_result"].success
                    for run_result in self.run_results.values()
                ]
            )
        return self._success

    def list_batch_identifiers(self) -> List[str]:
        if self._batch_identifiers is None:
            self._batch_identifiers = list(
                set(
                    [
                        validation_result_identifier.batch_identifier
                        for validation_result_identifier in self.list_validation_result_identifiers()
                    ]
                )
            )
        return self._batch_identifiers

    def list_data_asset_names(self) -> List[str]:
        if self._data_asset_names is None:
            self._data_asset_names = list(
                set(
                    [
                        data_asset["batch_kwargs"].get("data_asset_name") or "__none__"
                        for data_asset in self.list_data_assets_validated()
                    ]
                )
            )
        return self._data_asset_names

    def list_expectation_suite_names(self) -> List[str]:
        if self._expectation_suite_names is None:
            self._expectation_suite_names = list(
                set(
                    [
                        validation_result_identifier.expectation_suite_identifier.expectation_suite_name
                        for validation_result_identifier in self.run_results.keys()
                    ]
                )
            )
        return self._expectation_suite_names

    def list_validation_result_identifiers(self) -> List[ValidationResultIdentifier]:
        if self._validation_result_identifiers is None:
            self._validation_result_identifiers = list(self._run_results.keys())
        return self._validation_result_identifiers

    def list_validation_results(
        self, group_by=None
    ) -> Union[List[ExpectationSuiteValidationResult], dict]:
        if group_by is None:
            if self._validation_results is None:
                self._validation_results = [
                    run_result["validation_result"]
                    for run_result in self.run_results.values()
                ]
            return self._validation_results
        elif group_by == "validation_result_identifier":
            return self._list_validation_results_by_validation_result_identifier()
        elif group_by == "expectation_suite_name":
            return self._list_validation_results_by_expectation_suite_name()
        elif group_by == "data_asset_name":
            return self._list_validation_results_by_data_asset_name()

    def _list_validation_results_by_validation_result_identifier(self) -> dict:
        if self._validation_results_by_validation_result_identifier is None:
            self._validation_results_by_validation_result_identifier = {
                validation_result_identifier: run_result["validation_result"]
                for validation_result_identifier, run_result in self.run_results.items()
            }
        return self._validation_results_by_validation_result_identifier

    def _list_validation_results_by_expectation_suite_name(self) -> dict:
        if self._validation_results_by_expectation_suite_name is None:
            self._validation_results_by_expectation_suite_name = {
                expectation_suite_name: [
                    run_result["validation_result"]
                    for run_result in self.run_results.values()
                    if run_result["validation_result"].meta["expectation_suite_name"]
                    == expectation_suite_name
                ]
                for expectation_suite_name in self.list_expectation_suite_names()
            }
        return self._validation_results_by_expectation_suite_name

    def _list_validation_results_by_data_asset_name(self) -> dict:
        if self._validation_results_by_data_asset_name is None:
            validation_results_by_data_asset_name = {}
            for data_asset_name in self.list_data_asset_names():
                if data_asset_name == "__none__":
                    validation_results_by_data_asset_name[data_asset_name] = [
                        data_asset["validation_results"]
                        for data_asset in self.list_data_assets_validated()
                        if data_asset["batch_kwargs"].get("data_asset_name") is None
                    ]
                else:
                    validation_results_by_data_asset_name[data_asset_name] = [
                        data_asset["validation_results"]
                        for data_asset in self.list_data_assets_validated()
                        if data_asset["batch_kwargs"].get("data_asset_name")
                        == data_asset_name
                    ]
            self._validation_results_by_data_asset_name = (
                validation_results_by_data_asset_name
            )
        return self._validation_results_by_data_asset_name

    def list_data_assets_validated(
        self, group_by: str = None
    ) -> Union[List[dict], dict]:
        if group_by is None:
            if self._data_assets_validated is None:
                self._data_assets_validated = list(
                    self._list_data_assets_validated_by_batch_id().values()
                )
            return self._data_assets_validated
        if group_by == "batch_id":
            return self._list_data_assets_validated_by_batch_id()

    def _list_data_assets_validated_by_batch_id(self) -> dict:
        if self._data_assets_validated_by_batch_id is None:
            assets_validated_by_batch_id = {}

            for validation_result in self.list_validation_results():
                batch_kwargs = validation_result.meta["batch_kwargs"]
                batch_id = BatchKwargs(batch_kwargs).to_id()
                expectation_suite_name = validation_result.meta[
                    "expectation_suite_name"
                ]
                if batch_id not in assets_validated_by_batch_id:
                    assets_validated_by_batch_id[batch_id] = {
                        "batch_kwargs": batch_kwargs,
                        "validation_results": [validation_result],
                        "expectation_suite_names": [expectation_suite_name],
                    }
                else:
                    assets_validated_by_batch_id[batch_id]["validation_results"].append(
                        validation_result
                    )
                    assets_validated_by_batch_id[batch_id][
                        "expectation_suite_names"
                    ].append(expectation_suite_name)
            self._data_assets_validated_by_batch_id = assets_validated_by_batch_id
        return self._data_assets_validated_by_batch_id

    def get_statistics(self) -> dict:
        if self._statistics is None:
            data_asset_count = len(self.list_data_assets_validated())
            validation_result_count = len(self.list_validation_results())
            successful_validation_count = len(
                [
                    validation_result
                    for validation_result in self.list_validation_results()
                    if validation_result.success
                ]
            )
            unsuccessful_validation_count = (
                validation_result_count - successful_validation_count
            )
            # The leading count short-circuits to 0 when there are no results,
            # which avoids division by zero.
            successful_validation_percent = (
                validation_result_count
                and (successful_validation_count / validation_result_count) * 100
            )

            self._statistics = {
                "data_asset_count": data_asset_count,
                "validation_result_count": validation_result_count,
                "successful_validation_count": successful_validation_count,
                "unsuccessful_validation_count": unsuccessful_validation_count,
                "successful_validation_percent": successful_validation_percent,
                "validation_statistics": self._list_validation_statistics(),
            }

        return self._statistics

    def _list_validation_statistics(self) -> Dict[ValidationResultIdentifier, dict]:
        if self._validation_statistics is None:
            self._validation_statistics = {
                validation_result_identifier: run_result["validation_result"].statistics
                for validation_result_identifier, run_result in self.run_results.items()
            }
        return self._validation_statistics

    def to_json_dict(self):
        return validationOperatorResultSchema.dump(self)


class ValidationOperatorResultSchema(Schema):
    run_id = fields.Nested(RunIdentifierSchema)
    run_results = fields.Dict()
    evaluation_parameters = fields.Dict(allow_none=True)
    validation_operator_config = fields.Dict()
    success = fields.Bool()

    # noinspection PyUnusedLocal
    @pre_dump
    def prepare_dump(self, data, **kwargs):
        data = deepcopy(data)
        data._run_results = convert_to_json_serializable(data.run_results)
        return data

    # noinspection PyUnusedLocal
    @post_load
    def make_expectation_suite_validation_result(self, data, **kwargs):
        return ValidationOperatorResult(**data)


validationOperatorResultSchema = ValidationOperatorResultSchema()
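For reference, a short, non-authoritative usage sketch of the read-only views this class exposes; validation_operator_result stands for the object returned by a validation operator's run method, and the group_by value shown is one of those handled above:

# Group validation results by the expectation suite that produced them.
results_by_suite = validation_operator_result.list_validation_results(
    group_by="expectation_suite_name"
)

# Aggregate statistics for the run (counts plus per-validation statistics).
statistics = validation_operator_result.get_statistics()
print(
    statistics["successful_validation_count"],
    "of",
    statistics["validation_result_count"],
    "validations succeeded",
)

# JSON-serializable form, produced via ValidationOperatorResultSchema.
validation_operator_result_json = validation_operator_result.to_json_dict()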
