# Integrate Data Validation Into Your Pipeline



In [None]:
# Prep environment and logging

import json
import great_expectations as ge
import great_expectations.jupyter_ux
from great_expectations.datasource.types import BatchKwargs
from datetime import datetime

great_expectations.jupyter_ux.setup_notebook_logging()



In [6]:
import sqlalchemy
from sqlalchemy import create_engine
def load_file_into_db(file_path, table_name, connection_url=None):
    """Bulk-load a local CSV file into a MySQL table via ``LOAD DATA LOCAL INFILE``.

    The first line of the file is skipped (``ignore 1 lines``); fields are
    comma-separated and rows are newline-terminated.

    Args:
        file_path: Path of the CSV file to load (``str``, ``bytes`` or
            ``os.PathLike``).
        table_name: Name of the destination table, optionally qualified as
            ``schema.table``. Only letters, digits, ``_`` and ``$`` are accepted.
        connection_url: Optional SQLAlchemy connection URL. When omitted, the
            ``GE_NPI_DB_URL`` environment variable is used if set, otherwise
            the notebook's original local development URL.

    Raises:
        ValueError: If ``table_name`` is not a plain SQL identifier.
    """
    import os
    import re

    # Identifiers cannot be sent as bound parameters, so the table name is
    # validated before it is interpolated into the statement.
    identifier = r"[A-Za-z_][A-Za-z0-9_$]*"
    if not isinstance(table_name, str) or not re.fullmatch(
        r"{0}(\.{0})?".format(identifier), table_name
    ):
        raise ValueError("Invalid table name: {!r}".format(table_name))

    # Accept path-like objects and double any single quote so that the path
    # cannot terminate the SQL string literal early.
    quoted_path = os.fsdecode(file_path).replace("'", "''")

    if connection_url is None:
        # NOTE(review): the fallback URL embeds a password in source control;
        # prefer passing `connection_url` or setting GE_NPI_DB_URL.
        connection_url = os.environ.get(
            "GE_NPI_DB_URL", 'mysql://root:technion1@localhost/sch'
        )

    load_data_sql_str = """
    load data local infile
    '{0:s}'
    into table {1:s}
    fields terminated by ',' lines terminated by '\n' ignore 1 lines""".format(quoted_path, table_name)

    engine = create_engine(connection_url)
    try:
        with engine.connect() as con:
            con.execute(load_data_sql_str)
    finally:
        # Release pooled connections; the engine is created per call.
        engine.dispose()


In [15]:
import pandas as pd

# NOTE(review): absolute, machine-specific path -- TODO: replace with a path valid in your environment.
new_npi_file_path = '/Users/eugenemandel/projects/ge_npi_demo/new_data/npidata_pfile_20050523-20191001_1.csv'
table_name = "npi_import_raw"

context = ge.data_context.DataContext()

# Generate a run id - a pipeline run id, a timestamp or any other string that is meaningful to you 
# and will help you refer to validation results. We recommend they be chronologically sortable.
# The same run id is used for both validation steps below.
run_id = datetime.utcnow().isoformat().replace(":", "") + "Z"

data_asset_name_0 = "npi_data__dir/default/npi_files" # TODO: replace with your value!
expectation_suite_name_0 = "warning" # TODO: replace with your value!

# Step 1: validate the raw file (as a DataFrame) before it is loaded into the database.
df = pd.read_csv(new_npi_file_path)
batch0 = context.get_batch(data_asset_name_0, expectation_suite_name_0, BatchKwargs(df=df))

results = context.run_validation_operator(
    assets_to_validate=[batch0],
    run_identifier=run_id,
    validation_operator_name="action_list_operator",
)

if not results["success"]:
    # The file failed validation: skip the load and the table validation.
    print("Failure!")
else:
    # Step 2: load the validated file, then validate the resulting table.
    load_file_into_db(new_npi_file_path, table_name)
    data_asset_name_1 = "mysql_db/default/npi_import_raw"
    expectation_suite_name_1 = "warning"
    batch1 = context.get_batch(data_asset_name_1, expectation_suite_name_1, BatchKwargs(table=table_name))
    # This call must stay inside the `else` branch: `batch1` only exists after
    # a successful file validation and load.
    results = context.run_validation_operator(
        assets_to_validate=[batch1],
        run_identifier=run_id,
        validation_operator_name="action_list_operator",
    )
    



2019-10-07T17:17:31-0700 - INFO - 	2 expectation(s) included in expectation_suite.
2019-10-07T17:17:31-0700 - INFO - 	2 expectation(s) included in expectation_suite.
2019-10-07T17:17:31-0700 - INFO - 	1 expectation(s) included in expectation_suite.
2019-10-07T17:17:31-0700 - INFO - 	1 expectation(s) included in expectation_suite.


In [16]:
results

