Merge pull request #3 from superconductive/dev_eug_0
Changed DataContext to a class that represents a project - a collection of dataset types
jcampbell committed May 3, 2019
2 parents 428d328 + 936a794 commit 2680245
Showing 4 changed files with 115 additions and 43 deletions.
73 changes: 51 additions & 22 deletions great_expectations/data_asset/base.py
@@ -37,14 +37,16 @@ def __init__(self, *args, **kwargs):
             parameter not obvious from the signature.
         """
+        interactive_evaluation = kwargs.pop("interactive_evaluation", True)
         autoinspect_func = kwargs.pop("autoinspect_func", None)
         initial_config = kwargs.pop("config", None)
         data_asset_name = kwargs.pop("data_asset_name", None)
 
         super(DataAsset, self).__init__(*args, **kwargs)
         self._initialize_expectations(config=initial_config, data_asset_name=data_asset_name)
         if autoinspect_func is not None:
             autoinspect_func(self)
+        self._interactive_evaluation = interactive_evaluation
 
 
     def autoinspect(self, autoinspect_func=columns_exist):
         autoinspect_func(self)
@@ -157,21 +159,29 @@ def wrapper(self, *args, **kwargs):
             exception_message = None
 
             # Finally, execute the expectation method itself
-            try:
-                return_obj = func(self, **evaluation_args)
+            if self._interactive_evaluation:
+                try:
+                    return_obj = func(self, **evaluation_args)
 
-            except Exception as err:
-                if catch_exceptions:
-                    raised_exception = True
-                    exception_traceback = traceback.format_exc()
-                    exception_message = str(err)
+                except Exception as err:
+                    if catch_exceptions:
+                        raised_exception = True
+                        exception_traceback = traceback.format_exc()
+                        exception_message = str(err)
 
-                    return_obj = {
-                        "success": False
-                    }
+                        return_obj = {
+                            "success": False
+                        }
 
-                else:
-                    raise(err)
+                    else:
+                        raise err
+
+                # Add a "success" object to the config
+                expectation_config["success_on_last_run"] = return_obj["success"]
+
+            else:
+                return_obj = {"message": "expectation stored"}
 
             # Append the expectation to the config.
             self._append_expectation(expectation_config)
@@ -187,9 +197,6 @@ def wrapper(self, *args, **kwargs):
                     "exception_traceback": exception_traceback
                 }
 
-            # Add a "success" object to the config
-            expectation_config["success_on_last_run"] = return_obj["success"]
-
             # Add meta to return object
             if meta is not None:
                 return_obj['meta'] = meta
@@ -807,6 +814,10 @@ def validate(self,
         Raises:
            AttributeError - if 'catch_exceptions'=None and an expectation throws an AttributeError
         """
+        validate__interactive_evaluation = self._interactive_evaluation
+        if not self._interactive_evaluation:
+            # Re-enable evaluation for the duration of this explicit validate call
+            self._interactive_evaluation = True
 
         results = []
 
@@ -896,16 +907,29 @@
         statistics = _calc_validation_statistics(results)
 
-        if data_context is not None:
-            data_context.register_validation_results(run_id, results)
-
         if only_return_failures:
             abbrev_results = []
             for exp in results:
                 if exp["success"] == False:
                     abbrev_results.append(exp)
             results = abbrev_results
 
+        # TODO: refactor this once we've settled on the correct naming convention everywhere
+        data_asset_name = None
+        if "data_asset_name" in expectations_config:
+            data_asset_name = expectations_config["data_asset_name"]
+        elif "dataset_name" in expectations_config:
+            data_asset_name = expectations_config["dataset_name"]
+        elif "meta" in expectations_config:
+            if "data_asset_name" in expectations_config["meta"]:
+                data_asset_name = expectations_config["meta"]["data_asset_name"]
+            elif "dataset_name" in expectations_config["meta"]:
+                data_asset_name = expectations_config["meta"]["dataset_name"]
+
         result = {
             "results": results,
             "success": statistics.success,
@@ -917,10 +941,13 @@
             },
             "meta": {
                 "great_expectations.__version__": __version__,
-                "data_asset_name": expectations_config["data_asset_name"] if "data_asset_name" in expectations_config else None
+                "data_asset_name": data_asset_name
             }
         }
 
+        if data_context is not None:
+            data_context.register_validation_results(run_id, result)
+
         if evaluation_parameters is not None:
             result.update({"evaluation_parameters": evaluation_parameters})
 
@@ -946,6 +973,8 @@ def validate(self,
         if result_callback is not None:
             result_callback(result)
 
+        self._interactive_evaluation = validate__interactive_evaluation
+
         return result
 
     def get_evaluation_parameter(self, parameter_name, default_value=None):
@@ -985,7 +1014,7 @@ def set_data_asset_name(self, data_asset_name):
 
     def get_data_asset_name(self):
         """Gets the current name of this data_asset as stored in the expectations configuration."""
-        if "data_asset_name" in self.expectations_config:
+        if "data_asset_name" in self._expectations_config:
             return self._expectations_config['data_asset_name']
         else:
             return None
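The net effect of these data_asset/base.py changes: a DataAsset can now be constructed with interactive evaluation disabled, in which case calling an expectation method only records its configuration, and an explicit validate() call temporarily re-enables evaluation and restores the flag afterwards. A minimal sketch of the intended usage (hypothetical; it assumes PandasDataset forwards its keyword arguments through to DataAsset.__init__, as the constructor change above implies):

import great_expectations as ge
import pandas as pd

df = pd.DataFrame({"patient_nbr": [1, 2, 2, 3]})

# With interactive evaluation off, calling an expectation method only appends
# its config and returns {"message": "expectation stored"}.
dataset = ge.dataset.PandasDataset(df, interactive_evaluation=False)
dataset.expect_column_unique_value_count_to_be_between("patient_nbr", 1, 10)

# validate() temporarily re-enables evaluation, runs the stored expectations,
# and restores the previous flag value before returning.
results = dataset.validate()
print(results["success"])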
1 change: 1 addition & 0 deletions great_expectations/data_context/__init__.py
@@ -1,5 +1,6 @@
 from .pandas_context import PandasCSVDataContext
 from .sqlalchemy_context import SqlAlchemyDataContext
+from .base import DataContext
 
 
 def get_data_context(context_type, options, *args, **kwargs):
47 changes: 43 additions & 4 deletions great_expectations/data_context/base.py
@@ -1,3 +1,7 @@
+import os
+import json
+from great_expectations.version import __version__
+
 class DataContext(object):
     """A generic DataContext, exposing the base API including constructor with `options` parameter, list_datasets,
     and get_dataset.
@@ -9,10 +13,45 @@ def __init__(self, options, *args, **kwargs):
         self.connect(options, *args, **kwargs)
 
     def connect(self, options):
-        return NotImplementedError
+        self.directory = options
+        self.validation_params = {}
 
-    def list_datasets(self):
-        return NotImplementedError
+    def list_data_asset_configs(self):
+        return [os.path.splitext(os.path.basename(file_path))[0] for file_path in os.listdir(self.directory) if file_path.endswith('.json')]
+
+    def get_data_asset_config(self, data_asset_name):
+        config_file_path = os.path.join(self.directory, data_asset_name + '.json')
+        if os.path.isfile(config_file_path):
+            with open(config_file_path) as json_file:
+                return json.load(json_file)
+        else:
+            # TODO (Eugene): Would it be better to return None if the file does not exist?
+            # Currently this method acts as get_or_create.
+            return {
+                'data_asset_name': data_asset_name,
+                'meta': {
+                    'great_expectations.__version__': __version__
+                },
+                'expectations': [],
+            }
+
+    def save_data_asset_config(self, data_asset_config):
+        data_asset_name = data_asset_config['data_asset_name']
+        config_file_path = os.path.join(self.directory, data_asset_name + '.json')
+        with open(config_file_path, 'w') as outfile:
+            json.dump(data_asset_config, outfile)
+
+    def bind_evaluation_parameters(self, run_id, expectations_config):
+        return self.validation_params
+
+    def register_validation_results(self, run_id, validation_results):
+        # TODO (Eugene): this is a demo implementation!!!
+        for result in validation_results['results']:
+            if result['expectation_config']['expectation_type'] == 'expect_column_unique_value_count_to_be_between' \
+                    and result['expectation_config']['kwargs']['column'] == 'patient_nbr':
+                self.validation_params = {
+                    "urn:great_expectations:validations:datasets:diabetes_data:expectations:expect_column_unique_value_count_to_be_between:columns:patient_nbr:result:observed_value": result['result']['observed_value']
+                }
 
-    def get_dataset(self, dataset_name):
+    def _compile(self):
         return NotImplementedError
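The rewritten DataContext treats `options` as a directory of <data_asset_name>.json expectation configs, with get_data_asset_config acting as get-or-create. A minimal round-trip sketch (hypothetical usage; the directory path and expectation kwargs are illustrative, and the directory must already exist):

from great_expectations.data_context import DataContext

context = DataContext("./expectation_configs")

# An unknown name yields an empty config stamped with the library version.
config = context.get_data_asset_config("diabetes_data")
config["expectations"].append({
    "expectation_type": "expect_column_unique_value_count_to_be_between",
    "kwargs": {"column": "patient_nbr", "min_value": 1, "max_value": 100000},
})
context.save_data_asset_config(config)

print(context.list_data_asset_configs())  # ['diabetes_data']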
37 changes: 20 additions & 17 deletions great_expectations/util.py
@@ -15,7 +15,7 @@ def _convert_to_dataset_class(df, dataset_class, expectations_config=None, autoi
     """
     if expectations_config is not None:
         # Cast the dataframe into the new class, and manually initialize expectations according to the provided configuration
-        df.__class__ = dataset_class
+        df = dataset_class(df)
         df._initialize_expectations(expectations_config)
     else:
         # Instantiate the new Dataset with default expectations
@@ -182,7 +182,7 @@ def validate(data_asset, expectations_config, data_asset_type=None, *args, **kwa
         raise ValueError("The validate util method only supports dataset validations, including custom subclasses. For other data asset types, use the object's own validate method.")
 
     if not issubclass(type(data_asset), data_asset_type):
-        if isinstance(data_asset, pd.DataFrame) and issubclass(data_asset_type, dataset.PandasDataset):
+        if isinstance(data_asset, (pd.DataFrame)) and issubclass(data_asset_type, dataset.PandasDataset):
             pass  # This is a special type of allowed coercion
         else:
             raise ValueError("The validate util method only supports validation for subtypes of the provided data_asset_type.")
@@ -198,12 +198,12 @@ def send_slack_notification(validation_json=None):
     """
     Post a slack notification.
     """
-    if "data_asset_name" in validation_json:
-        data_asset_name = validation_json['data_asset_name']
+    if "meta" in validation_json and "data_asset_name" in validation_json["meta"]:
+        data_asset_name = validation_json["meta"]["data_asset_name"]
     else:
         data_asset_name = "no_name_provided_" + str(uuid.uuid4())
 
-    timestamp = datetime.utcnow().timestamp()
+    timestamp = datetime.now().timestamp()
     n_checks_succeeded = validation_json['statistics']['successful_expectations']
     n_checks = validation_json['statistics']['evaluated_expectations']
@@ -216,28 +216,31 @@
     tada = " :tada:" if validation_json["success"] else ""
     color = '#28a745' if validation_json["success"] else '#dc3545'
 
-    if "dataset_reference" in validation_json["meta"]:
-        path = validation_json["meta"]["dataset_reference"]
-    else:
-        path = None
-
     session = requests.Session()
 
+    text = "{n_checks_succeeded} / {n_checks} expectations were met{tada}.\n".format(
+        n_checks_succeeded=n_checks_succeeded,
+        n_checks=n_checks,
+        tada=tada)
+
+    if "result_reference" in validation_json["meta"]:
+        text = text + "Validation report saved to {result_path}\n".format(
+            result_path=validation_json["meta"]["result_reference"])
+
+    if "dataset_reference" in validation_json["meta"]:
+        text = text + "Validated dataset saved to {dataset_reference}\n".format(
+            dataset_reference=validation_json["meta"]["dataset_reference"])
+
     query = {
         'attachments': [
             {
                 'fallback': f'Validation of dataset "{data_asset_name}" is complete: {status}.',
                 'title': f'Dataset Validation Completed with Status "{status}"',
                 'title_link': run_id,
                 'pretext': f'Validated dataset "{data_asset_name}".',
-                'text': '{n_checks_succeeded}/{n_checks} Expectations '
-                        'Were Met{tada}. Validation report saved to "{path}"'.format(
-                    n_checks_succeeded=n_checks_succeeded,
-                    n_checks=n_checks,
-                    tada=tada,
-                    run_id=run_id,
-                    status=status,
-                    path=path),
+                'text': text,
                 'color': color,
                 'footer': 'Great Expectations',
                 'ts': timestamp
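For reference, a sketch of the minimal validation_json shape the reworked send_slack_notification consumes (key names are taken from the code above; the values and paths are illustrative):

validation_json = {
    "success": True,  # drives the :tada: emoji and attachment color
    "statistics": {
        "successful_expectations": 10,
        "evaluated_expectations": 10,
    },
    "meta": {
        # "meta" itself must be present; the two reference keys are optional
        # and each appends a line to the Slack message text when present.
        "data_asset_name": "diabetes_data",
        "result_reference": "s3://my-bucket/results/run-001.json",
        "dataset_reference": "s3://my-bucket/data/run-001.csv",
    },
}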
