# Cookbook 2: Validate data during ingestion (take action on failures)

This cookbook shows a sample GX data validation workflow typical of data ingestion at the start of a data pipeline. Product data is loaded into a Pandas DataFrame, cleaned, and validated. Invalid rows are then identified and removed, and the valid rows are ingested into Postgres database tables.

This cookbook explores the validation workflow first in a notebook setting, then embedded within an Airflow pipeline. Airflow pipelines are also referred to as directed acyclic graphs, or DAGs.
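To make the "directed acyclic graph" idea concrete, the sketch below models a hypothetical four-step ingestion pipeline as a dependency graph, using Python's standard-library `graphlib` module. This is not Airflow code. The task names are illustrative, and Airflow does its own scheduling. The point is that each task runs only after the tasks it depends on, and the graph may not contain cycles.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
pipeline = {
    "clean": {"extract"},
    "validate": {"clean"},
    "load": {"validate"},
}

# static_order() yields the tasks so that every dependency comes first.
# A cycle (for example, "extract" depending on "load") would raise graphlib.CycleError.
run_order = list(TopologicalSorter(pipeline).static_order())
print(run_order)
```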

This cookbook features a scenario in which a subset of data fails validation and must be handled in the pipeline.

This cookbook builds on [Cookbook 1: Validate data during ingestion (happy path)](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and focuses on how data validation failures can be programmatically handled in the pipeline based on GX Validation Results. This cookbook assumes basic familiarity with GX Core workflows; for a step-by-step explanation of the GX data validation workflow, refer to [Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb). 

## Imports

This tutorial features the `great_expectations` library.

The `tutorial_code` module contains helper functions used within this notebook and the associated Airflow pipeline.

The `airflow_dags` submodule is included so that you can inspect the code used in the related Airflow DAG directly from this notebook.

In [None]:
import pathlib
import inspect
import time

import great_expectations as gx
import great_expectations.expectations as gxe
import pandas as pd

import tutorial_code as tutorial
import airflow_dags.cookbook2_validate_and_handle_invalid_data as dag

## Load raw data

In this tutorial, you will clean and validate a dataset containing synthesized product data. The data is loaded from a CSV file into a Pandas DataFrame.

In [None]:
DATA_DIR = pathlib.Path("/cookbooks/data/raw")

df_products_raw = pd.read_csv(DATA_DIR / "products.csv", encoding="unicode_escape")

In [None]:
print(f"Loaded {df_products_raw.shape[0]} product rows into dataframe.\n")

display(df_products_raw.head())

## Examine destination tables

The product data will be normalized and loaded into multiple Postgres tables:
* `products`
* `product_category`
* `product_subcategory`

Examine the schemas of the destination tables and compare them to the schema and contents of the raw product data.

In [None]:
tutorial.db.get_table_schema(table_name="products")

In [None]:
tutorial.db.get_table_schema(table_name="product_category")

In [None]:
tutorial.db.get_table_schema(table_name="product_subcategory")

## Clean product data

To clean the product data and split it into three normalized DataFrames, you will use a prepared function, `clean_product_data`. The cleaning code is displayed below and then invoked on the raw product data.

In [None]:
%pycat inspect.getsource(tutorial.cookbook2.clean_product_data)

In [None]:
df_products, df_product_categories, df_product_subcategories = (
    tutorial.cookbook2.clean_product_data(df_products_raw)
)

In [None]:
print(f"Loaded {df_products.shape[0]} cleaned product rows.\n")

df_products.head()

In [None]:
print(f"Loaded {df_product_categories.shape[0]} cleaned product category rows.\n")

df_product_categories.head()

In [None]:
print(f"Loaded {df_product_subcategories.shape[0]} cleaned product subcategory rows.\n")

df_product_subcategories.head()
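Splitting the data into three DataFrames is a form of normalization: repeated category and subcategory names are moved into their own tables, and each product row keeps only foreign keys. The sketch below illustrates that idea on a few made-up rows. It is not the tutorial's `clean_product_data` implementation, and the raw column names `category` and `subcategory`, as well as the `name` column in the lookup tables, are assumptions.

```python
# Hypothetical flat rows; the real products.csv columns may differ.
raw_rows = [
    {"product_id": 1, "name": "Road bike", "category": "Bikes", "subcategory": "Road"},
    {"product_id": 2, "name": "Trail bike", "category": "Bikes", "subcategory": "Mountain"},
    {"product_id": 3, "name": "Helmet", "category": "Accessories", "subcategory": "Helmets"},
    {"product_id": 4, "name": "Racing bike", "category": "Bikes", "subcategory": "Road"},
]


def normalize(rows):
    """Split flat rows into product, category, and subcategory tables."""
    category_ids = {}     # category name -> surrogate id
    subcategory_ids = {}  # (category name, subcategory name) -> surrogate id
    products, categories, subcategories = [], [], []

    for row in rows:
        cat = row["category"]
        if cat not in category_ids:
            category_ids[cat] = len(category_ids) + 1
            categories.append({"product_category_id": category_ids[cat], "name": cat})

        sub_key = (cat, row["subcategory"])
        if sub_key not in subcategory_ids:
            subcategory_ids[sub_key] = len(subcategory_ids) + 1
            subcategories.append({
                "product_subcategory_id": subcategory_ids[sub_key],
                "product_category_id": category_ids[cat],
                "name": row["subcategory"],
            })

        # The product row keeps foreign keys instead of repeated names.
        products.append({
            "product_id": row["product_id"],
            "name": row["name"],
            "product_category_id": category_ids[cat],
            "product_subcategory_id": subcategory_ids[sub_key],
        })

    return products, categories, subcategories


products, categories, subcategories = normalize(raw_rows)
```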

## GX data validation workflow

You will validate the cleaned product data using GX prior to loading it into a Postgres database table.

The GX data validation workflow was introduced in [Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb), which provided a walkthrough of the following GX components:
* Data Context
* Data Source
* Data Asset
* Batch Definition
* Batch
* Expectation
* Expectation Suite
* Validation Result

This cookbook will extend the GX validation workflow to include the Validation Definition and Checkpoint components, and will further explore the validation metadata returned in the Validation Result.

This tutorial contains concise explanations of GX components and workflows. For more detail, visit the [Introduction to GX Core](https://docs.greatexpectations.io/docs/core/introduction/) in the GX docs.

### Set up the GX validation workflow

Create a data validation workflow, up to the Expectation Suite definition, that expects the following of your product data:
* Expect that the product dataset contains the expected columns, in the specified order
* Expect that all product unit prices are at least 1.00 USD
* Expect that all products have a higher unit price than unit cost

```{admonition} Reminder: Adding GX components to the Data Context
GX component names must be unique. Once a component is added to the Data Context, adding another component with the same name causes an error. So that cookbook cells that add GX workflow components can be run repeatedly, you will see the following pattern:

    try:
        Add a new component(s) to the context
    except:
        Get component(s) from the context by name, or delete and recreate the component(s)
```
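The add-or-get and add-or-replace variants of this pattern can be sketched with a toy, dictionary-backed registry. `ToyRegistry` and `NameConflictError` are hypothetical stand-ins for the Data Context stores, not GX classes. One design choice worth noting: catching a specific exception, rather than using a bare `except:`, keeps unrelated errors (such as a typo in a component name) from being silently swallowed.

```python
class NameConflictError(Exception):
    """Raised by the toy registry when a name is already taken."""


class ToyRegistry:
    """Hypothetical stand-in for a Data Context component store."""

    def __init__(self):
        self._items = {}

    def add(self, name, value):
        if name in self._items:
            raise NameConflictError(name)
        self._items[name] = value
        return value

    def get(self, name):
        return self._items[name]

    def delete(self, name):
        del self._items[name]


def add_or_get(registry, name, value):
    """Add a component, or return the existing one if the name is taken."""
    try:
        return registry.add(name, value)
    except NameConflictError:
        return registry.get(name)


def add_or_replace(registry, name, value):
    """Add a component, first deleting any existing component with that name."""
    try:
        return registry.add(name, value)
    except NameConflictError:
        registry.delete(name)
        return registry.add(name, value)


registry = ToyRegistry()
first = add_or_get(registry, "pandas", "data source v1")
second = add_or_get(registry, "pandas", "data source v2")  # rerun: existing value returned
replaced = add_or_replace(registry, "pandas", "data source v3")
```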

In [None]:
# Create the Data Context.
context = gx.get_context()

# Create the Data Source, Data Asset, and Batch Definition.
try:
    data_source = context.data_sources.add_pandas("pandas")
    data_asset = data_source.add_dataframe_asset(name="customer data")
    batch_definition = data_asset.add_batch_definition_whole_dataframe(
        "batch definition"
    )

except Exception:
    data_source = context.data_sources.get("pandas")
    data_asset = data_source.get_asset(name="customer data")
    batch_definition = data_asset.get_batch_definition("batch definition")

# Get the Batch from the Batch Definition.
batch = batch_definition.get_batch(batch_parameters={"dataframe": df_products})

# Create the Expectation Suite.
try:
    expectation_suite = context.suites.add(
        gx.ExpectationSuite(name="product expectations")
    )
except Exception:
    context.suites.delete(name="product expectations")
    expectation_suite = context.suites.add(
        gx.ExpectationSuite(name="product expectations")
    )

expectations = [
    gxe.ExpectTableColumnsToMatchOrderedList(
        column_list=[
            "product_id",
            "name",
            "brand",
            "color",
            "unit_cost_usd",
            "unit_price_usd",
            "product_category_id",
            "product_subcategory_id",
        ]
    ),
    gxe.ExpectColumnValuesToBeBetween(column="unit_price_usd", min_value=1.0),
    gxe.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="unit_price_usd", column_B="unit_cost_usd"
    ),
]

for expectation in expectations:
    expectation_suite.add_expectation(expectation)
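For intuition only, the three rules in this Expectation Suite can be written as plain-Python checks over rows represented as dictionaries. This is not how GX evaluates Expectations. The sketch assumes an inclusive minimum for the price check and a strict greater-than for the price/cost comparison, which we believe match the GX defaults for these Expectations, and it ignores GX features such as null handling and the `mostly` parameter.

```python
EXPECTED_COLUMNS = [
    "product_id", "name", "brand", "color",
    "unit_cost_usd", "unit_price_usd",
    "product_category_id", "product_subcategory_id",
]


def check_products(columns, rows):
    """Report which rows violate each rule (plain-Python illustration)."""
    return {
        # Column names must match exactly, in order.
        "columns_match": list(columns) == EXPECTED_COLUMNS,
        # product_ids whose unit price is below 1.0 (the minimum is inclusive).
        "price_below_minimum": [
            r["product_id"] for r in rows if r["unit_price_usd"] < 1.0
        ],
        # product_ids whose unit price is not strictly greater than unit cost.
        "price_not_above_cost": [
            r["product_id"] for r in rows if not r["unit_price_usd"] > r["unit_cost_usd"]
        ],
    }


# Made-up sample rows containing only the fields the checks read.
sample_rows = [
    {"product_id": 101, "unit_cost_usd": 5.0, "unit_price_usd": 9.99},
    {"product_id": 102, "unit_cost_usd": 0.2, "unit_price_usd": 0.5},    # below minimum
    {"product_id": 103, "unit_cost_usd": 12.0, "unit_price_usd": 10.0},  # price < cost
]
report = check_products(EXPECTED_COLUMNS, sample_rows)
```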

### Extend the validation workflow to include Validation Definition and Checkpoint

A **Validation Definition** pairs a Batch Definition with an Expectation Suite. It defines what data you want to validate using which Expectations.

In [None]:
# Create the Validation Definition.
try:
    validation_definition = context.validation_definitions.add(
        gx.ValidationDefinition(
            name="product validation definition",
            data=batch_definition,
            suite=expectation_suite,
        )
    )
except Exception:
    context.validation_definitions.delete(name="product validation definition")
    validation_definition = context.validation_definitions.add(
        gx.ValidationDefinition(
            name="product validation definition",
            data=batch_definition,
            suite=expectation_suite,
        )
    )

A **Checkpoint** runs validation for one or more Validation Definitions.

Checkpoints return a Checkpoint Result object, which contains a Validation Result for each Validation Definition that was run. Checkpoints also let you specify the level of detail returned in your Validation Results, using the `result_format` parameter.

For a comprehensive description of the levels of detail offered by the result format settings, visit [Choose result format](https://docs.greatexpectations.io/docs/core/trigger_actions_based_on_results/choose_a_result_format/) in the GX docs.

In the code below:
* The result format is set to `COMPLETE`, which returns all possible validation metadata in the Validation Result.
* `unexpected_index_column_names` is set to `["product_id"]`, so any unexpected (failing) rows are identified by their `product_id` value in the Validation Result.

In [None]:
# Create Checkpoint.
try:
    checkpoint = context.checkpoints.add(
        gx.Checkpoint(
            name="checkpoint",
            validation_definitions=[validation_definition],
            result_format={
                "result_format": "COMPLETE",
                "unexpected_index_column_names": ["product_id"],
            },
        )
    )
except Exception:
    context.checkpoints.delete(name="checkpoint")
    checkpoint = context.checkpoints.add(
        gx.Checkpoint(
            name="checkpoint",
            validation_definitions=[validation_definition],
            result_format={
                "result_format": "COMPLETE",
                "unexpected_index_column_names": ["product_id"],
            },
        )
    )

Next, run the Checkpoint. When validating DataFrame Data Assets, the DataFrame must be supplied to the Checkpoint at runtime.

In [None]:
checkpoint_result = checkpoint.run(batch_parameters={"dataframe": df_products})

The Checkpoint run returns a `CheckpointResult` object.

In [None]:
type(checkpoint_result)

## Examine Validation Result

The Validation Result object can be extracted from the Checkpoint Result object.

In [None]:
# Extract the Validation Result object from the Checkpoint results.
validation_result = next(iter(checkpoint_result.run_results.values()))

### Get summary information

The Validation Result `success` field indicates whether or not all Expectations passed.

In [None]:
validation_result["success"]

Another useful Validation Result summary field is `statistics`, which shows how many Expectations were evaluated and how many of them passed and failed.

In [None]:
validation_result["statistics"]

In [None]:
expectations_run = validation_result["statistics"]["evaluated_expectations"]
expectations_failed = validation_result["statistics"]["unsuccessful_expectations"]

print(
    f"{expectations_run} Expectations were run, {expectations_failed} Expectations failed."
)
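The `statistics` counts follow directly from the per-Expectation `success` flags. The sketch below recomputes them from mock result dictionaries, not GX objects. The `evaluated_expectations` and `unsuccessful_expectations` keys appear in the cell above; `successful_expectations` and `success_percent` are assumed to mirror the other keys of the GX `statistics` field.

```python
def summarize(results):
    """Recompute summary statistics from per-Expectation success flags."""
    evaluated = len(results)
    successful = sum(1 for r in results if r["success"])
    return {
        "evaluated_expectations": evaluated,
        "successful_expectations": successful,
        "unsuccessful_expectations": evaluated - successful,
        # Guard against division by zero for an empty suite.
        "success_percent": 100 * successful / evaluated if evaluated else None,
    }


# Mock per-Expectation results: one pass, two failures.
mock_results = [{"success": True}, {"success": False}, {"success": False}]
stats = summarize(mock_results)
```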

### Retrieve results for individual Expectations 

The `results` field contains a result for each Expectation. The information in `results`, whose level of detail is set by the Checkpoint `result_format` parameter, shows why an Expectation failed and which rows failed validation.

In [None]:
failed_expectations = []
for result in validation_result["results"]:
    if result["success"] is False:
        failed_expectations.append(result)
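When a suite contains many Expectations, a compact summary of which ones failed, and how many rows each flagged, can be easier to read than the full results. The sketch below builds that summary with a dictionary comprehension over mock results. The result shape (`expectation_config["type"]` and `result["unexpected_count"]`) is an assumption modeled on GX 1.x Validation Results; verify the field names against the printed output of the cells below.

```python
# Mock per-Expectation results; the field names are assumed, not guaranteed.
mock_results = [
    {
        "success": True,
        "expectation_config": {"type": "expect_table_columns_to_match_ordered_list"},
        "result": {},
    },
    {
        "success": False,
        "expectation_config": {"type": "expect_column_values_to_be_between"},
        "result": {"unexpected_count": 2},
    },
    {
        "success": False,
        "expectation_config": {"type": "expect_column_pair_values_a_to_be_greater_than_b"},
        "result": {"unexpected_count": 5},
    },
]

# Map each failed Expectation type to the number of rows it flagged.
failed_summary = {
    r["expectation_config"]["type"]: r["result"].get("unexpected_count")
    for r in mock_results
    if not r["success"]
}
```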

If you examine the result of one of the failed Expectations, you can see that the Validation Result provides an `unexpected_index_list` field listing the rows that failed validation. Each element of the list represents a row in the original dataset. Its fields are the ones used directly by the Expectation (for example, `unit_price_usd` and `unit_cost_usd` for the failed `ExpectColumnPairValuesAToBeGreaterThanB` Expectation), plus any fields specified in the `result_format` `unexpected_index_column_names` setting.

Based on the definition of the Checkpoint above, the records in the `unexpected_index_list` will also contain the `product_id` field, so that they can be used to identify the original rows in the Data Asset.

In [None]:
failed_expectations[0]

In [None]:
failed_expectations[1]

### Use the Validation Result to separate valid and invalid rows

You can use the metadata in the Validation Result's `unexpected_index_list` to identify the original rows in the Data Asset, and then separate valid rows from invalid ones.

As an example, extract the `product_id` values of the product rows that failed the first Expectation. Then use those values to create two DataFrames: one containing the valid product rows and one containing the rows that failed the Expectation.

In [None]:
# Show the metadata in the Validation Result's unexpected_index_list field
# for the first failed Expectation.
failed_expectations[0]["result"]["unexpected_index_list"]

In [None]:
# Extract the product ids.
product_ids_for_invalid_rows = [
    x["product_id"] for x in failed_expectations[0]["result"]["unexpected_index_list"]
]

product_ids_for_invalid_rows

In [None]:
# Pull out bad rows from original product dataset.
df_invalid_rows = df_products[
    df_products["product_id"].isin(product_ids_for_invalid_rows)
].reset_index(drop=True)

display(df_invalid_rows)

In [None]:
# Drop the invalid rows from the product data.
df_products_validated = df_products.drop(
    df_products[df_products["product_id"].isin(product_ids_for_invalid_rows)].index
).reset_index(drop=True)

# Verify that product data contains the correct number of rows.
assert df_products_validated.shape[0] == (
    df_products.shape[0] - len(product_ids_for_invalid_rows)
)

display(df_products_validated.head())
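The cells above use only the first failed Expectation. To remove every row that failed *any* Expectation, you can collect the ids across all failed results. The sketch below does this with plain lists of dictionaries shaped like the `unexpected_index_list` records (mock data, assumed shape). A set is used because one row can fail several Expectations. Deduplicating also keeps a row-count check like the `assert` above correct, since subtracting the length of a list with duplicates would undercount. With Pandas, the same set can be passed to `Series.isin`.

```python
# Mock failed results shaped like entries of `failed_expectations`.
# Each record includes product_id because of unexpected_index_column_names.
failed = [
    {"result": {"unexpected_index_list": [
        {"product_id": 102, "unit_price_usd": 0.5},
    ]}},
    {"result": {"unexpected_index_list": [
        {"product_id": 102, "unit_price_usd": 0.5, "unit_cost_usd": 0.6},
        {"product_id": 103, "unit_price_usd": 10.0, "unit_cost_usd": 12.0},
    ]}},
]

rows = [{"product_id": i} for i in (101, 102, 103, 104)]

# Union of ids from every failed Expectation; the set removes duplicates.
invalid_ids = {
    record["product_id"]
    for result in failed
    for record in result["result"]["unexpected_index_list"]
}

valid_rows = [r for r in rows if r["product_id"] not in invalid_ids]
invalid_rows = [r for r in rows if r["product_id"] in invalid_ids]
```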

## Integrate GX validation and programmatic handling of invalid rows in the Airflow DAG

You will use the `success` and `unexpected_index_list` metadata of the GX Validation Result object to control the actions of the `cookbook2_validate_and_handle_invalid_data` Airflow pipeline.

### Inspect DAG code

Examine the DAG code below that defines the `cookbook2_validate_and_handle_invalid_data` pipeline.

The DAG code cleans the incoming product data and then validates the data. Rows that pass validation are written to Postgres, but rows that fail validation trigger an action in the pipeline:
* Failing product category and subcategory rows cause the pipeline to raise an error and halt.
* Failing product rows are automatically separated from the valid rows and written to an error file (`bad_product_rows.csv`).

In [None]:
%pycat inspect.getsource(dag)
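The failure-handling rules described above can be sketched independently of Airflow and GX. The function below is hypothetical and is not the DAG's code, which is printed by the cell above. It takes a validation outcome (a `success` flag plus the set of failing `product_id` values), halts on category or subcategory failures by raising an exception (in an Airflow task, an uncaught exception marks the task as failed), and otherwise writes the failing product rows to `bad_product_rows.csv` in a given directory and returns the rows that are safe to load.

```python
import csv
import tempfile
from pathlib import Path


def handle_validation(table_name, success, rows, invalid_ids, error_dir):
    """Hypothetical sketch of the DAG's failure-handling rules."""
    if success:
        return rows
    if table_name in ("product_category", "product_subcategory"):
        # Lookup-table failures halt the pipeline.
        raise ValueError(f"Validation failed for {table_name}; halting pipeline.")

    # Product failures: write bad rows to an error file and keep the rest.
    invalid_rows = [r for r in rows if r["product_id"] in invalid_ids]
    error_file = Path(error_dir) / "bad_product_rows.csv"
    with error_file.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else [])
        writer.writeheader()
        writer.writerows(invalid_rows)
    return [r for r in rows if r["product_id"] not in invalid_ids]


rows = [
    {"product_id": 1, "unit_price_usd": 9.99},
    {"product_id": 2, "unit_price_usd": 0.5},
]
with tempfile.TemporaryDirectory() as tmp:
    valid = handle_validation("products", False, rows, {2}, tmp)
    with (Path(tmp) / "bad_product_rows.csv").open(newline="") as f:
        bad = list(csv.DictReader(f))  # CSV values are read back as strings
```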

## View the Airflow pipeline

To view the `cookbook2_validate_and_handle_invalid_data` pipeline in the Airflow UI, log in to the locally running Airflow instance.

1. Open [http://localhost:8080/](http://localhost:8080/) in a browser window.
2. Log in with these credentials:
   * Username: `admin`
   * Password: `gx`

You will see the pipeline under **DAGs** on login.

![Log in to tutorial Airflow UI](static/images/cookbook1_log_in_to_airflow_ui.gif)

You can trigger the DAG from this notebook, using the provided convenience function in the cell below, or you can trigger the DAG manually in the Airflow UI.

In [None]:
tutorial.airflow.trigger_airflow_dag_and_wait_for_run("cookbook2_validate_and_handle_invalid_data")

To trigger the `cookbook2_validate_and_handle_invalid_data` DAG from the Airflow UI, click the **Trigger DAG** button (with a play icon) under Actions. This queues the DAG, which executes shortly afterward. A successful run is indicated by the run count inside the green circle under Runs.

![Trigger the Airflow DAG](static/images/cookbook1_trigger_dag.gif)

The `cookbook2_validate_and_handle_invalid_data` DAG can be rerun multiple times; you can experiment with running it from this notebook or from the Airflow UI. The pipeline uses insert-ignore semantics when writing to the Postgres `products`, `product_category`, and `product_subcategory` tables: it skips any row whose primary key matches an existing row instead of attempting to insert it.
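Insert-ignore behavior can be illustrated with the in-memory SQLite database from Python's standard library. The `INSERT ... ON CONFLICT DO NOTHING` clause shown here is also valid PostgreSQL syntax (SQLite supports it from version 3.24). Whether the DAG issues this exact statement, or uses a different mechanism such as SQLAlchemy's dialect-specific `on_conflict_do_nothing`, is an assumption; check the DAG source printed above. The table definition is a simplified, hypothetical version of `product_category`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_category (product_category_id INTEGER PRIMARY KEY, name TEXT)"
)

insert_sql = (
    "INSERT INTO product_category (product_category_id, name) VALUES (?, ?) "
    "ON CONFLICT DO NOTHING"
)
rows = [(1, "Bikes"), (2, "Accessories")]

# First run inserts both rows.
conn.executemany(insert_sql, rows)
# A rerun skips rows whose primary key already exists and inserts only new ones.
conn.executemany(insert_sql, rows + [(3, "Clothing")])
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM product_category").fetchone()[0]
```

Because rows with existing keys are skipped rather than rejected with an error, rerunning the pipeline is safe; the trade-off is that changed values for an existing key are silently not applied.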

## View pipeline results

Once the pipeline has run, the tables are populated with the cleaned and validated product data. You can view the updated table counts and a sampling of rows below.

* `products` is populated with 2510 rows
* `product_category` is populated with 8 rows 
* `product_subcategory` is populated with 32 rows

In [None]:
tutorial.db.get_table_row_count(table_name="products")

In [None]:
pd.read_sql_query(
    "select * from products limit 5", con=tutorial.db.get_local_postgres_engine()
)

In [None]:
tutorial.db.get_table_row_count(table_name="product_category")

In [None]:
pd.read_sql_query(
    "select * from product_category limit 5",
    con=tutorial.db.get_local_postgres_engine(),
)

In [None]:
tutorial.db.get_table_row_count(table_name="product_subcategory")

In [None]:
pd.read_sql_query(
    "select * from product_subcategory limit 5",
    con=tutorial.db.get_local_postgres_engine(),
)

Additionally, the invalid product rows are written to the error file (`bad_product_rows.csv`).

It can also be helpful to view the pipeline logs to investigate the details of a successful (or unsuccessful) run. To examine these logs in the Airflow UI:
1. On the DAGs screen, click on the run(s) of interest under Runs.
2. Click the name of the individual run you want to examine. This will load the DAG execution details.
3. Click the Graph tab, and then the `cookbook2_validate_and_handle_invalid_data` task box on the visual rendering.
4. Click the Logs tab to load the DAG logs.

You can see in the screen capture below that the logs reflect the row insertion print statement that was included in the DAG code.

![Check logs for successful pipeline run](static/images/cookbook1_check_pipeline_logs.gif)

## Summary

This cookbook has walked you through the process of validating data using GX, integrating the data validation workflow in an Airflow pipeline, and then programmatically handling invalid data in the pipeline when validation fails.

[Cookbook 1](Cookbook_1_Validate_data_during_ingestion_happy_path.ipynb) and Cookbook 2 (this notebook) have focused on usage of [GX Core](https://docs.greatexpectations.io/docs/core/introduction/) to implement data validation in a data pipeline. Subsequent cookbooks will explore integrating [GX Cloud](https://docs.greatexpectations.io/docs/cloud/overview/gx_cloud_overview), GX Core, and an Airflow data pipeline to achieve end-to-end data validation workflows that make validation results available and shareable in the GX Cloud web UI.