In [30]:
import numpy as np
import pandas as pd

from scipy.stats import truncnorm
from collections import defaultdict
import random
from scipy.stats import ks_2samp, chi2_contingency, mannwhitneyu
from scipy.stats.contingency import association
import scipy.stats as stats
import matplotlib.pyplot as plt
import os

from scipy.spatial.distance import jensenshannon

# Data Drift Framework

In this notebook we will try to implement a basic framework to detect data drift. We will go step by step.

Before we can start building the framework, we need to have a clear idea of what we mean by data drift.

### Step 1: Defining Data drift

The data we use for our models are not static. The characteristics od data may change over time. This change could be due to `change in distribution, data quality or structure`, leading to difference between the data we used to train our models and the data it now encounters in production. This drift could be due to change in data sources, change in user behaviour, changes in environment in which data was collected.

Data drift can occur if the relationship between the input variables and the target variable changes over time. This is called `Concept Drift`. Data drift can also occur when the distibution of input variables changes over time, but the relationship between input and the target variable remains same. This is called `Covariate Drift`. The underlying problem or context of the data itself might change. This is called `Domain Drift`.

Now, we can proceed with defining some functions to simulate syntheic data and then introduce the different data drift in them. We can use them to test our framework.

In [2]:
def generate_data(n_samples, new_sample = True):
    # Initialize
    age_mean = 35
    age_std = 10
    hourly_rate_mean = 65
    hourly_rate_std = 15

    distance_param = 0.3 
    work_life_bal_param = 3

    education_field_cat = ['High School', 'Undergraduate', 'Masters', 'Phd']
    education_field_prob = [0.2, 0.4, 0.2, 0.2]
    marital_status_cat = ['Married', 'Single', 'Divorced']
    marital_status_prob = [0.4, 0.4, 0.2]
    gender_cat = ['Male', 'Female']
    gender_prob = [0.6, 0.4]

    if new_sample:   
        # generate continuous variables
        age = truncnorm((18 - age_mean) / age_std, (65 - age_mean) / age_std, loc=age_mean, scale=age_std).rvs(size=n_samples).astype(int)
        hourly_rate = np.random.normal(hourly_rate_mean, hourly_rate_std, n_samples)
        
        
        # generate discrete variables
        distance_from_home = np.random.negative_binomial(distance_param, 1-distance_param, n_samples)
        work_life_balance = np.random.poisson(work_life_bal_param, n_samples)

        # generate categorical variables
        education_level = np.random.choice(education_field_cat, n_samples, p=education_field_prob)
        marital_status = np.random.choice(marital_status_cat, n_samples, p=marital_status_prob)
        gender = np.random.choice(gender_cat, n_samples, p=gender_prob)

    else:
        np.random.seed(123)
        age = truncnorm((18 - age_mean) / age_std, (65 - age_mean) / age_std, loc=age_mean, scale=age_std).rvs(size=n_samples).astype(int)
        hourly_rate = np.random.normal(hourly_rate_mean, hourly_rate_std, n_samples)
        
        # generate discrete variables
        distance_from_home = np.random.negative_binomial(distance_param, 1-distance_param, n_samples)
        work_life_balance = np.random.poisson(work_life_bal_param, n_samples)

        # generate categorical variables
        education_level = np.random.choice(education_field_cat, n_samples, p=education_field_prob)
        marital_status = np.random.choice(marital_status_cat, n_samples, p=marital_status_prob)
        gender = np.random.choice(gender_cat, n_samples, p=gender_prob)

    # combine all variables into a pandas dataframe
    data = pd.DataFrame({'Age': age,
                        'DistanceFromHome': distance_from_home,
                        'Education': education_level,
                        'Gender': gender,
                        'HourlyRate': hourly_rate,
                        'MaritalStatus': marital_status,
                        'WorkLifeBalance': work_life_balance,
    })

    return data

The function above generates a dataset with 7 features, each with its own distribution. We can introduce sudden changes in data or gradual changes to a feature in this dataset and have our framework try detect it. This will help us with testing but keeping in mind that the framework should be able to work for any dataset and not just for these features. In other words, this is just one example of a dataset but we will generalise our work and findings for all use cases.

We have a function to generate a dataset. Now we need to come up with our target variable. We will have a seperate funtion for that:

