# hello, great expectations

In [3]:
train = ge.read_csv("./train.csv")
train.expect_column_values_to_not_be_null("type")
train.save_expectation_suite("./my_expectations.json")

In [4]:
test = ge.read_csv("./train.csv")
validation_results = test.validate(expectation_suite="my_expectations.json")


In [5]:
if validation_results["success"]:
    print("giddy up!")
else:
    raise Exception("Oh shit.")

giddy up!


In [7]:
from great_expectations.data_asset import DataAsset
from great_expectations.dataset import SqlAlchemyDataset

import numpy as np
import scipy.stats as stats
import sqlalchemy as sa

class CustomSqlAlchemyDataset(SqlAlchemyDataset):

    _data_asset_type = "CustomSqlAlchemyDataset"

    @DataAsset.expectation(["column", "distribution", "p_value"])
    def expect_column_parameterized_distribution_ks_test_p_value_to_be_greater_than(
            self,
            column,
            distribution,
            p_value=0.05,
            params=None,
            result_format=None,
            include_config=True,
            catch_exceptions=None,
            meta=None
    ):
        if p_value <= 0 or p_value >= 1:
            raise ValueError("p_value must be between 0 and 1")

        positional_parameters = (params['s'], params['loc'], params['scale'])

        rows = self.engine.execute(sa.select([
            sa.column(column)
        ]).select_from(self._table)).fetchall()

        column = np.array([col[0] for col in rows])

        # K-S Test
        ks_result = stats.kstest(column, distribution, args=positional_parameters)

        return {
            "success": ks_result[1] >= p_value,
            "result": {
                "observed_value": ks_result[1],
                "details": {
                    "expected_params": params,
                    "observed_ks_result": ks_result
                }
            }
        }

In [8]:
import json
import os

from great_expectations.dataset import SqlAlchemyDataset
import requests
from sqlalchemy import create_engine

# 1) Load in a new batch of data
db_string = "postgres://{user}:{password}@{host}:{port}/{dbname}".format(
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASSWORD"],
    port=os.environ["DB_PORT"],
    dbname=os.environ["DB_DBNAME"],
    host=os.environ["DB_HOST"],
)
db_engine = create_engine(db_string)
query = """
    SELECT *
    FROM
        listings
    WHERE
        DATE_TRUNC('day', created_at) = CURRENT_DATE - INTERVAL '1' DAY
"""
recent_listings = SqlAlchemyDataset(custom_sql=query, engine=db_engine)

# 2) Validate it against stored expectation suite
validation_results = recent_listings.validate(
    expectation_suite="listings_expectations.json"
)

# 3) Take action based on the validation results
if not validation_results["success"]:
    num_evaluated = validation_results["statistics"]["evaluated_expectations"]
    num_successful = validation_results["statistics"]["successful_expectations"]
    validation_results_text = json.dumps(
        [result.to_json_dict() for result in validation_results["results"]],
        sort_keys=True,
        indent=4,
    )
    slack_data = {
        "text": (
            f"⚠️ Dataset has failed expecations\n"
            f"*Successful Expectations*: `{num_successful}/{num_evaluated}`\n"
            f"*Results*: ```\n{validation_results_text}\n```"
        )
    }

    response = requests.post(
        os.environ['SLACK_WEBHOOK'],
        data=json.dumps(slack_data),
        headers={"Content-Type": "application/json"},
    )

KeyError: 'DB_USER'