# Data Validation in Training Pipelines


In this notebook, we will go through the process of validating dataframes in a training pipeline using Pandera.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/NatanMish/data_validation/blob/main/notebooks/2_training_pipeline_data_validation.ipynb)


#### Install the required packages and import them to the notebook

In [None]:
!pip install scikit-learn pandera\[strategies\]

In [None]:
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import pandera as pa
from typing import Optional

#### Load the data

In [None]:
home_data = pd.read_csv('https://github.com/NatanMish/data_validation/blob/a77b247b25c6622ce0c8f8cbc505228161c31a3c/data/train.csv?raw=true')

#### Train basic model
We'll start by setting up a training pipeline using scikit-learn's `Pipeline` class. For this example we only want a few basic features, so we'll write a pipeline step class that selects just those features.

In [None]:
class ChooseFeatures(BaseEstimator):
    def __init__(self, features=None):
        self.features = features
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        return X[self.features]

In [None]:
feature_names = ['LotArea','YearBuilt','1stFlrSF','2ndFlrSF','FullBath','BedroomAbvGr','TotRmsAbvGrd', 'LotFrontage']

Now we set up the pipeline and fit it to the data.

In [None]:
pipe = Pipeline([
     ('feature_selection', ChooseFeatures(features=feature_names)),
     ('scaler', StandardScaler()),
     ('rf', RandomForestRegressor())
])

In [None]:
X = home_data
y = home_data.SalePrice
pipe.fit(home_data, y)

Looks like our data has null values and this causes the model to break. Let's take a look at Pandera to see how it can help us with this.
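
Before reaching for a validation library, a quick way to confirm where the nulls sit is to count them per column. The sketch below uses a small made-up frame (`toy`) as a stand-in for `home_data`; on the real data the same two lines would be run on `home_data[feature_names]`.

```python
import numpy as np
import pandas as pd

# Toy stand-in for home_data; the values are made up for illustration.
toy = pd.DataFrame({
    "LotArea": [8450, 9600, 11250],
    "LotFrontage": [65.0, np.nan, 68.0],
})

# Count missing values per column and keep only the columns that have any.
null_counts = toy.isna().sum()
columns_with_nulls = null_counts[null_counts > 0]
print(columns_with_nulls)
```

A check like this only reports the problem once; a schema lets us state the expectation explicitly and enforce it every time the pipeline runs.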

<div>
<img src="https://raw.githubusercontent.com/pandera-dev/pandera/master/docs/source/_static/pandera-banner.png" width="500"/>
</div>

Pandera provides a flexible and expressive API for performing data validation on dataframes to make data processing pipelines more readable and robust. Dataframes contain information that pandera explicitly validates at runtime. This is useful in production-critical data pipelines or reproducible research settings. We'll take a look at these Pandera features:

1. Check the types and properties of columns in a pd.DataFrame or values in a pd.Series.

2. Perform more complex statistical validation like hypothesis testing.

3. Integrate with existing data analysis/processing pipelines via function decorators.

4. Define schema models with the class-based API with pydantic-style syntax and validate dataframes using the typing syntax.

5. Synthesize data from schema objects for property-based testing with pandas data structures.

6. Lazily validate dataframes so that all validation rules are executed before raising an error.

For more information, see [Pandera's documentation](https://pandera.readthedocs.io/en/latest/).

#### 1. DataFrame Schemas - Type Validation

In [None]:
# We'll add one more feature to make it more interesting
feature_names.append('LotConfig')

In [None]:
# Create a basic schema for the home_data DataFrame to check types for just 2 of the features
basic_types_schema = pa.DataFrameSchema({
    "LotArea": pa.Column(int),
    "LotConfig": pa.Column(str),
    })

In [None]:
# Validate the home_data DataFrame against basic_types_schema.
# Notice that we defined only two of the columns in the schema; Pandera leaves the remaining columns unchecked.
basic_types_schema.validate(home_data[feature_names])

`validate` returns the validated dataframe, so seeing an output here means that the data is valid.
There are different ways we can specify the type:
- a string alias, as long as it is recognized by pandas.
- a python type: int, float, bool, str
- a numpy data type
- a pandas extension type: it can be an instance (e.g. pd.CategoricalDtype(["a", "b"])) or a class (e.g. pandas.CategoricalDtype) if it can be initialized with default values.
- a pandera DataType: it can also be an instance or a class.

In [None]:
# Now let's create a schema that does not fit the data types in home data
bad_types_schema = pa.DataFrameSchema({
    "LotArea": pa.Column(int),
    "LotConfig": pa.Column(float),
})

In [None]:
# The bad schema validation will throw an error
bad_types_schema.validate(home_data[feature_names])

#### 2. DataFrame Schemas - Value Ranges Validation

In [None]:
# Pandera also allows validating value ranges for numerical columns
value_range_schema = pa.DataFrameSchema({
    "LotArea": pa.Column(int, pa.Check(lambda s: s <= 1000000), nullable=False),
    "YearBuilt": pa.Column(int, [pa.Check.in_range(1800, 2022)]),
})

In [None]:
# Validate the home_data DataFrame against the value_range_schema
value_range_schema.validate(home_data[feature_names])

#### 3. DataFrame Schemas - Catch Bad Data

What if, instead of stopping at the first error, we want to keep processing the dataframe, or skip the bad data? Passing `lazy=True` to `validate` runs every check over the entire dataframe instead of failing on the first problem, and the exception that is raised exposes a `failure_cases` attribute that holds the offending values and their indices. We can capture it with a try-except block.

In [None]:
# We'll use a small sample of the data to make the example more clear
sample_data = home_data.sample(n=10)

In [None]:
# Create a schema that will fail on the first bad data point
catch_bad_data_schema = pa.DataFrameSchema({
    "LotArea": pa.Column(int, pa.Check(lambda s: s <= 1000000)),
    "YearBuilt": pa.Column(int, pa.Check.in_range(1900,1990)),  # notice that the year built has a restrictive range
})

In [None]:
# Validating the sample_data DataFrame against the catch_bad_data_schema will most likely throw an error, because of the restrictive YearBuilt range
catch_bad_data_schema.validate(sample_data[feature_names])

Now let's use a try-except block to catch the bad data indices. This is a common and valid practice in Python called EAFP - "easier to ask for forgiveness than permission", which might not be as well received in other languages.

In [None]:
try:
    catch_bad_data_schema.validate(sample_data[feature_names], lazy=True)
except pa.errors.SchemaErrors as e:
    failure_cases = e.failure_cases

# Failure cases is a dataframe of the bad data only
failure_cases.head()

In [None]:
# We can easily filter out the bad data from the original dataframe using the failure_cases dataframe
filtered_df = sample_data[~sample_data.index.isin(failure_cases["index"])]

In [None]:
# Let's see that the filtered data passes the validation test
catch_bad_data_schema.validate(filtered_df[feature_names])

#### 4. DataFrame Schemas - Validate acceptable categorical values

In [None]:
lot_config_values = ["Inside", "Corner", "CulDSac", "FR3"]

In [None]:
lot_config_values_schema = pa.DataFrameSchema({
    "LotArea": pa.Column(int, pa.Check(lambda s: s <= 1000000)),
    "LotConfig": pa.Column(str, pa.Check.isin(lot_config_values)),
})

In [None]:
# Validating the home_data DataFrame against the lot_config_values_schema will throw an error
lot_config_values_schema.validate(home_data[feature_names])

Other useful methods for `pa.Check` are:

- `pandera.checks.Check.eq`
- `pandera.checks.Check.equal_to`
- `pandera.checks.Check.ge`
- `pandera.checks.Check.greater_than`
- `pandera.checks.Check.greater_than_or_equal_to`
- `pandera.checks.Check.gt`
- `pandera.checks.Check.in_range`
- `pandera.checks.Check.isin`
- `pandera.checks.Check.le`
- `pandera.checks.Check.less_than`
- `pandera.checks.Check.less_than_or_equal_to`
- `pandera.checks.Check.lt`
- `pandera.checks.Check.ne`
- `pandera.checks.Check.not_equal_to`
- `pandera.checks.Check.notin`
- `pandera.checks.Check.str_contains`
- `pandera.checks.Check.str_endswith`
- `pandera.checks.Check.str_length`
- `pandera.checks.Check.str_matches`
- `pandera.checks.Check.str_startswith`
- `pandera.checks.Check.__call__`

#### 5. DataFrame Schemas - `Coerce`

`coerce` tells Pandera to cast each column to the dtype declared in the schema before validating it.

In [None]:
home_data.LotArea.dtype

In [None]:
coerce_schema = pa.DataFrameSchema(
    columns={"LotArea": pa.Column(float)},
    coerce=False,
)

In [None]:
coerce_schema.validate(home_data)

In [None]:
# and if we set coerce to True, we can coerce the dataframe to the schema
coerce_schema = pa.DataFrameSchema(
    columns={"LotArea": pa.Column(float)},
    coerce=True,
)

In [None]:
coerce_schema.validate(home_data)

#### 6. DataFrame Schemas - `Strict`

In [None]:
# Using `strict` we can specify that the dataframe must have the exact columns specified in the schema
strict_schema = pa.DataFrameSchema(
    columns={"LotArea": pa.Column(int), "YearBuilt": pa.Column(int)},
    strict=True,
)

In [None]:
# Another useful feature is setting `strict` to 'filter' which will filter out any columns that are not in the schema
strict_filter_schema = pa.DataFrameSchema(
    columns={"LotArea": pa.Column(int), "YearBuilt": pa.Column(int)},
    strict="filter",
)
filtered_df = strict_filter_schema.validate(home_data)
filtered_df.head()

### Exercise 1 - DataFrame Schemas

Create a `pa.DataFrameSchema` object for the `home_data` DataFrame. Not all of the checks requested were shown above; for some of them you'll need a quick search in the Pandera documentation. The schema should have the following columns and rules:
1. Id is a required and unique column of an integer type and cannot be null.
2. MSZoning is a non-required column of a string type and can be null. If not null it can only accept these values - 'RL', 'RM', 'C (all)', 'RH' and 'FV'.
3. OverallQual is a required column of an integer type, cannot be null and must be in the range 1-10.
4. BsmtCond is a non-required column of a string type and can be null. If not null it can only accept a string of a length of 2.

Bonus:

5. Add the 1stFlrSF and 2ndFlrSF columns to the schema and validate that on average 1stFlrSF>=2ndFlrSF.

Create the schema such that it filters out any other columns that are not in the schema.


In [None]:
exercise_schema = pa.DataFrameSchema(
    columns={
        "Id": <YOUR ANSWER HERE>,
        "MSZoning": <YOUR ANSWER HERE>,
        "OverallQual": <YOUR ANSWER HERE>,
        "BsmtCond": <YOUR ANSWER HERE>,
        "1stFlrSF": <YOUR ANSWER HERE>,
        "2ndFlrSF": <YOUR ANSWER HERE>,
    },
    strict=<YOUR ANSWER HERE>,
    checks=<YOUR ANSWER HERE>,
)

In [None]:
exercise_schema.validate(home_data)

*Exercise solutions can be found in the exercise solutions file in the current directory.*

### 7. Pandera Decorators

Pandera offers decorators which allow a seamless integration of Pandera validations with our code. The available decorators are:
- @check_input
- @check_output
- @check_io
- @check_types

We will define the schemas differently in the next example, but the same principles apply. Here we construct a class-based Pandera model, written in Pydantic-style syntax, which we can use to validate the inputs and outputs of our functions.

In [None]:
from pandera.typing import Series

# Define a class based Pandera model for the input data to the feature engineering step
class FeaturesSchemaPreEngineering(pa.SchemaModel):
    LotArea: Series[int] = pa.Field(nullable=False, ge=0)
    YearBuilt: Series[int] = pa.Field(nullable=False, ge=1700)
    FirstFlrSF: Series[int] = pa.Field(nullable=False, ge=0, alias="1stFlrSF") # alias is used to give the column a different name in the schema because the column name starts with a number
    SecondFlrSF: Series[int] = pa.Field(nullable=False, ge=0, alias="2ndFlrSF")
    FullBath: Series[int] = pa.Field(nullable=False, ge=0)
    BedroomAbvGr: Series[int] = pa.Field(nullable=False, ge=0)
    TotRmsAbvGrd: Series[int] = pa.Field(nullable=False, ge=0)
    LotFrontage: Series[float] = pa.Field(nullable=True, ge=0)
    LotConfig: Series[str] = pa.Field(nullable=True, isin=["Inside", "Corner", "FR2", "FR3", "CulDSac"])
    class Config:
        strict=True

# Define a class-based Pandera model for the output of the feature engineering step. Notice how we inherit from FeaturesSchemaPreEngineering and extend it with the new output columns.
class FeaturesSchemaPostEngineering(FeaturesSchemaPreEngineering):
    HouseAge: Series[int] = pa.Field(nullable=False, ge=0)
    AllFloorsSF: Series[int] = pa.Field(nullable=False, ge=0)
    NonBedRmAbvGrd: Series[int] = pa.Field(nullable=False, ge=0)

    class Config:
        strict=True

In [None]:
from pandera import check_types
from pandera.typing import DataFrame as DataFramePa
@check_types
def feature_engineering(df: DataFramePa[FeaturesSchemaPreEngineering]) -> DataFramePa[FeaturesSchemaPostEngineering]:
    df = df.copy()
    df["HouseAge"] = 2022 - df["YearBuilt"]
    df["AllFloorsSF"] = df["1stFlrSF"] + df["2ndFlrSF"]
    df["NonBedRmAbvGrd"] = df["TotRmsAbvGrd"] - df["BedroomAbvGr"]
    return df

In [None]:
# This run should complete without error
feature_engineering(home_data[feature_names])

In [None]:
# If we don't filter out any columns, we should get an error, since it is incompatible with the schema
feature_engineering(home_data)

### 8. Data Synthesis

Pandera offers a simple way to generate synthetic data. We can use the `example` method to generate a DataFrame with a given schema.

In [None]:
FeaturesSchemaPreEngineering.example(size=5)

Notice how some of the columns have 'crazy' values. This is because the data is generated randomly and is constrained only by the checks we have defined in the schema, so any value inside the allowed ranges can appear.

We can use the hypothesis library to generate data for our schema and then use it in a unit test:

In [None]:
import hypothesis
@hypothesis.given(FeaturesSchemaPreEngineering.strategy(size=5))
def test_processing_fn(dataframe):
    feature_engineering(dataframe)

### 9. Schema Inference

Pandera can infer schemas from data. This is useful when you have a large dataset and you don't want to define a schema manually.

In [None]:
schema = pa.infer_schema(home_data)
print(schema)

### Exercise 2 - Incorporating validation in a training pipeline

Let's use what we have learned to implement data validation in a training pipeline.
Choose which features to use for training. You can use these, or add your own:

In [None]:
feature_names = ['LotArea','YearBuilt','1stFlrSF','2ndFlrSF','FullBath','BedroomAbvGr','TotRmsAbvGrd', 'LotFrontage']

If you chose other features, make sure to include the ones that are required for the schema to be valid.

In [None]:
# class FeaturesSchemaPreEngineering(pa.SchemaModel):
#     ...

# class FeaturesSchemaPostEngineering(FeaturesSchemaPreEngineering):
#     ...

In [None]:
@check_types
def feat_eng_step_1(df: DataFramePa[FeaturesSchemaPreEngineering]) -> DataFramePa[FeaturesSchemaPostEngineering]:
    df = df.copy()
    df["HouseAge"] = 2022 - df["YearBuilt"]
    df["AllFloorsSF"] = df["1stFlrSF"] + df["2ndFlrSF"]
    df["NonBedRmAbvGrd"] = df["TotRmsAbvGrd"] - df["BedroomAbvGr"]
    return df

Fill in the missing steps below:
1. First try to run the feat_eng_step_1 function on the X dataframe, and return a tuple of the feature-engineered dataframe, the y series, and `None` (there are no failure cases in this branch).
2. If a pa.errors.SchemaError is raised, capture the exception and extract the `failure_cases` property from the exception. Filter out the invalid data indices from X and y, and then return a tuple comprised of:
    - the filtered X dataframe
    - the filtered y series
    - the failure cases

In [None]:
def feat_eng_all_steps(X: pd.DataFrame, y: pd.Series) -> tuple[pd.DataFrame, pd.Series, Optional[pd.DataFrame]]:
    try:
        <YOUR CODE HERE>
    except <YOUR CODE HERE> as e:
        <YOUR CODE HERE>

Same pipeline as defined previously:

In [None]:
pipe = Pipeline([
    ('feature_selection', ChooseFeatures(features=feature_names)),
    ('scaler', StandardScaler()),
    ('rf', RandomForestRegressor())
])

Running the cell below should fit the pipeline to the data, this time with no errors:

In [None]:
X, y, isolated_invalid_data = feat_eng_all_steps(home_data[feature_names], home_data["SalePrice"])
pipe.fit(X, y)

In [None]:
# And we can see the invalid data we were unable to fit:
isolated_invalid_data.head()