{'success': True,
 'details': {{'data_asset_name': mysql_db/default/npi_import_raw,
      'result': {'observed_value': 10000},
      'expectation_config': {'expectation_type': 'expect_table_row_count_to_equal',
      'exception_info': {'raised_exception': False,
       'exception_message': None,
       'exception_traceback': None}}],
    'success': True,
    'statistics': {'evaluated_expectations': 1,
     'successful_expectations': 1,
     'unsuccessful_expectations': 0,
     'success_percent': 100.0},
    'meta': {'great_expectations.__version__': '0.8.0a3+0.g48d38d56.dirty',
     'data_asset_name': 'mysql_db/default/npi_import_raw',
     'run_id': '2019-10-08T001730.397685Z',
     'batch_kwargs': {'table': 'npi_import_raw'}}},
   'actions_results': {'store_validation_result': {},
    'store_evaluation_params': {}}}}}

## Integrate data validation into your pipeline

[**Watch a short tutorial video**](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#video)


[**Read more in the tutorial**](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation)

**Reach out for help on** [**Great Expectations Slack**](https://greatexpectations.io/slack)




### Get a DataContext object


In [None]:
context = ge.data_context.DataContext()

### Get a pipeline run id

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#set-a-run-id)


In [None]:
# Generate a run id - a pipeline run id, a timestamp or any other string that is meaningful to you 
# and will help you refer to validation results. We recommend they be chronologically sortable.
run_id = datetime.utcnow().isoformat().replace(":", "") + "Z"
run_id

### Choose data asset name and expectation suite name

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#choose-data-asset-and-expectation-suite)


In [10]:
great_expectations.jupyter_ux.list_available_data_asset_names(context)

data_source: npi_data__dir (PandasDatasource)
  generator_name: default (SubdirReaderGenerator)
    generator_asset: npi_files
data_source: mysql_db (SqlAlchemyDatasource)
  generator_name: default (TableGenerator)
    generator_asset: npi_import_raw


In [None]:
data_asset_name_0 = "npi_data__dir/default/npi_files" # TODO: replace with your value!
expectation_suite_name_0 = "warning" # TODO: replace with your value!

### Obtain the batch to validate

Learn about `get_batch` in [this tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#obtain-a-batch-to-validate)



##### If your pipeline processes Pandas Dataframes:

```
import pandas as pd
df = pd.read_csv(file_path_to_validate)
batch = context.get_batch(data_asset_name, expectation_suite_name, BatchKwargs(df=df))
batch.head()
```

##### If your pipeline processes Spark Dataframes:
```
from pyspark.sql import SparkSession
from great_expectations.dataset import PandasDataset, SqlAlchemyDataset, SparkDFDataset
spark = SparkSession.builder.getOrCreate()
df = SparkDFDataset(spark.read.csv(file_path_to_validate))
batch = context.get_batch(data_asset_name, expectation_suite_name, BatchKwargs(df=df))
batch.head()
```

##### If your pipeline processes SQL queries:

* A. To validate an existing table:

```
data_asset_name = 'USE THE TABLE NAME'
batch = context.get_batch(data_asset_name,
                        expectation_suite_name,
                        BatchKwargs(table=data_asset_name))
batch.head()
```

* B. To validate a query result set:

```
data_asset_name = 'USE THE NAME YOU SPECIFIED WHEN YOU CREATED THE EXPECTATION SUITE FOR THIS QUERY'
batch = context.get_batch(data_asset_name,
                        expectation_suite_name,
                        BatchKwargs(query='SQL FOR YOUR QUERY'))
batch.head()
```


In [None]:
import pandas as pd
# Path of the CSV file to validate, given relative to the notebook's working directory.
file_path_to_validate = "../../new_data/npidata_pfile_20050523-20191001_1.csv"
# Read the file into a pandas DataFrame.
df = pd.read_csv(file_path_to_validate)
# Wrap the DataFrame in BatchKwargs and request a batch for the data asset and
# expectation suite names chosen in the earlier cell.
batch0 = context.get_batch(data_asset_name_0, expectation_suite_name_0, BatchKwargs(df=df))
# Last expression of the cell: shows a preview of the batch as the cell output.
batch0.head()

### Validate the batch


[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#validate)



### Review the validation results

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#review-validation-results)


### Validation Operators

The `validate` method evaluates one batch of data against one expectation suite and returns a dictionary of validation results. This is sufficient when you explore your data and get to know Great Expectations.
When deploying Great Expectations in a real data pipeline, you will typically discover additional needs:

* validating a group of batches that are logically related
* validating a batch against several expectation suites
* doing something with the validation results (e.g., saving them for a later review, sending notifications in case of failures, etc.).

Validation Operators provide a convenient abstraction for both bundling the validation of multiple expectation suites and the actions that should be taken after the validation.

[Read more about Validation Operators](https://docs.greatexpectations.io/en/latest/features/validation_operators_and_actions.html?utm_source=notebook&utm_medium=integrate_validation)




In [None]:
# This is an example of invoking a validation operator that is configured by default in the great_expectations.yml file

# `batch0` is the batch created in the "Obtain the batch to validate" cell;
# `run_id` comes from the "Get a pipeline run id" cell.
results = context.run_validation_operator(
    assets_to_validate=[batch0],
    run_identifier=run_id,
    validation_operator_name="action_list_operator",
)

results