In [3]:
def create_label(target_var):
    if (target_var['Age'] < 35) and (target_var['Education'] == 'Undergraduate') and (target_var['MaritalStatus'] == 'single'):
        return 'Yes'
    elif (target_var['Age'] > 45) and (target_var['HourlyRate'] < 45) and (target_var['MaritalStatus'] == 'married'):
        return 'Yes'
    elif (target_var['WorkLifeBalance'] < 2) and (target_var['Gender'] == 'Male'):
        return 'Yes'
    elif (target_var['Age'] < 30) and (target_var['Gender'] == 'Female') and (target_var['HourlyRate'] < 35):
        return 'Yes'
    else:
        return 'No'

The code above will create the target variable for the dummy dataset. Now that we have everything, we can go ahead with creating our dummy dataset:

In [46]:
drift_df = generate_data(1500, new_sample=False)

drift_df['Atrrition'] = drift_df.apply(create_label, axis=1)
drift_df.head(5)

Unnamed: 0,Age,DistanceFromHome,Education,Gender,HourlyRate,MaritalStatus,WorkLifeBalance,Atrrition
0,40,0,Masters,Male,51.420496,Divorced,0,Yes
1,30,2,Phd,Male,63.67212,Married,1,Yes
2,28,0,Undergraduate,Female,68.944386,Divorced,2,No
3,36,0,High School,Female,74.630632,Single,3,No
4,41,0,Masters,Male,23.287441,Single,2,No


This concludes `Step 1`. In this step we defined what we mean by data drift and then create a data generating function to create a synthetic dataset that can be used to test our code as we start build it in the new steps.

### Step 2: Creating a function to detect covariate drift

We will need a way to compare the distribution of the input features between two sets of data. This will help us determine if the features have the same underlying distribution.

#### Step 2.1: Checking the distribution of a single variable between two sets of data to determine if they come from the same underlying distribution.

In order to compare the distribution of a single variable between two sets of data, we can use the Kolmogorov-Smirnov test (KS-test). The Null hypothesis for the KS-test is that there is no significant difference between the distribution of the variable between the two datasets.

In [5]:
drift_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1500 entries, 0 to 1499
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   Age               1500 non-null   int32  
 1   DistanceFromHome  1500 non-null   int32  
 2   Education         1500 non-null   object 
 3   Gender            1500 non-null   object 
 4   HourlyRate        1500 non-null   float64
 5   MaritalStatus     1500 non-null   object 
 6   WorkLifeBalance   1500 non-null   int32  
 7   Atrrition         1500 non-null   object 
dtypes: float64(1), int32(3), object(4)
memory usage: 76.3+ KB


In [6]:
# Getting the categorical and numerical feature names

def get_num_cat_columns(data):
    # Initialize
    continuous_columns = []
    # all numeric columns
    numerical_columns = data.select_dtypes(include=['float', 'int']).columns.tolist()
    # all discrete cols
    discrete_columns = [col for col in data.columns if data[col].dtype == 'int' and len(data[col].unique()) <= 25]
    # all categorical features
    categorical_columns = data.select_dtypes(include=['object']).columns.tolist()
    # all continuous numeric variables
    for i in numerical_columns:
        if i not in discrete_columns:
            continuous_columns.append(i)
    return continuous_columns, discrete_columns, categorical_columns


The above function will help us determine which columns have numerical variables and which fetaures are categorical. KS-test assumes that the variables are continuous and follow a specific distribution. So we will only use it to compare the distribution of the continuous numerical variables between the two datasets.

In [7]:
def ks_check(feature,source_one, source_two):
    """ 
    ks_check(feature,source_one, source_two): compares the distribution of feature between the two datasets source_one and source_two
        and returns True if there is a signifiacnt difference and False otherwise
    ks_check: Str DataFrame DataFrame -> Bool   
    """
    old_set = source_one[feature]
    new_set = source_two[feature]
    # if the feature is continuous:
    _, p_val = ks_2samp(old_set, new_set)

    if p_val < 0.05:
        return True
    else:
        return False

In order to compare the distribution of the same discrete numerical feature in two datasets, we can use Mann-Whitney U test. It is a non parametric test. It assumes that the two sets being compared are independent and drwan from the same underlying distribution.So, in this case we will assume any difference is due to drift and not because of other factors like data preprocessing and would keep all those factors constant.

The Null hypothesis for this test is that the two data sets have the same underlying distribution

