[BUGFIX] Addresses issue #2993 (#3054) by using configuration when it is available instead of discovering keys (listing keys) in existing sources. (#3377)

* BUGFIX #3054: Removed the file scan for the expectation suite by pulling it from the provided parameters instead

* Updated the call to DataContext.store_evaluation_parameters; it was being called with incorrect arguments

* Got the unit tests defined in data_context/store/test_evaluation_parameter_store.py passing; they rely on store.list_keys() to pass (this may be another bug, as no files are identified as test fixtures)

* Updated changelog

* Fixed linting issues from PR

* Added unit test to test_store_backends

* Test passes

* Linted

* Linted merge

Co-authored-by: talagluck <tal@superconductive.com>
donaldheppner and talagluck committed Sep 17, 2021
1 parent 8caa2e7 commit 82b3e29
Showing 8 changed files with 247 additions and 20 deletions.
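
In short, the fix resolves the expectation suite from the validation result's own metadata and only falls back to scanning store keys when that lookup fails. The sketch below condenses the idea from the data_context.py hunks that follow; the helper name _resolve_expectation_suite is hypothetical and used only for illustration, since the committed change inlines this logic in DataContext.store_evaluation_parameters.

    def _resolve_expectation_suite(context, validation_results):
        # Hypothetical helper for illustration only; the commit inlines this logic
        # in DataContext.store_evaluation_parameters.
        suite_name = validation_results.meta["expectation_suite_name"]
        if suite_name in context.list_expectation_suite_names():
            # The configuration already knows the suite, so no store-wide key listing is needed.
            return context.get_expectation_suite(suite_name)
        # Unknown suite: callers fall back to expectations_store.list_keys() as before.
        return None
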
1 change: 1 addition & 0 deletions docs/changelog.md
@@ -3,6 +3,7 @@ title: Changelog
---

### Develop
* [BUGFIX] Addresses issue #2993 (#3054) by using configuration over list keys when possible.


### 0.13.34
4 changes: 2 additions & 2 deletions great_expectations/checkpoint/actions.py
@@ -46,7 +46,7 @@ def __init__(self, data_context):
def run(
self,
validation_result_suite,
validation_result_suite_identifier,
validation_result_suite_identifier: ValidationResultIdentifier,
data_asset,
expectation_suite_identifier=None,
checkpoint_identifier=None,
@@ -808,7 +808,7 @@ def __init__(self, data_context, target_store_name=None):
def _run(
self,
validation_result_suite,
validation_result_suite_identifier,
validation_result_suite_identifier: ValidationResultIdentifier,
data_asset,
payload=None,
expectation_suite_identifier=None,
27 changes: 19 additions & 8 deletions great_expectations/data_context/data_context.py
@@ -2275,7 +2275,7 @@ def get_expectation_suite(
"expectation_suite %s not found" % expectation_suite_name
)

def list_expectation_suite_names(self):
def list_expectation_suite_names(self) -> List[str]:
"""Lists the available expectation suite names"""
sorted_expectation_suite_names = [
i.expectation_suite_name for i in self.list_expectation_suites()
@@ -2414,7 +2414,14 @@ def store_validation_result_metrics(

def store_evaluation_parameters(self, validation_results, target_store_name=None):
if not self._evaluation_parameter_dependencies_compiled:
self._compile_evaluation_parameter_dependencies()
# get the expectation suite if it exists, otherwise None
expectation_suite_name = validation_results.meta["expectation_suite_name"]
if expectation_suite_name in self.list_expectation_suite_names():
expectation_suite = self.get_expectation_suite(expectation_suite_name)
else:
expectation_suite = None

self._compile_evaluation_parameter_dependencies(expectation_suite)

if target_store_name is None:
target_store_name = self.evaluation_parameter_store_name
@@ -2443,13 +2450,17 @@ def validations_store_name(self):
def validations_store(self):
return self.stores[self.validations_store_name]

def _compile_evaluation_parameter_dependencies(self):
def _compile_evaluation_parameter_dependencies(self, expectation_suite):
self._evaluation_parameter_dependencies = {}
for key in self.expectations_store.list_keys():
expectation_suite = self.expectations_store.get(key)
if not expectation_suite:
continue

# only if we don't have an expectation suite do we do a key scan
if expectation_suite is None:
for key in self.expectations_store.list_keys():
expectation_suite = self.expectations_store.get(key)
if not expectation_suite:
continue

if expectation_suite:
dependencies = expectation_suite.get_evaluation_parameter_dependencies()
if len(dependencies) > 0:
nested_update(self._evaluation_parameter_dependencies, dependencies)
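
In practice, storing evaluation parameters for a suite that is present in the configuration no longer triggers a key scan of the expectations store. A minimal usage sketch, assuming a DataContext named context and an ExpectationSuiteValidationResult named validation_results whose meta names a suite already saved in that context:

    # meta["expectation_suite_name"] names a saved suite, so the suite is fetched via
    # get_expectation_suite() and expectations_store.list_keys() is never called.
    context.store_evaluation_parameters(validation_results)

    # If the named suite is missing from list_expectation_suite_names(), the previous
    # behavior still applies and the expectations store keys are scanned instead.
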
@@ -2592,7 +2603,7 @@ def build_data_docs(
if (site_names and (site_name in site_names)) or not site_names:
complete_site_config = site_config
module_name = "great_expectations.render.renderer.site_builder"
site_builder = instantiate_class_from_config(
site_builder: SiteBuilder = instantiate_class_from_config(
config=complete_site_config,
runtime_environment={
"data_context": self,
@@ -133,7 +133,7 @@ def __init__(self, expectation_suite_identifier, run_id, batch_identifier):
self._batch_identifier = batch_identifier

@property
def expectation_suite_identifier(self):
def expectation_suite_identifier(self) -> ExpectationSuiteIdentifier:
return self._expectation_suite_identifier

@property
@@ -1,12 +1,14 @@
import logging
import warnings
from collections import OrderedDict
from typing import Union

from dateutil.parser import parse

import great_expectations.exceptions as ge_exceptions
from great_expectations.checkpoint.util import send_slack_notification
from great_expectations.core.async_executor import AsyncExecutor
from great_expectations.core.batch import Batch
from great_expectations.data_asset import DataAsset
from great_expectations.data_asset.util import parse_result_format
from great_expectations.data_context.types.resource_identifiers import (
@@ -405,8 +407,8 @@ def run(

def _run_actions(
self,
batch,
expectation_suite_identifier,
batch: Union[Batch, DataAsset],
expectation_suite_identifier: ExpectationSuiteIdentifier,
expectation_suite,
batch_validation_result,
run_id,
3 changes: 2 additions & 1 deletion tests/data_context/store/test_evaluation_parameter_store.py
@@ -12,6 +12,7 @@
)
from great_expectations.core.metric import ValidationMetricIdentifier
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.data_context.data_context import DataContext
from great_expectations.data_context.util import instantiate_class_from_config


@@ -78,7 +79,7 @@ def in_memory_param_store(request, test_backends):


def test_evaluation_parameter_store_methods(
data_context_parameterized_expectation_suite,
data_context_parameterized_expectation_suite: DataContext,
):
run_id = RunIdentifier(run_name="20191125T000000.000000Z")
source_patient_data_results = ExpectationSuiteValidationResult(
201 changes: 200 additions & 1 deletion tests/data_context/store/test_store_backends.py
@@ -1,4 +1,5 @@
import datetime
import json
import os
import uuid
from collections import OrderedDict
@@ -10,7 +11,11 @@
from moto import mock_s3

import tests.test_utils as test_utils
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.data_context import DataContext
from great_expectations.data_context.data_context import BaseDataContext
from great_expectations.data_context.store import (
GeCloudStoreBackend,
InMemoryStoreBackend,
@@ -20,15 +25,131 @@
TupleGCSStoreBackend,
TupleS3StoreBackend,
)
from great_expectations.data_context.types.base import CheckpointConfig
from great_expectations.data_context.types.base import (
CheckpointConfig,
DataContextConfig,
)
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from great_expectations.data_context.util import file_relative_path
from great_expectations.exceptions import InvalidKeyError, StoreBackendError, StoreError
from great_expectations.self_check.util import expectationSuiteSchema
from great_expectations.util import gen_directory_tree_str


@pytest.fixture()
def basic_data_context_config_for_validation_operator():
return DataContextConfig(
config_version=2,
plugins_directory=None,
evaluation_parameter_store_name="evaluation_parameter_store",
expectations_store_name="expectations_store",
datasources={},
stores={
"expectations_store": {"class_name": "ExpectationsStore"},
"evaluation_parameter_store": {"class_name": "EvaluationParameterStore"},
"validation_result_store": {"class_name": "ValidationsStore"},
"metrics_store": {"class_name": "MetricStore"},
},
validations_store_name="validation_result_store",
data_docs_sites={},
validation_operators={
"store_val_res_and_extract_eval_params": {
"class_name": "ActionListValidationOperator",
"action_list": [
{
"name": "store_validation_result",
"action": {
"class_name": "StoreValidationResultAction",
"target_store_name": "validation_result_store",
},
},
{
"name": "extract_and_store_eval_parameters",
"action": {
"class_name": "StoreEvaluationParametersAction",
"target_store_name": "evaluation_parameter_store",
},
},
],
},
"errors_and_warnings_validation_operator": {
"class_name": "WarningAndFailureExpectationSuitesValidationOperator",
"action_list": [
{
"name": "store_validation_result",
"action": {
"class_name": "StoreValidationResultAction",
"target_store_name": "validation_result_store",
},
},
{
"name": "extract_and_store_eval_parameters",
"action": {
"class_name": "StoreEvaluationParametersAction",
"target_store_name": "evaluation_parameter_store",
},
},
],
},
},
)


@pytest.fixture
def validation_operators_data_context(
basic_data_context_config_for_validation_operator, filesystem_csv_4
):
data_context = BaseDataContext(basic_data_context_config_for_validation_operator)

data_context.add_datasource(
"my_datasource",
class_name="PandasDatasource",
batch_kwargs_generators={
"subdir_reader": {
"class_name": "SubdirReaderBatchKwargsGenerator",
"base_directory": str(filesystem_csv_4),
}
},
)
data_context.create_expectation_suite("f1.foo")

df = data_context.get_batch(
batch_kwargs=data_context.build_batch_kwargs(
"my_datasource", "subdir_reader", "f1"
),
expectation_suite_name="f1.foo",
)
df.expect_column_values_to_be_between(column="x", min_value=1, max_value=9)
failure_expectations = df.get_expectation_suite(discard_failed_expectations=False)

df.expect_column_values_to_not_be_null(column="y")
warning_expectations = df.get_expectation_suite(discard_failed_expectations=False)

data_context.save_expectation_suite(
failure_expectations, expectation_suite_name="f1.failure"
)
data_context.save_expectation_suite(
warning_expectations, expectation_suite_name="f1.warning"
)

return data_context


@pytest.fixture()
def parameterized_expectation_suite():
fixture_path = file_relative_path(
__file__,
"../../test_fixtures/expectation_suites/parameterized_expression_expectation_suite_fixture.json",
)
with open(
fixture_path,
) as suite:
return expectationSuiteSchema.load(json.load(suite))


def test_StoreBackendValidation():
backend = InMemoryStoreBackend()

@@ -488,6 +609,84 @@ def test_TupleS3StoreBackend_with_prefix():
)


@mock_s3
def test_tuple_s3_store_backend_expectation_suite_and_validation_operator_share_prefix(
validation_operators_data_context: DataContext,
parameterized_expectation_suite: ExpectationSuite,
):
"""
What does this test test and why?
In cases where an s3 store is used with the same prefix for both validations
and expectations, the list_keys() operation picks up files ending in .json
from both validations and expectations stores.
To avoid this issue, the expectation suite configuration, if available, is used
to locate the specific key for the suite in place of calling list_keys().
NOTE: This is an issue with _all stores_ when the result of list_keys() contains paths
with a period (.) that are passed to the ExpectationSuiteIdentifier.from_tuple() method,
as happens in the DataContext.store_evaluation_parameters() method. The best fix is
to choose a different delimiter for generating expectation suite identifiers (or
perhaps escape the period in path names).
For now, the fix is to avoid the call to list_keys() in
DataContext.store_evaluation_parameters() if the expectation suite is known (from config).
This approach should also improve performance.
This test confirms the fix for GitHub issue #3054.
"""
bucket = "leakybucket"
prefix = "this_is_a_test_prefix"

# create a bucket in Moto's mock AWS environment
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=bucket)

# replace store backends with the mock, both with the same prefix (per issue #3054)
validation_operators_data_context.validations_store._store_backend = (
TupleS3StoreBackend(
bucket=bucket,
prefix=prefix,
)
)
validation_operators_data_context.expectations_store._store_backend = (
TupleS3StoreBackend(
bucket=bucket,
prefix=prefix,
)
)

validation_operators_data_context.save_expectation_suite(
parameterized_expectation_suite, "param_suite"
)

# ensure the suite is in the context
assert validation_operators_data_context.expectations_store.has_key(
ExpectationSuiteIdentifier("param_suite")
)

res = validation_operators_data_context.run_validation_operator(
"store_val_res_and_extract_eval_params",
assets_to_validate=[
(
validation_operators_data_context.build_batch_kwargs(
"my_datasource", "subdir_reader", "f1"
),
"param_suite",
)
],
evaluation_parameters={
"urn:great_expectations:validations:source_patient_data.default:expect_table_row_count_to_equal.result"
".observed_value": 3
},
)

assert (
res["success"] is True
), "No exception thrown, validation operators ran successfully"


@mock_s3
def test_tuple_s3_store_backend_slash_conditions():
bucket = "my_bucket"
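
The new regression test above can presumably be run on its own with a standard pytest selection (this command is not part of the commit and assumes pytest and moto are installed locally):

    pytest tests/data_context/store/test_store_backends.py -k expectation_suite_and_validation_operator_share_prefix
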
