# A Brief Intro to Data Validation in Python with Pandera

## What is Data Validation?

- Verifying that data is appropriate for analysis
  - correct format?
  - right data types?
  - unique, no duplicates?
  - missing values?
  - data in expected range?
- Particularly important to automate in situations where you are ingesting new data
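
As a rough illustration of the questions listed above, here is a minimal sketch that answers a few of them by hand with plain pandas. The `toy` dataframe and its column names are made up for this example and are not part of the tutorial's dataset.

```python
import pandas as pd

# Hypothetical toy data; column names are invented for illustration
toy = pd.DataFrame(
    {
        "id": [1, 2, 2, 4],
        "age": [34.0, None, 51.0, 140.0],
    }
)

# right data types?
age_is_float = pd.api.types.is_float_dtype(toy["age"])

# unique, no duplicates?
n_duplicate_ids = int(toy["id"].duplicated().sum())

# missing values?
n_missing_age = int(toy["age"].isna().sum())

# data in expected range? (missing values are dropped before the range test)
n_age_out_of_range = int((~toy["age"].dropna().between(0, 125)).sum())

print(age_is_float, n_duplicate_ids, n_missing_age, n_age_out_of_range)
```

Hand-written checks like these get repetitive quickly, which is the motivation for declaring them once in a schema.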

## Pandera: Data Validation Package for Python

- https://pandera.readthedocs.io/en/latest/index.html#
- Data validation package designed to work with dataframes
- Checks data in dataframes against schema (set of rules) 
- Schema (DataFrameSchema object) contains suite of tests that you specify
- You test data in dataframe against schema to see if it conforms to expectations
- Can be used with dataframes from:
  - pandas
  - polars
  - PySpark SQL
- Offers a lot of functionality; we'll cover just the basics here
- "validate" package for R is similar:
  - https://cran.r-project.org/web/packages/validate/vignettes/cookbook.html

In [None]:
# import needed packages/classes/functions
import json

import numpy as np
import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check, check_input, check_output, check_io

### We'll use the Titanic Dataset for this Brief Tutorial

In [None]:
# read in titanic dataset 
df = pd.read_csv('../datasets/titanic.csv')

In [None]:
# browse df
df.head(10)

In [None]:
# check data types of df
df.dtypes

## We'll Create a simple DataFrameSchema for the Titanic Dataset

### We'll use these datatypes:

- survived: int
- pclass: int
- name: str
- sex: str
- age: float
- fare: float
- sibsp: int
- parch: int

In [None]:
# define function to create simple DataFrameSchema for titanic dataset
def create_titanic_schema() -> DataFrameSchema:
    """Creates simple pandera DataFrameSchema for titanic dataset"""
    
    schema = DataFrameSchema(
                            {"survived": Column(int),
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )
    
    return schema

In [None]:
# create the titanic schema
schema = create_titanic_schema()

In [None]:
try:
    schema.validate(df)
    print('Validated!') # just to give us an affirmative output
except pa.errors.SchemaError as schema_error:
    print(schema_error)

## Column-Level Rules

### Testing for Presence/Absence of Columns
- required = True (default)
- required = False (allows absence of column)

In [None]:
# By default, all columns specified by schema are required
# implicit 'required = True'
# dropping a column raises error

try:
    schema.validate(df.drop('survived', axis = 1))
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# using required=False within  
# column definition makes it optional
schema = DataFrameSchema(
                            {"survived": Column(int, required=False),
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

In [None]:
# after change to schema, we can validate the dataframe with missing column

try:
    schema.validate(df.drop('survived', axis = 1))
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Testing for Data Type of Column

In [None]:
# What if I change the data type of the age column to str 
# and try to validate the df with our schema?
df['age'] = df['age'].astype(str)

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# Let's revert the data type of age to be float as expected by schema
df['age'] = df['age'].astype(float)

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Testing for Null Values in Column
- by default, columns are non-nullable
- Can change this for given column in column definition 
  - nullable=True

In [None]:
# count null values per column (we expect none in this dataset)
df.isnull().sum()

In [None]:
df['age'].head()

In [None]:
# by default, null values won't validate
# let's create a null value to demonstrate this

df.loc[0, 'age'] = None
df['age'].head()

In [None]:
try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# use of nullable = True
# allows null values in column to pass validation
schema = DataFrameSchema(
                            {"survived": Column(int),
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, nullable=True),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

In [None]:
try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# changing back to original value
df.loc[0, 'age'] = 22.0
df['age'].head()

### Testing for Duplicates in a Column
- by default, duplicate values within a column are allowed
- Can raise error when duplicate values appear in column:
  - unique = True

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, unique=True),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Testing that Values Are Greater Than/Less Than Specific Value
- Check.greater_than()
- Check.greater_than_or_equal_to()
- Check.less_than()
- Check.less_than_or_equal_to()

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, Check.greater_than(0)),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Testing that Values Are In a Specific Range

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, Check.between(0, 125, include_min=True, include_max=True)),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Testing that Values in Column are Members of Specified Set

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int, Check.isin([1, 2, 3])),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Tests on Column of Strings

In [None]:
# Might want to check that names contain a title
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str, Check.str_contains('Mr|Mrs|Ms|Miss|Master|Dr|Rev')),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### Create Custom Tests 
- Can use lambda functions to create custom column checks

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float, Check(lambda x: (x >= 0) & (x < 500), 
                                                        error = "fare out of bounds")
                                            ),  # same as Check.between(0, 500, include_max=False)
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# define custom function to use as column check in DataFrameSchema
def within_3_std(series: pd.Series) -> pd.Series:
    """Check if values in col are within 3 sds of mean of col"""
    mean = series.mean()
    std = series.std()
    lower_bound = mean - 3*std
    upper_bound = mean + 3*std
    return series.between(lower_bound, upper_bound)

schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float, Check(within_3_std)),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

- Tests can also involve multiple columns in a single test
- These are really dataframe-level tests
  - Pass a Check on the whole df as the second argument, after the column dictionary

In [None]:
# schema dataframe-level check across cols
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            },
                            Check(lambda df: (df["sibsp"] + df["parch"] <= 10),
                            error="Family size (sibsp + parch) is too large")
                        )

try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

### More Sophisticated Tests on Columns are Possible

- Can use pandera's Hypothesis checks (which rely on the scipy package) for statistical tests
  - Useful for detecting data drift

## Dataframe-Level Tests
- Tests that apply to all columns/rows of dataframe

### Testing Presence/Absence of All Columns

- Columns not specified in schema aren't checked
- Can add tests at dataframe level to check for presence/absence of columns
  - strict = True
  - Only columns listed in schema can be present

In [None]:
# add column not in our schema
df['is_child'] = df['age'] < 14

In [None]:
df.head(10)

In [None]:
# try to validate with new column (should validate successfully)
try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# add strict=True to schema
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            },
                            strict=True
                        )

# with strict = True, won't validate with unspecified column
try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# get rid of column not in schema
df = df.drop('is_child', axis=1)

### Testing Order of Cols

- Used for enforcing column order specified in schema
  - ordered = True

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            },
                            ordered = True
                        )

In [None]:
# change order of columns
# 'survived' col becomes last column
df = df[['pclass',
        'name',
        'sex',
        'age',
        'fare',
        'sibsp',
        'parch',
        'survived']]

In [None]:
# with ordered = True, new order of cols won't validate
try:
    schema.validate(df)
    print('Validated!')
except pa.errors.SchemaError as schema_error:
    print(schema_error)

In [None]:
# change order of columns
# back to match schema
df = df[['survived',
        'pclass',
        'name',
        'sex',
        'age',
        'fare',
        'sibsp',
        'parch']]

### Testing Uniqueness of Sets of Columns
- To make sure no two rows share the same combination of values across the listed columns
- unique = ["col_1", "col_2"]  

## Pandera can attempt to infer a schema automatically

In [None]:
# have pandera automatically infer the DataFrameSchema from the dataframe
schema_inferred = pa.infer_schema(df)

# inspect schema
print(schema_inferred)

## Lazy Validation 
- Typically, you'll have many tests you want to perform on a dataframe
- By default, the first failed test raises an error
- Can set lazy=True in the validate method to run all tests and collect every error that occurred
  - schema.validate(df, lazy=True)
- Easiest to view accumulated results as json
In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int, Check.isin([2, 3])),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, Check.between(5, 65, include_min=True, include_max=True)),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )

# toggle between lazy=False and lazy=True to see difference
# (lazy=False raises a single SchemaError, which this except clause does not catch)
try:
    schema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    print(json.dumps(e.message, indent=2))

## Generating Error Reports
- Can write error reports to json file

In [None]:
schema = DataFrameSchema(
                            {"survived": Column(int), 
                             "pclass": Column(int, Check.isin([2, 3])),
                             "name": Column(str),
                             "sex": Column(str),
                             "age": Column(float, Check.between(5, 65, include_min=True, include_max=True)),
                             "fare": Column(float),
                             "sibsp": Column(int),
                             "parch": Column(int)
                            }
                        )


try:
    schema.validate(df, lazy=True)
    print('Validated!')
except pa.errors.SchemaErrors as e:
    json_string = json.dumps(e.message, indent=2)
    print(json_string)

    # write the report only when validation failed;
    # otherwise json_string would be undefined
    with open("../schema_error_reports/schema_error_log.json", "w") as file:
        file.write(json_string)

## Schema Transformations
- Can be used to modify schemas without needing to redefine entire schema
  - schema.add_columns()
  - schema.remove_columns()
  - schema.update_columns()
  - schema.rename_columns()
  - schema.set_index()
  - schema.reset_index()

## Saving Schema Specifications
- Schema specifications can be saved to file
  - .py scripts
  - .yaml files

In [None]:
# storing schema specification as python script
schema.to_script('../schemas/titanic_schema.py')

# storing schema specification as yaml file
schema.to_yaml('../schemas/titanic_schema.yaml')

In [None]:
# Read schema from yaml

schema_from_yaml = pa.io.from_yaml('../schemas/titanic_schema.yaml')
print(schema_from_yaml)

## Using Pandera decorators for integration of data validation into data pipelines

### What's a decorator?
- a function (higher-order function) that modifies the behavior of a function or class without directly changing its code
- essentially a wrapper that can be used to extend/alter original function
- denoted in python by the @ symbol
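
For a runnable illustration of the wrapper idea, here is a minimal sketch in plain Python. The decorator `announce` and the function `add` are hypothetical names invented for this example.

```python
import functools


def announce(func):
    """Hypothetical decorator: print a message, then call the wrapped function."""

    @functools.wraps(func)  # keep the original function's name and docstring
    def wrapper(*args, **kwargs):
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)

    return wrapper


@announce
def add(a, b):
    """Return the sum of a and b."""
    return a + b


total = add(2, 3)
print(total)
```

The body of `add` is untouched; the extra printing comes entirely from the wrapper, which is the same pattern the pandera decorators use to run validation around a function.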

In [None]:
# do not execute code in this cell, just for understanding

# toy example of what a decorated function looks like
@my_decorator
def my_function():
    pass

# behind the scenes, the function is modified by the decorator
# decorator takes the function as argument
def my_function():
    pass
my_function = my_decorator(my_function)

### Pandera Decorators for Integrating Validation Into Data Pipeline Functions
- @check_input
- @check_output
- @check_io

In [None]:
def compute_family_size(df: pd.DataFrame) -> pd.DataFrame:
    """Compute passenger family size as sum of sibsp + parch
    and append this sum to df as new 'family_size' column"""

    df = df.copy()
    df['family_size'] = df['sibsp'] + df['parch']

    return df

In [None]:
# define input schema for example
titanic_input_schema = create_titanic_schema()

In [None]:
# define output schema for example
titanic_output_schema = titanic_input_schema.add_columns({"family_size": Column(int),
                                                          "can_swim": Column(int)})

print(titanic_output_schema)

In [None]:
# define function with validation decorator
# toggle between
# @check_input(titanic_input_schema) and @check_output(titanic_output_schema)
@check_input(titanic_input_schema)
def compute_family_size(df: pd.DataFrame) -> pd.DataFrame:
    """Compute passenger family size as sum of sibsp + parch
    and append this sum to df as new 'family_size' column"""

    df = df.copy() 
    df['family_size'] = df['sibsp'] + df['parch']

    return df

In [None]:
# validate inputs/outputs and add computed family size to df 
df_out = compute_family_size(df)
df_out.head()

In [None]:
# @check_io needs schemas associated with both input df(s) and output df
@check_io(df=titanic_input_schema, out=titanic_output_schema)
def compute_family_size(df: pd.DataFrame) -> pd.DataFrame:
    """Compute passenger family size as sum of sibsp + parch
    and append this sum to df as new 'family_size' column"""

    df = df.copy()
    df['family_size'] = df['sibsp'] + df['parch']

    return df

In [None]:
# won't validate due to 'can_swim' column
# being expected in output schema
df_out = compute_family_size(df)
df_out.head()

## The End! Thanks for your attention!