In [8]:
def mann_whitney_u_check(feature,source_one, source_two):
    """ 
    mann_whitney_u_check(feature,source_one, source_two): compares the distribution of feature between the two datasets source_one and source_two
        and returns True if there is a signifiacnt difference and False otherwise
    mann_whitney_u_check: Str DataFrame DataFrame -> Bool   
    """
    old_set = source_one[feature]
    new_set = source_two[feature]
    # if the feature is continuous:
    _, p_val = mannwhitneyu(old_set, new_set)

    if p_val < 0.05:
        return True
    else:
        return False

Now that we have a way to check covariate drift in numeric features, we will shift our focus to categorical features.

We will use Jensen-Shannon divergence (JSD) to look for differences in the underlying distribution of a categorical feature between two datasets by measuring the similarity or difference between two probability distributions.

Note that this can fail if there are categories taht are present in one data set but is missing in the other.

In [81]:
def jsd_check(feature,source_one, source_two):
    """ 
    jsd_check(feature,source_one, source_two): compares the distribution of feature between the two datasets source_one and source_two
        and returns True if the difference is high and False otherwise
    jsd_check: Str DataFrame DataFrame -> Bool   
    """
    old_set = source_one[feature]
    new_set = source_two[feature]
    
    old_factor, _ = pd.factorize(old_set)
    new_factor, _ = pd.factorize(new_set)

    old_count = np.bincount(old_factor)
    new_count = np.bincount(new_factor)

    d = 0.5 * (old_count + new_count)
    metric = 0.5 * (jensenshannon(old_count, d) + jensenshannon(new_count, d))

    if metric > 0.1:
        return True
    else:
        return False

#### Step 2.2: Checking the distribution of all variables between two sets of data to determine if they come from the same underlying distributions.

Now that we have a way to compare the distributions of all types of features between two datasets, we will combine it all into a single function:

In [80]:
def check_distribution(first_batch,second_batch):
    """
    check_distribution(first_batch,second_batch) takes two sets of data and checks if the underlying distribution of each features
      in the two sets of data first_batch and second_batch are same or not. If atleast one feature has significantly different
      underlying feature between the two sets of data, it will return the list of names all such featuresfetaure name. If no such feature
      is detected is found it will return an empty list
    check_distribution: DataFrame DataFrame -> Listof Str
    """
    cont, dis, cat = get_num_cat_columns(first_batch)
    cov_drift = []
    for i in cont:
        if ks_check(i,first_batch, second_batch):
            cov_drift.append(i)
    for j in dis:
        if mann_whitney_u_check(j,first_batch, second_batch):
            cov_drift.append(j)
    for k in cat:
        if jsd_check(k,first_batch, second_batch):
            cov_drift.append(k)
    return cov_drift


#### Step 2.3: Testing our covariate detection function

Now that we have the function to detect covariate drift, we need to start testing:

Adding drift to `Age`, `Education` and `DistanceFromHome`:

In [55]:
new_drifted_df = pd.DataFrame()

new_drifted_df = drift_df.copy()

# Adding drift to Age
delta = np.linspace(0, new_drifted_df['Age'].mean() * 0.5, len(new_drifted_df))
new_drift_data = new_drifted_df['Age'] + delta
new_drifted_df['Age'] = new_drift_data

# Adding drift to DistanceFromHome
n_samples = new_drifted_df.shape[0]
distance_param = 0.1
distance_from_home = np.random.negative_binomial(distance_param, 1-distance_param, n_samples)

education_field_cat = ['High School', 'Undergraduate', 'Masters', 'Phd']
education_field_prob = [0.5, 0.1, 0.3, 0.1]
education_level = np.random.choice(education_field_cat, n_samples, p=education_field_prob)
   
new_drifted_df['DistanceFromHome'] = distance_from_home
new_drifted_df['Education'] = education_level
      


Defining two more sets of data that have same underlying distributions since they were generated using the same function.

In [106]:
## No drift added
no_drift_df = generate_data(1500, new_sample=True)
no_drift_df['Atrrition'] = drift_df.apply(create_label, axis=1)
## Anoter set with no drift
no_drift_df_2 = generate_data(1500, new_sample=True)
no_drift_df_2['Atrrition'] = drift_df.apply(create_label, axis=1)

In the first case, we are checking if our function can detect the three features taht we added drifts to.

