# Data-centric ML pipeline walkthrough

In [1]:
import pandas as pd
import numpy as np
import os
import sys
import joblib
import time

sys.path.append('../')

from data_centric_preprocessing.create_schema import *
from data_centric_preprocessing.data_schema import *
from data_centric_preprocessing.util import *
from data_centric_preprocessing.config import *

In hardware verification, every column of the raw data often arrives with the `object` dtype, so we need to infer the correct data types ourselves. The raw data are also heterogeneous and feature meanings are obscure, which makes it important to monitor and track the inferred types. Inferring correct dtypes also lets us apply a different preprocessing method to each dtype. Below is a toy example of raw data with mixed data types.

In [2]:
X_train = pd.DataFrame(
    {
        "col_bool": [np.nan, True, False],
        "col_num": [np.nan, 0.5, 10],
        "col_str": [np.nan, "A", "B"],
        "col_array": [np.nan, ["A"], ["B"]],
        "col_PATTERN1": [1, 2, 3],
        "col_null": [np.nan, np.nan, np.nan],
        "col_invariant": [1, 1, 1],
        "col_invariant_missing": [np.nan, 0, 0],
    }, 
    dtype=object
)
X_train

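|   | col_bool | col_num | col_str | col_array | col_PATTERN1 | col_null | col_invariant | col_invariant_missing |
|---|---|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 1 | NaN | 1 | NaN |
| 1 | True | 0.5 | A | [A] | 2 | NaN | 1 | 0.0 |
| 2 | False | 10.0 | B | [B] | 3 | NaN | 1 | 0.0 |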


This walkthrough provides an overview of a training- and serving-aware data preprocessing pipeline for raw data with the following challenges:
- Schemas are absent and feature meanings are obscure.
- Data are highly heterogeneous and high-dimensional.
- During model serving, data types and shapes are likely to have changed since model training.

## Model training stage

### Step 1. Infer a schema from the training data

We first drop non-informative columns (features) from the raw data: seed columns, invariant columns, and all-missing columns.

In [3]:
X_train = drop_non_informative_features(X_train, seed_patterns=["SEED$"])
X_train

|   | col_bool | col_num | col_str | col_array | col_PATTERN1 | col_invariant_missing |
|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 1 | NaN |
| 1 | True | 0.5 | A | [A] | 2 | 0.0 |
| 2 | False | 10.0 | B | [B] | 3 | 0.0 |
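
The implementation of `drop_non_informative_features` is not shown in this walkthrough. As a rough sketch of the three rules described above, assuming seed columns are matched by a regex on the column name, a hypothetical stand-in (`drop_non_informative_sketch` is an illustrative name, not part of the package) might look like this:

```python
import re

import numpy as np
import pandas as pd


def drop_non_informative_sketch(X, seed_patterns):
    """Hypothetical stand-in: drop seed, all-missing, and invariant columns."""
    drop = []
    for col in X.columns:
        values = X[col]
        # Seed columns are identified by name (assumption: regex search)
        is_seed = any(re.search(p, col) for p in seed_patterns)
        # All-missing columns carry no information
        is_all_missing = values.isna().all()
        # Invariant columns: no missing values and a single distinct value.
        # astype(str) keeps the check safe for unhashable values such as lists.
        is_invariant = values.notna().all() and values.astype(str).nunique() == 1
        if is_seed or is_all_missing or is_invariant:
            drop.append(col)
    return X.drop(columns=drop)


toy = pd.DataFrame(
    {
        "run_SEED": [7, 8, 9],
        "all_null": [np.nan, np.nan, np.nan],
        "const": [1, 1, 1],
        "const_missing": [np.nan, 0, 0],
        "keep": [1, 2, 3],
    },
    dtype=object,
)
kept = drop_non_informative_sketch(toy, seed_patterns=["SEED$"])
print(list(kept.columns))  # ['const_missing', 'keep']
```

Note that a column such as `const_missing` survives this step, which matches the behavior shown above for `col_invariant_missing`.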


We need to infer the correct data types from the raw data because the columns are heterogeneous but all labeled with the `object` dtype.

In [4]:
X_train.dtypes

col_bool                 object
col_num                  object
col_str                  object
col_array                object
col_PATTERN1             object
col_invariant_missing    object
dtype: object

I choose `pandas.api.types.infer_dtype` because of its type granularity and its ability to ignore nulls.

In [5]:
inferred_dtype = X_train.apply(lambda x:pd.api.types.infer_dtype(x, skipna=True))
inferred_dtype

col_bool                             boolean
col_num                  mixed-integer-float
col_str                               string
col_array                              mixed
col_PATTERN1                         integer
col_invariant_missing                integer
dtype: object

As you can see, the method ignores NA values and correctly identifies the type of the remaining elements in each column. However, the returned values are strings, not Python types:

In [6]:
print(inferred_dtype.values)

['boolean' 'mixed-integer-float' 'string' 'mixed' 'integer' 'integer']


Because the method returns string labels (e.g., `"boolean"`, not the type `bool`), I need a dictionary that maps these labels to numpy dtypes before the dtypes can be corrected.

In [7]:
numpy_dtype_map

{'boolean': float,
 'integer': float,
 'string': str,
 'floating': float,
 'mixed-integer-float': float,
 'mixed-integer': float,
 'mixed': str}
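
To make the lookup concrete, here is a minimal sketch of how the string labels from `infer_dtype` could be translated through such a map. The map is copied from the output above; the toy frame and the dictionary comprehension are illustrative assumptions, not the package's code.

```python
import numpy as np
import pandas as pd

# Copied from the `numpy_dtype_map` output shown above
numpy_dtype_map = {
    "boolean": float,
    "integer": float,
    "string": str,
    "floating": float,
    "mixed-integer-float": float,
    "mixed-integer": float,
    "mixed": str,
}

toy = pd.DataFrame(
    {"flag": [np.nan, True, False], "name": [np.nan, "A", "B"]},
    dtype=object,
)

# Step 1: string labels per column, ignoring nulls
inferred = toy.apply(lambda col: pd.api.types.infer_dtype(col, skipna=True))

# Step 2: translate each label into a Python type via the map
numpy_dtypes = {col: numpy_dtype_map[label] for col, label in inferred.items()}
print(numpy_dtypes)
```

Booleans map to `float` here so that a missing value can still be represented as NaN after casting.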

To capture the true meaning of the columns, we also need a custom dtype map. The values of this map become the foundation of the column transformers in the preprocessing pipeline.

In [8]:
custom_dtype_map

{'boolean': 'numeric',
 'integer': 'numeric',
 'string': 'nominal_str',
 'floating': 'numeric',
 'mixed-integer-float': 'numeric',
 'mixed-integer': 'numeric',
 'mixed': 'arrays'}

The `build_schema` function creates a schema: a collection of column names and their inferred dtypes. It needs the dtype maps above, as well as the `nominal_num_patterns` and `catch_invariant_with_missing` arguments.

The raw data contain numeric columns that are not ordinal, such as seeds or hash values. We call them nominal-numeric columns. A bottom-up approach cannot identify them, because their values alone do not distinguish them from regular numeric columns. Luckily, we can identify them by their name patterns. That's where `nominal_num_patterns` comes in.

In [9]:
nominal_num_patterns

['PATTERN1$', 'PATTERN2$']
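
How the patterns are applied inside `build_schema` is not shown here. A plausible sketch, assuming each pattern is a regular expression searched against the column name (`is_nominal_num` is a hypothetical helper), is:

```python
import re

nominal_num_patterns = ["PATTERN1$", "PATTERN2$"]


def is_nominal_num(column_name, patterns):
    """Hypothetical helper: True if any regex pattern matches the column name."""
    return any(re.search(p, column_name) for p in patterns)


columns = ["col_num", "col_PATTERN1", "PATTERN2_count", "hash_PATTERN2"]
flags = {c: is_nominal_num(c, nominal_num_patterns) for c in columns}
print(flags)
# {'col_num': False, 'col_PATTERN1': True, 'PATTERN2_count': False, 'hash_PATTERN2': True}
```

The trailing `$` anchors the match to the end of the name, so `PATTERN2_count` is not flagged even though it contains `PATTERN2`.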

Because our data change shape (number of columns) over time, a training dataset built over multiple time periods ends up with many missing values, simply because some columns did not exist in a given period. For instance, on Day 1 we have columns A and B, but on Day 2 we have columns A, B, and C. When we create a batch of training data from Days 1 and 2, the Day 1 rows will have missing values for column C.
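
The Day 1 / Day 2 example can be reproduced with a small sketch. The frames and values below are made up for illustration only.

```python
import pandas as pd

# Day 1 has columns A and B; Day 2 additionally has column C
day1 = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
day2 = pd.DataFrame({"A": [5], "B": [6], "C": [7]})

# Stacking the two days aligns on column names and fills the gaps with NaN
batch = pd.concat([day1, day2], ignore_index=True)
print(batch["C"].isna().tolist())  # [True, True, False]
```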

Among columns like these (i.e., sparse due to partial presence), some contain at least one NaN (e.g., `[0, 0, 0, nan, nan]`) and only two values remain after removing duplicates (e.g., `[0, nan]`): NaN and a single invariant value. We refer to such a column as "invariant with missing". We use sklearn's `MissingIndicator` to encode the presence of missing values in these columns. To identify them, we set `catch_invariant_with_missing=True`.

In [10]:
schema_train = build_schema(
        X_train,
        numpy_dtype_map=numpy_dtype_map,
        custom_dtype_map=custom_dtype_map,
        nominal_num_patterns=nominal_num_patterns,
        catch_invariant_with_missing=True
    )

In [11]:
schema_train

|   | pandas_dtype | numpy_dtype | custom_dtype |
|---|---|---|---|
| col_bool | boolean | `<class 'float'>` | numeric |
| col_num | mixed-integer-float | `<class 'float'>` | numeric |
| col_str | string | `<class 'str'>` | nominal_str |
| col_array | mixed | `<class 'str'>` | arrays |
| col_PATTERN1 | integer | `<class 'float'>` | nominal_num |
| col_invariant_missing | integer | `<class 'float'>` | invariant_with_missing |
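
The "invariant with missing" rule behind the last row can be sketched as a small predicate. This is an assumption about how `catch_invariant_with_missing=True` might decide, not the package's actual code, and `is_invariant_with_missing` is a hypothetical name.

```python
import numpy as np
import pandas as pd


def is_invariant_with_missing(col):
    """Hypothetical check: at least one NaN and exactly one distinct non-null value.

    Assumes the non-null values are hashable (so not lists).
    """
    non_null = col.dropna()
    return bool(col.isna().any() and non_null.nunique() == 1)


examples = {
    "invariant_missing": pd.Series([0, 0, 0, np.nan, np.nan], dtype=object),
    "varying_missing": pd.Series([np.nan, 0.5, 10], dtype=object),
    "invariant_complete": pd.Series([1, 1, 1], dtype=object),
}
results = {name: is_invariant_with_missing(s) for name, s in examples.items()}
print(results)
# {'invariant_missing': True, 'varying_missing': False, 'invariant_complete': False}
```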


Once we have inferred the numpy dtypes of the columns, we can cast the raw data to them to correct its data types.

In [12]:
X_train = cast_numpy_dtype(X_train, schema_train)
X_train.dtypes

col_bool                 float64
col_num                  float64
col_str                   object
col_array                 object
col_PATTERN1              object
col_invariant_missing    float64
dtype: object

Note that the dtype of `col_PATTERN1` is `object` (not float) because we treat it as nominal (categorical).
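
The casting behavior seen above (float columns become `float64`, while string-like and nominal-numeric columns stay `object`) could be approximated as follows. This is a sketch under that assumption; `cast_sketch` is a hypothetical name and the real `cast_numpy_dtype` may differ.

```python
import numpy as np
import pandas as pd


def cast_sketch(X, schema):
    """Hypothetical stand-in: cast float columns, leave nominal ones as object."""
    X = X.copy()
    for col, row in schema.iterrows():
        # Nominal-numeric columns are categorical, so they are not cast to float
        if row["numpy_dtype"] is float and row["custom_dtype"] != "nominal_num":
            X[col] = X[col].astype(float)
    return X


X_toy = pd.DataFrame(
    {
        "col_num": [np.nan, 0.5, 10],
        "col_PATTERN1": [1, 2, 3],
        "col_str": [np.nan, "A", "B"],
    },
    dtype=object,
)
schema_toy = pd.DataFrame(
    {
        "numpy_dtype": [float, float, str],
        "custom_dtype": ["numeric", "nominal_num", "nominal_str"],
    },
    index=["col_num", "col_PATTERN1", "col_str"],
)
casted = cast_sketch(X_toy, schema_toy)
print(casted.dtypes)
```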

### Step 2. Build a preprocessor based on the schema

A key feature of this pipeline is that the schema, which is inferred from the raw data, is used to build the preprocessor. We use the `build_preprocessor` function here. It checks which custom dtypes exist in the data and fetches the corresponding method for each custom dtype to build a collection of column transformers:


```python
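# Excerpt: assumes `schema_training`, `transformer_map`, and
# `preprocessor_steps` are already defined.
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# One column transformer per custom dtype present in the training schema
known_custom_dtypes = set(schema_training["custom_dtype"])
transformers = []
for custom_dtype in sorted(known_custom_dtypes):
    # Columns whose inferred custom dtype matches the current one
    cols = schema_training[
        schema_training["custom_dtype"] == custom_dtype
    ].index.values
    transformer = (custom_dtype, transformer_map[custom_dtype], cols)
    transformers.append(transformer)
column_transformers = ColumnTransformer(transformers)
# Fill the "column_transformers" placeholder step with the assembled transformer
preprocessor = Pipeline(preprocessor_steps)
preprocessor.set_params(column_transformers=column_transformers)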
```

The corresponding methods (data preprocessing methods) are stored in `transformer_map`, which is a dictionary where each key is the custom dtype name and its value is a sklearn pipeline object with a stack of various preprocessing methods.

In [13]:
transformer_map

{'invariant_with_missing': Pipeline(steps=[('imputer', MissingIndicator(error_on_new=False))]),
 'numeric': Pipeline(steps=[('imputer', SimpleImputer(strategy='median'))]),
 'arrays': Pipeline(steps=[('imputer',
                  SimpleImputer(fill_value='missing', strategy='constant')),
                 ('encoder', OrdinalEncoder())]),
 'nominal_str': Pipeline(steps=[('imputer',
                  SimpleImputer(fill_value='missing', strategy='constant')),
                 ('encoder', OrdinalEncoder())]),
 'nominal_num': Pipeline(steps=[('imputer',
                  SimpleImputer(fill_value='missing', strategy='constant')),
                 ('encoder', OrdinalEncoder())])}

For instance, as mentioned above, we apply `MissingIndicator` to the `"invariant_with_missing"` columns:

In [14]:
print(transformer_map['invariant_with_missing'])

Pipeline(steps=[('imputer', MissingIndicator(error_on_new=False))])
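
As a small illustration of what this step produces, the sketch below fits `MissingIndicator` on a toy column shaped like `col_invariant_missing` and then applies it to a made-up serving column. With `error_on_new=False`, `transform` does not raise when missing values show up in a feature that had none during `fit`.

```python
import numpy as np
from sklearn.impute import MissingIndicator

indicator = MissingIndicator(error_on_new=False)

# Training column: one missing value, otherwise the invariant value 0
train_col = np.array([[np.nan], [0.0], [0.0]])
train_flags = indicator.fit_transform(train_col)
print(train_flags.ravel().tolist())  # [True, False, False]

# Serving column (toy values): the fitted indicator flags missing entries
serve_col = np.array([[0.0], [np.nan], [np.nan]])
serve_flags = indicator.transform(serve_col)
print(serve_flags.ravel().tolist())  # [False, True, True]
```

The invariant value itself is discarded; only the presence or absence of a value is kept as a feature.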


Even though the column transformer is the key part of the preprocessor, we might still want steps that apply to all columns. These can be added to `preprocessor_steps`:

In [15]:
preprocessor_steps

[('column_transformers', None), ('feature_selection', VarianceThreshold())]

Thus, the `build_preprocessor` function requires both `transformer_map` and `preprocessor_steps`:

In [16]:
preprocessor = build_preprocessor(
        schema_train,
        transformer_map=transformer_map,
        preprocessor_steps=preprocessor_steps
    )
preprocessor

This shows that each custom dtype from the training schema is represented as an independent column transformer, with its associated method applied to the matching columns. The constructed preprocessor is simply a sklearn pipeline object, so we can call `fit_transform` to transform the raw data.

In [17]:
X_train_transformed = preprocessor.fit_transform(X_train)
X_train_transformed

array([[ 2.  ,  1.  ,  0.  ,  2.  ,  0.5 ,  5.25],
       [ 0.  ,  0.  ,  1.  ,  0.  ,  1.  ,  0.5 ],
       [ 1.  ,  0.  ,  2.  ,  1.  ,  0.  , 10.  ]])
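
To see the same mechanics in isolation, here is a self-contained miniature of a schema-driven preprocessor. It is a sketch that uses only two custom dtypes and a made-up two-column frame; the schema, data, and variable names are illustrative assumptions rather than the package's code.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder

# Toy schema: column name -> custom dtype
schema = pd.DataFrame({"custom_dtype": ["numeric", "nominal_str"]}, index=["a", "b"])

transformer_map = {
    "numeric": Pipeline([("imputer", SimpleImputer(strategy="median"))]),
    "nominal_str": Pipeline(
        [
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("encoder", OrdinalEncoder()),
        ]
    ),
}

# One (name, transformer, columns) triple per custom dtype, in sorted order
transformers = [
    (dtype, transformer_map[dtype], list(schema.index[schema["custom_dtype"] == dtype]))
    for dtype in sorted(set(schema["custom_dtype"]))
]

# The placeholder step is filled in once the column transformer is known
preprocessor = Pipeline(
    [("column_transformers", None), ("feature_selection", VarianceThreshold())]
)
preprocessor.set_params(column_transformers=ColumnTransformer(transformers))

X = pd.DataFrame(
    {
        "a": [1.0, np.nan, 3.0],
        "b": pd.Series(["x", np.nan, "y"], dtype=object),
    }
)
out = preprocessor.fit_transform(X)
print(out)
# [[1. 1.]
#  [0. 2.]
#  [2. 3.]]
```

Because the transformers are added in sorted order of the custom dtype names, the encoded `nominal_str` column comes first in the output and the imputed `numeric` column second.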

## Model serving stage

During serving, the feature set is likely to have changed. The mismatch between training and serving is resolved by schema comparison. In this walkthrough, let's create toy serving data with both a shape mismatch (`"col_extra"` is added) and a type mismatch (`"col_bool"` has string values).

In [18]:
X_serve = X_train.copy()
X_serve["col_extra"] = [np.nan, "C", "D"]
X_serve["col_bool"] = [np.nan, "A", "B"]

In [19]:
X_serve

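|   | col_bool | col_num | col_str | col_array | col_PATTERN1 | col_invariant_missing | col_extra |
|---|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 1 | NaN | NaN |
| 1 | A | 0.5 | A | ['A'] | 2 | 0.0 | C |
| 2 | B | 10.0 | B | ['B'] | 3 | 0.0 | D |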


### Step 1. Infer a schema from the serving data

To compare schemas between training and serving, we first need to create a schema from the serving data. This step is identical to the one in the model training stage.

In [20]:
X_serve = drop_non_informative_features(X_serve, seed_patterns=["SEED$"])
schema_serve = build_schema(
    X_serve,
    numpy_dtype_map=numpy_dtype_map,
    custom_dtype_map=custom_dtype_map,
    nominal_num_patterns=nominal_num_patterns,
    catch_invariant_with_missing=True
    )
X_serve = cast_numpy_dtype(X_serve, schema_serve)

In [21]:
schema_serve

|   | pandas_dtype | numpy_dtype | custom_dtype |
|---|---|---|---|
| col_bool | string | `<class 'str'>` | nominal_str |
| col_num | floating | `<class 'float'>` | numeric |
| col_str | string | `<class 'str'>` | nominal_str |
| col_array | string | `<class 'str'>` | nominal_str |
| col_PATTERN1 | string | `<class 'str'>` | nominal_num |
| col_invariant_missing | floating | `<class 'float'>` | invariant_with_missing |
| col_extra | string | `<class 'str'>` | nominal_str |


### Step 2. Resolve mismatches

First, we resolve any shape (feature-set) mismatches. Unseen columns are dropped, and missing columns are added back as nulls to be imputed later by the fitted preprocessor.

In [22]:
X_serve = match_cols(X_serve, schema_train.index)
X_serve

Columns ['col_extra'] were dropped because they are not in X.


|   | col_bool | col_num | col_str | col_array | col_PATTERN1 | col_invariant_missing |
|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 1 | NaN |
| 1 | A | 0.5 | A | ['A'] | 2 | 0.0 |
| 2 | B | 10.0 | B | ['B'] | 3 | 0.0 |
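
The column alignment described above maps naturally onto `DataFrame.reindex`. The sketch below assumes that behavior; `match_cols_sketch` is a hypothetical stand-in for `match_cols`, and the toy frame is made up.

```python
import pandas as pd


def match_cols_sketch(X, train_columns):
    """Hypothetical stand-in: align serving columns to the training columns."""
    train_columns = list(train_columns)
    unseen = [c for c in X.columns if c not in train_columns]
    if unseen:
        print(f"Dropping unseen columns: {unseen}")
    # reindex drops columns not listed and adds listed-but-absent ones as NaN
    return X.reindex(columns=train_columns)


X_serve_toy = pd.DataFrame({"col_num": [0.5, 10.0], "col_extra": ["C", "D"]})
aligned = match_cols_sketch(X_serve_toy, ["col_num", "col_str"])
print(aligned)
```

Here `col_extra` is dropped and `col_str`, which was seen in training but is absent at serving time, comes back as an all-NaN column.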


Next, we resolve the type mismatch. When there is a type mismatch, model prediction will fail, and there is no easy way to resolve it automatically. Thus, we replace the values in the mismatched columns with nulls, which will be imputed by the fitted preprocessor later. **Note that the intention behind this simple resolution is NOT to ignore data drift but to avoid frequent pipeline failures, because these mismatches are very common. Whenever mismatches occur, we create an alert to start a manual data inspection.**

In [23]:
X_serve = replace_numpy_dtype_mismatch(schema_train, schema_serve, X_serve)
X_serve



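|   | col_bool | col_num | col_str | col_array | col_PATTERN1 | col_invariant_missing |
|---|---|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN | 1 | NaN |
| 1 | NaN | 0.5 | A | ['A'] | 2 | 0.0 |
| 2 | NaN | 10.0 | B | ['B'] | 3 | 0.0 |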


Here, you can see that `"col_bool"`, which has a different dtype (string) during serving, now has null values.
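
A simplified version of this idea is sketched below. It compares only the `numpy_dtype` column of the two schemas and nulls out every column that disagrees. The real `replace_numpy_dtype_mismatch` may apply additional rules (for example for nominal-numeric columns), so treat `null_out_mismatches` as a hypothetical illustration.

```python
import numpy as np
import pandas as pd


def null_out_mismatches(schema_train, schema_serve, X):
    """Hypothetical stand-in: replace columns whose numpy dtype changed with NaN."""
    X = X.copy()
    shared = schema_train.index.intersection(schema_serve.index)
    mismatched = [
        c
        for c in shared
        if c in X.columns
        and schema_train.loc[c, "numpy_dtype"] != schema_serve.loc[c, "numpy_dtype"]
    ]
    for c in mismatched:
        X[c] = np.nan  # imputed later by the fitted preprocessor
    return X, mismatched


schema_train_toy = pd.DataFrame(
    {"numpy_dtype": [float, float]}, index=["col_bool", "col_num"]
)
schema_serve_toy = pd.DataFrame(
    {"numpy_dtype": [str, float]}, index=["col_bool", "col_num"]
)
X_toy = pd.DataFrame(
    {"col_bool": [np.nan, "A", "B"], "col_num": [np.nan, 0.5, 10.0]}
)

X_fixed, mismatched = null_out_mismatches(schema_train_toy, schema_serve_toy, X_toy)
print(mismatched)  # ['col_bool']
```

Returning the list of mismatched columns alongside the data makes it straightforward to raise the alert mentioned above.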

### Step 3. Transform the serving data

Once mismatches are resolved, we apply the fitted preprocessor to transform the serving data.

In [24]:
X_serve_transformed = preprocessor.transform(X_serve)
X_serve_transformed

array([[ 2.  ,  1.  ,  0.  ,  2.  ,  0.5 ,  5.25],
       [ 0.  ,  0.  ,  1.  ,  0.  ,  0.5 ,  0.5 ],
       [ 1.  ,  0.  ,  2.  ,  1.  ,  0.5 , 10.  ]])

## Conclusions

Many data-related challenges exist in hardware verification. There is no schema, data drift is frequent, and we need to infer data types correctly. The data-centric ML pipeline shown in this walkthrough addresses these issues by inferring a schema from raw data, building preprocessors directly from it, and quickly resolving shape and type mismatches during model serving. This approach makes data preprocessing modular and transparent, and it tracks granular information about the contents of the data over time. Since it uses common Python packages such as pandas and sklearn, it can be easily integrated into other existing data and model pipelines.