# Great Expectations End-to-End: Pipeline Portion

This notebook demonstrates integration with a python-pandas pipeline

In [1]:
import json
import os
import great_expectations as ge
import great_expectations.jupyter_ux
import pandas as pd

In [2]:
# Prep environment
import json
import os
import logging

import uuid # used to generate run_id

from datetime import datetime

import great_expectations as ge
import numpy as np
import pandas as pd

# Prep logger
import tzlocal

def posix2local(timestamp, tz=tzlocal.get_localzone()):
    """Seconds since the epoch -> local time as an aware datetime object."""
    return datetime.fromtimestamp(timestamp, tz)

class Formatter(logging.Formatter):
    def converter(self, timestamp):
        return posix2local(timestamp)

    def formatTime(self, record, datefmt=None):
        dt = self.converter(record.created)
        if datefmt:
            s = dt.strftime(datefmt)
        else:
            t = dt.strftime(self.default_time_format)
            s = self.default_msec_format % (t, record.msecs)
        return s

logger = logging.getLogger('validation_notebook')
chandler = logging.StreamHandler()
chandler.setLevel(logging.DEBUG)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# chandler.setFormatter(formatter)
chandler.setFormatter(Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%dT%H:%M:%S%z"))


logger.addHandler(chandler)
logger.setLevel(logging.INFO)

# Filter warnings
import warnings
warnings.filterwarnings('ignore')


## Create a DataContext object

First, we need to create a `DataContext` object - it represents Great Expectations in your data pipeline.
We are passing '../../' to this object to let it know where to find its configuration. No need to modify this line


In [3]:
# context = ge.data_context.DataContext('../../', expectation_explorer=True)
context = ge.data_context.DataContext('../../', expectation_explorer=False)

## Data source


Data sources are locations where your pipeline reads its input data from. In our case, it is a directory on the local file system.

When you ran `great_expectations init` in your project, you configured a data source of type "pandas" and gave it a name.


In [4]:
data_source_name = great_expectations.jupyter_ux.set_data_source(context)

In [5]:
#data_source_name = ???

In [6]:
data_source_name

'201810'

In [7]:
great_expectations.jupyter_ux.list_available_data_asset_names(context, data_source_name=data_source_name)

{'upkmemberkeys', '.DS_Store', 'provider', 'claim', 'providersupplemental', 'memberenrollment', 'member', 'claimcode'}


## WIP

When you were creating expectations 

some continuous process that produces them... 

a pipeline is running 

orders 

a new file appears periodically. The file contains a new batch of order records

What does a valid batch of orders look like? What can my code assume about it?

You pointed GE at a directory that contained several files representing batches of this data asset

Let's assume that your pipeline processes new files (new batches) when they arrive.

The way to integrate validation into your pipeline 


you have a node in your DAG 
When you were implementing its logic you made some assumptions about its input data
(e.g., I assume that values in "date_of_birth" column are date sttrings in 'YYYY-MM-DD' format,
they are never null and they are in a reasonable range representing people of working age).
You encoded these assumptions as expectations.
This allows you not to complicate your code with checks of validity 
and you want to protect it from processing data that it was not designed to deal with.

When a new batch of orders (in a form of a CSV file) arrives, 
Detecting that the file is available and reading it into memory is something you already do.

you read the file into a Pandas data frame.
Before passing the data frame to your node, call Great Expectations to validate it.
If the data does not pass the validation, it means that the data violates assumptions made 
by the code. 




In [24]:
file_path_to_validate = '/Users/eugenemandel/project_data/clarify_payer/claim_data/ability_payer/staging/201810/claimcode/000000_0'

In [8]:
df = pd.read_csv(file_path_to_validate, sep="|")
df.head()

In [27]:
from pyspark.sql import SparkSession
from great_expectations.dataset import PandasDataset, SqlAlchemyDataset, SparkDFDataset
spark = SparkSession.builder.getOrCreate()
df = SparkDFDataset(spark.read.csv(file_path_to_validate, sep="|"))
df.spark_df.show()

+-----------+---------+----------+----------+---+---+------+---+----------+
|        _c0|      _c1|       _c2|       _c3|_c4|_c5|   _c6|_c7|       _c8|
+-----------+---------+----------+----------+---+---+------+---+----------+
|37359363450|217767923|2016-04-21|2016-04-21|  5|  0| C2617|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 16|  0|  0278|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 13|  1|   131|  1|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 13|  0|   137|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 10|  0|    22|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 11|  1|    OA|  1|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 11|  0|    OT|  1|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 17|  5|Z87891|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 17|  2|   I10|  0|2018-10-25|
|37359363450|217767923|2016-04-21|2016-04-21| 17|  0|  N200|  0|2018-10-25|
|37359363450

# Begin Pipeline Run

In [28]:
# Generate a run-id that GE will use to key shared parameters
run_id = str(uuid.uuid1())


In [None]:
# results = ge.validate(df,
#                   data_context=context,
#                   data_asset_name='claimcode',
# #                   run_id=run_id,
# #                   only_return_failures=False,
# #                   save_dataset_on_failure=dataset_s3,
# #                   result_store=result_s3,
# #                   result_callback=slack_callback,
#                   )




# # results = ge.validate(df,
# #                   data_context=pipeline_data_context,
# #                   data_asset_name="",
# #                   run_id=run_id,
# #                   only_return_failures=False,
# #                   save_dataset_on_failure=dataset_s3,
# #                   result_store=result_s3,
# #                   result_callback=slack_callback,
# #                   )



In [12]:
# results

In [29]:
# df['5'] = df['5'] + 100

In [30]:
ge.validate(df,
                  data_context=context,
                  data_asset_name='claimcode',
                  run_id=run_id,
                  only_return_failures=False,
#                   save_dataset_on_failure=dataset_s3,
#                   result_store=result_s3,
#                   result_callback=slack_callback,
                  )

TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

In [31]:
run_id

'2fc10bda-87de-11e9-bc40-645aedea36fd'

In [None]:
more ../uncommitted/validations/72bfba34-87b7-11e9-ac4c-645aedea36fd/claimcode.json


In [None]:
! ls -al  ../uncommitted/snapshots/72bfba34-87b7-11e9-ac4c-645aedea36fd/



In [None]:
# GE: Validate prior to our transform
data_asset_name = 'source_diabetes_data'
logger.info("Validating dataset %s" % data_asset_name)
config = pipeline_data_context.get_data_asset_config(data_asset_name)

bucket = 'dev.maglev-design.projects.superconductivehealth.priv'
result_key = 'diabetes_pipeline/' + run_id + '/validation_results/' + data_asset_name + '.json'
failed_dataset_key = 'diabetes_pipeline/' + run_id + '/data_assets/' + data_asset_name + '.csv'

result_s3 = s3.Object(bucket, result_key)
dataset_s3 = s3.Object(bucket, failed_dataset_key)

logger.debug("Storing validation results to s3. bucket: %s; key: %s" % (bucket, result_key))
results = ge.validate(df,
                  expectations_config=config,
                  data_asset_type=DiagnosticCodesDataset,
                  run_id=run_id,
                  only_return_failures=False,
                  save_dataset_on_failure=dataset_s3,
                  result_store=result_s3,
                  result_callback=slack_callback,
                  data_context=pipeline_data_context)


if results["success"] != True:
    logger.warn("Validation included failures! Check results at " + results["meta"]["result_reference"])

In [None]:
results

In [None]:
results

In [None]:
# Complete pipeline
logger.info("Storing output data to %s" % ('./output/' + run_id + '/' + data_asset_name + '.csv'))
os.makedirs("./output/" + run_id, exist_ok=True)
df.to_csv('./output/' + run_id + '/' + data_asset_name + '.csv')
del df