In [109]:
check_distribution(drift_df, new_drifted_df)

['Age', 'DistanceFromHome', 'Education']

It seem to successfully detect the covariate drift.

We will try our drift detection function on another set:

In [107]:
check_distribution(no_drift_df, new_drifted_df)

['Age', 'DistanceFromHome', 'Education']

It seem to work as expected and now we can check to see if it returns an empty list when we try to detect drift in features between two sets that were genrated using the same function.

In [108]:
check_distribution(no_drift_df_2, no_drift_df)

[]

This also works and expected. Therefore, now that we have tested a few cases, we can move forward with our nect steps:

Step 2.4: Implementing PSI

The PSI method will help us ensure that the model we develop using one dataset can be applied to a different dataset. Inorder to implement our PSI function, we will divide the feature we are testing into several bins based on the distribution of the covariate in the old dataset. The bins will then be used to compute the proportion of obsevations in each bin in both sets of data. Finally, the PSI is computed as the sum of the difference between the proportion of observations in each bin in the two sets of data, multiplied by the logarith ratio of the proportion of obervations in each bin.

We will compute the PSI for the two different types of features and then combine the two. We will start with numeric features:

In [111]:
def psi_numeric(observed, expected):
    # Calculate the difference between the mean of the observed and expected data
    diff = abs(observed.mean() - expected.mean())
    
    # Set a threshold for detecting drift
    threshold = 0.1 * abs(expected.mean())
    
    # If the difference exceeds the threshold, return the PSI value
    if diff > threshold:
        buckets = 10

        # Calculate the bucket boundaries
        boundaries = np.quantile(observed.index.values, np.linspace(0, 1, buckets + 1))
        boundaries[0] = -np.inf
        boundaries[-1] = np.inf

        # Group the observed and expected data based on the bucket boundaries
        observed_groups = observed.groupby(pd.cut(observed.index.values, boundaries)).count()
        expected_groups = expected.groupby(pd.cut(expected.index.values, boundaries)).count()

        # Calculate the observed and expected proportions for each group
        observed_proportions = observed_groups / observed.sum()
        expected_proportions = expected_groups / expected.sum()

        # Add missing buckets to expected proportions
        missing_buckets = observed_proportions.index.difference(expected_proportions.index)
        for bucket in missing_buckets:
            expected_proportions[bucket] = 0

        # Sort the data by the bucket boundaries
        observed_proportions.sort_index(inplace=True)
        expected_proportions.sort_index(inplace=True)

        # Calculate the PSI value
        psi_value = np.sum((observed_proportions - expected_proportions) * np.log(observed_proportions / expected_proportions)) * 100

        return psi_value
    else:
        return 0.0

We will now compute the PSI value for categorical features:

In [110]:
def psi_cat(ref_feature, new_feature):
    # Calculate distribution of actual and expected values
    actual_counts = ref_feature.value_counts(normalize=True, sort=False)
    expected_counts = new_feature.value_counts(normalize=True, sort=False)

    # Calculate the proportion of each distribution
    actual_prop = actual_counts / actual_counts.sum()
    expected_prop = expected_counts / expected_counts.sum()

    # Calculate the PSI value
    psi_value = ((expected_prop - actual_prop) * np.log(expected_prop / actual_prop)).sum()

    return psi_value

Combining everything:

If the PSI value is 0 then there is no change in distribution of the covariate between the two datasets. A PSI value of greater than 0 indicates that the distribution of the covariate has changed between the two datasets, with higher values indicating grater strength.

In [None]:
def calculate_psi(old_batch, current_batch):
    """
    calculate_psi(old_batch, current_batch) takes two sets of data and returns the features that may have covariate drift
    calculate_psi: DataFrame DataFrame -> Listof Str
    """
    drift_features = []

    for feature in old_batch.columns:
        df_1 = old_batch[feature]
        df_2 = current_batch[feature]

        if np.issubdtype(df_1.dtype, np.number):
            psi_threshold = 0.1 * abs(df_1.mean())
        else:
            psi_threshold = 0.1

        if np.issubdtype(df_1.dtype, np.number):
            stat = psi_numeric(df_1, df_2)

            # if stat > optimal_threshold:
            if stat > psi_threshold:
                drift_features.append(feature)
        else:
            stat = psi_cat(df_1, df_2)

            if stat > psi_threshold:
                drift_features.append(feature)
    return drift_features