Skip to content

Commit

Permalink
Merge pull request #633 from great-expectations/refactor/convert_eval…
Browse files Browse the repository at this point in the history
…uation_parameter_to_store

Refactor/convert evaluation parameter to store
  • Loading branch information
jcampbell committed Aug 21, 2019
2 parents 298af12 + 0ccd1c4 commit 459cdef
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 147 deletions.
2 changes: 1 addition & 1 deletion great_expectations/data_asset/data_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ def validate(self,
# So, we load them in reverse order

if data_context is not None:
runtime_evaluation_parameters = data_context.bind_evaluation_parameters(run_id) # , expectation_suite)
runtime_evaluation_parameters = data_context.get_parameters_in_evaluation_parameter_store_by_run_id(run_id)
else:
runtime_evaluation_parameters = {}

Expand Down
145 changes: 39 additions & 106 deletions great_expectations/data_context/data_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ def __init__(self, project_config, context_root_dir, data_asset_name_delimiter='
# Stuff below this comment is legacy code, not yet fully converted to new-style Stores.
self.data_doc_directory = os.path.join(self.root_directory, "uncommitted/documentation")

self._load_evaluation_parameter_store()
self._compiled = False
# /End store stuff

Expand Down Expand Up @@ -537,78 +536,6 @@ def get_datasource(self, datasource_name="default"):
self._datasources[datasource_name] = datasource
return datasource

def _load_evaluation_parameter_store(self):
"""Load the evaluation parameter store to use for managing cross data-asset parameterized expectations.
By default, the Context uses an in-memory parameter store only suitable for evaluation on a single node.
Returns:
None
"""
# This is a trivial class that implements in-memory key value store.
# We use it when user does not specify a custom class in the config file
class MemoryEvaluationParameterStore(object):
def __init__(self):
self.dict = {}

def get(self, run_id, name):
if run_id in self.dict:
return self.dict[run_id][name]
else:
return {}

def set(self, run_id, name, value):
if run_id not in self.dict:
self.dict[run_id] = {}
self.dict[run_id][name] = value

def get_run_parameters(self, run_id):
if run_id in self.dict:
return self.dict[run_id]
else:
return {}

#####
#
# If user wishes to provide their own implementation for this key value store (e.g.,
# Redis-based), they should specify the following in the project config file:
#
# evaluation_parameter_store:
# type: demostore
# config: - this is optional - this is how we can pass kwargs to the object's c-tor
# param1: boo
# param2: bah
#
# Module called "demostore" must be found in great_expectations/plugins/store.
# Class "GreatExpectationsEvaluationParameterStore" must be defined in that module.
# The class must implement the following methods:
# 1. def __init__(self, **kwargs)
#
# 2. def get(self, name)
#
# 3. def set(self, name, value)
#
# We will load the module dynamically
#
#####
try:
config_block = self._project_config.get("evaluation_parameter_store")
if not config_block or not config_block.get("type"):
self._evaluation_parameter_store = MemoryEvaluationParameterStore()
else:
module_name = config_block.get("type")
class_name = "GreatExpectationsEvaluationParameterStore"

loaded_module = __import__(module_name, fromlist=[module_name])
loaded_class = getattr(loaded_module, class_name)
if config_block.get("config"):
self._evaluation_parameter_store = loaded_class(**config_block.get("config"))
else:
self._evaluation_parameter_store = loaded_class()
except Exception:
logger.exception("Failed to load evaluation_parameter_store class")
raise

def list_expectation_suites(self):
"""Returns currently-defined expectation suites available in a nested dictionary structure
reflecting the namespace provided by this DataContext.
Expand Down Expand Up @@ -921,19 +848,6 @@ def save_expectation_suite(self, expectation_suite, data_asset_name=None, expect
json.dump(expectation_suite, outfile, indent=2)
self._compiled = False

def bind_evaluation_parameters(self, run_id):  # , expectations):
    """Look up the evaluation parameters currently stored for ``run_id``.

    The returned parameters are ready to be bound to parameterized
    expectation values.

    Args:
        run_id: the run_id for which to return evaluation parameters

    Returns:
        evaluation_parameters (dict)
    """
    # TODO: only return parameters requested by the given expectations
    parameter_store = self._evaluation_parameter_store
    run_parameters = parameter_store.get_run_parameters(run_id)
    return run_parameters

def register_validation_results(self, run_id, validation_results, data_asset=None):
"""Process results of a validation run. This method is called by data_asset objects that are connected to
a DataContext during validation. It performs several actions:
Expand Down Expand Up @@ -1002,6 +916,17 @@ def register_validation_results(self, run_id, validation_results, data_asset=Non
value=data_asset
)

self.extract_and_store_parameters_from_validation_results(
validation_results,
data_asset_name,
expectation_suite_name,
run_id,
)

return validation_results

def extract_and_store_parameters_from_validation_results(self, validation_results, data_asset_name, expectation_suite_name, run_id):

if not self._compiled:
self._compile()

Expand All @@ -1010,14 +935,15 @@ def register_validation_results(self, run_id, validation_results, data_asset=Non
"expectation_suite_name" not in validation_results["meta"]
):
logger.warning(
"Both data_asset_name ane expectation_suite_name must be in validation results to "
"Both data_asset_name and expectation_suite_name must be in validation results to "
"register evaluation parameters."
)
return validation_results
return

elif (data_asset_name not in self._compiled_parameters["data_assets"] or
expectation_suite_name not in self._compiled_parameters["data_assets"][data_asset_name]):
# This is fine; short-circuit since we do not need to register any results from this dataset.
return validation_results
return

for result in validation_results['results']:
# Unoptimized: loop over all results and check if each is needed
Expand All @@ -1027,7 +953,7 @@ def register_validation_results(self, run_id, validation_results, data_asset=Non
if (("column" in result['expectation_config']['kwargs']) and
("columns" in self._compiled_parameters["data_assets"][data_asset_name][expectation_suite_name][expectation_type]) and
(result['expectation_config']['kwargs']["column"] in
self._compiled_parameters["data_assets"][data_asset_name][expectation_suite_name][expectation_type]["columns"])):
self._compiled_parameters["data_assets"][data_asset_name][expectation_suite_name][expectation_type]["columns"])):

column = result['expectation_config']['kwargs']["column"]
# Now that we have a small search space, invert logic, and look for the parameters in our result
Expand All @@ -1036,9 +962,9 @@ def register_validation_results(self, run_id, validation_results, data_asset=Non
for desired_param in desired_parameters:
desired_key = desired_param.split(":")[-1]
if type_key == "result" and desired_key in result['result']:
self.store_validation_param(run_id, desired_param, result["result"][desired_key])
self.set_parameters_in_evaluation_parameter_store_by_run_id_and_key(run_id, desired_param, result["result"][desired_key])
elif type_key == "details" and desired_key in result["result"]["details"]:
self.store_validation_param(run_id, desired_param, result["result"]["details"])
self.set_parameters_in_evaluation_parameter_store_by_run_id_and_key(run_id, desired_param, result["result"]["details"])
else:
logger.warning("Unrecognized key for parameter %s" % desired_param)

Expand All @@ -1049,16 +975,17 @@ def register_validation_results(self, run_id, validation_results, data_asset=Non
for desired_param in desired_parameters:
desired_key = desired_param.split(":")[-1]
if type_key == "result" and desired_key in result['result']:
self.store_validation_param(run_id, desired_param, result["result"][desired_key])
self.set_parameters_in_evaluation_parameter_store_by_run_id_and_key(run_id, desired_param, result["result"][desired_key])
elif type_key == "details" and desired_key in result["result"]["details"]:
self.store_validation_param(run_id, desired_param, result["result"]["details"])
self.set_parameters_in_evaluation_parameter_store_by_run_id_and_key(run_id, desired_param, result["result"]["details"])
else:
logger.warning("Unrecognized key for parameter %s" % desired_param)

return validation_results

@property
def evaluation_parameter_store(self):
    """Store found in ``self.stores`` under the configured ``evaluation_parameter_store_name``."""
    available_stores = self.stores
    store_name = self._project_config.evaluation_parameter_store_name
    return available_stores[store_name]

def store_validation_param(self, run_id, key, value):
def set_parameters_in_evaluation_parameter_store_by_run_id_and_key(self, run_id, key, value):
"""Store a new validation parameter.
Args:
Expand All @@ -1069,20 +996,29 @@ def store_validation_param(self, run_id, key, value):
Returns:
None
"""
self._evaluation_parameter_store.set(run_id, key, value)
run_params = self.get_parameters_in_evaluation_parameter_store_by_run_id(run_id)
run_params[key] = value
self.evaluation_parameter_store.set(run_id, run_params)

def get_validation_param(self, run_id, key):
"""Get a new validation parameter.
def get_parameters_in_evaluation_parameter_store_by_run_id(self, run_id):
"""Fetches all validation parameters for a given run_id.
Args:
run_id: run_id for desired value
run_id: current run_id
key: parameter key
value: parameter value
Returns:
value stored in evaluation_parameter_store for the provided run_id and key
None
"""
return self._evaluation_parameter_store.get(run_id, key)
if self.evaluation_parameter_store.has_key(run_id):
return copy.deepcopy(
self.evaluation_parameter_store.get(run_id)
)
else:
return {}

#TODO: Can we rename this to _compile_all_evaluation_parameters_from_expectation_suites or something similar?
def _compile(self):
"""Compiles all current expectation configurations in this context to be ready for result registration.
Expand Down Expand Up @@ -1270,8 +1206,6 @@ def write_resource(
*path_components
)
safe_mmkdir(os.path.dirname(path))
# print(path)
# print(path_components[1:])
with open(path, "w") as writer:
writer.write(resource)

Expand Down Expand Up @@ -1659,7 +1593,6 @@ def add_datasource(self, name, type_, **kwargs):
logger.debug("Starting DataContext.add_datasource")

super(DataContext, self).add_datasource(name, type_, **kwargs)
logger.debug("x")
self._save_project_config()


Expand Down
6 changes: 6 additions & 0 deletions great_expectations/data_context/store/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ def _set(self, key, value):
def list_keys(self):
    """Placeholder for listing keys; this implementation always raises NotImplementedError."""
    raise NotImplementedError()

def has_key(self, key):
    """Placeholder for a key-membership check; this implementation always raises NotImplementedError."""
    raise NotImplementedError()


class InMemoryStore(ContextAwareStore):
"""Uses an in-memory dictionary as a store.
Expand All @@ -134,6 +137,9 @@ def _set(self, key, value):
def list_keys(self):
    """Return the keys currently held in ``self.store``."""
    stored_keys = self.store.keys()
    return stored_keys

def has_key(self, key):
    """Report whether ``key`` is present in ``self.store``."""
    is_present = key in self.store
    return is_present


class FilesystemStore(ContextAwareStore):
"""Uses a local filepath as a store.
Expand Down
1 change: 1 addition & 0 deletions great_expectations/data_context/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
plugins_directory: plugins/
expectations_directory: expectations/
evaluation_parameter_store_name: evaluation_parameter_store
# Configure additional data context options here.
Expand Down
3 changes: 3 additions & 0 deletions great_expectations/data_context/types/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class DataContextConfig(Config):
_allowed_keys = set([
"plugins_directory",
"expectations_directory",
"evaluation_parameter_store_name",
"datasources",
"stores",
"data_docs", # TODO: Rename this to sites, to remove a layer of extraneous nesting
Expand All @@ -36,6 +37,7 @@ class DataContextConfig(Config):
_required_keys = set([
"plugins_directory",
"expectations_directory",
"evaluation_parameter_store_name",
"datasources",
"stores",
"data_docs",
Expand All @@ -44,6 +46,7 @@ class DataContextConfig(Config):
_key_types = {
"plugins_directory": string_types,
"expectations_directory": string_types,
"evaluation_parameter_store_name": string_types,
"datasources": dict,
"stores": dict,
"data_docs": dict,
Expand Down
6 changes: 5 additions & 1 deletion tests/data_context/test_configuration_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def test_preserve_comments(data_context):
assert content == """\
plugins_directory: plugins/
expectations_directory: expectations/
stores: {}
stores:
evaluation_parameter_store:
module_name: great_expectations.data_context.store
class_name: InMemoryStore
datasources:
# For example, this one.
mydatasource:
Expand All @@ -74,6 +77,7 @@ def test_preserve_comments(data_context):
data_asset_type:
class_name: PandasDataset
type: pandas
evaluation_parameter_store_name: evaluation_parameter_store
data_docs:
sites:
"""
Expand Down

0 comments on commit 459cdef

Please sign in to comment.