# PSI

In [1]:
import numpy as np
import pandas as pd
pd.options.display.float_format = '{:,.4f}'.format

def _psi(expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10) -> float:
    """Calculate PSI metric for two arrays.
    
    Parameters
    ----------
        expected : list-like
            Array of expected values
        actual : list-like
            Array of actual values
        bucket_type : str
            Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins': input arrays are splitted into bins with equal
                and fixed steps based on 'expected' array
            'quantiles': input arrays are binned according to 'expected' array
                with given number of n_bins
        n_bins : int
            Number of buckets for binning. Defaults to 10.

    Returns
    -------
        A single float number
    """
    breakpoints = np.arange(0, n_bins + 1) / (n_bins) * 100
    if bucket_type == "bins":
        breakpoints = np.histogram(expected, n_bins)[1]
    elif bucket_type == "quantiles":
        breakpoints = np.percentile(expected, breakpoints)

    # Calculate frequencies
    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
    # Clip freaquencies to avoid zero division
    expected_percents = np.clip(expected_percents, a_min=0.0001, a_max=None)
    actual_percents = np.clip(actual_percents, a_min=0.0001, a_max=None)
    # Calculate PSI
    psi_value = (expected_percents - actual_percents) * np.log(expected_percents / actual_percents)
    psi_value = sum(psi_value)

    return psi_value


def calculate_psi(
        expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10, axis: int = 0
) -> np.ndarray:
    """Apply PSI calculation to 2 1-d or 2-d arrays.

    Parameters
    ----------
    expected : list-like
        Array of expected values
    actual : list-like
        Array of actual values
    bucket_type : str
        Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins' - input arrays are splitted into bins with equal
                and fixed steps based on ’expected' array
            'quantiles' - input arrays are binned according to ’expected’ array
                with given number of n_bins
    n_bins : int
        Number of buckets for binning. Defaults to 10.

    Returns
    -------
        np.ndarray
    """
    if len(expected.shape) == 1:
        psi_values = np.empty(len(expected.shape))
    else:
        psi_values = np.empty(expected.shape[axis])

    for i in range(0, len(psi_values)):
        if len(psi_values) == 1:
            psi_values = _psi(expected, actual, bucket_type, n_bins)
        elif axis == 0:
            psi_values[i] = _psi(expected[:, i], actual[:, i], bucket_type, n_bins)
        elif axis == 1:
            psi_values[i] = _psi(expected[i, :], actual[i, :], bucket_type, n_bins)
        return np.array(psi_values)

In [5]:
df = pd.read_csv('IEA-EV-dataEV salesHistoricalCars.csv')

In [6]:
df

Unnamed: 0,region,category,parameter,mode,powertrain,year,unit,value
0,Australia,Historical,EV sales,Cars,BEV,2011,Vehicles,49.0000
1,Australia,Historical,EV stock share,Cars,EV,2011,percent,0.0004
2,Australia,Historical,EV sales share,Cars,EV,2011,percent,0.0065
3,Australia,Historical,EV stock,Cars,BEV,2011,Vehicles,49.0000
4,Australia,Historical,EV stock,Cars,BEV,2012,Vehicles,220.0000
...,...,...,...,...,...,...,...,...
3793,World,Historical,EV stock,Cars,BEV,2023,Vehicles,28000000.0000
3794,World,Historical,EV sales,Cars,BEV,2023,Vehicles,9500000.0000
3795,World,Historical,EV sales share,Cars,EV,2023,percent,18.0000
3796,World,Historical,EV stock share,Cars,EV,2023,percent,3.2000


In [13]:
df_expected = df[(df['year']==2022) & (df['parameter']=="EV sales")]['value']
df_expected.describe()

count         129.0000
mean      194,961.5891
std       818,227.6242
min             1.0000
25%           620.0000
50%         6,400.0000
75%        34,000.0000
max     7,300,000.0000
Name: value, dtype: float64

In [15]:
df_actual = df[(df['year']==2023) & (df['parameter']=="EV sales")]['value']
df_actual.describe()

count         125.0000
mean      264,790.9920
std     1,091,618.4093
min             1.0000
25%           810.0000
50%        10,000.0000
75%        63,000.0000
max     9,500,000.0000
Name: value, dtype: float64

In [16]:
calculate_psi(df_expected, df_actual)

array(0.14855845)

In [21]:
df_actual = np.where(df['value'] > 100000, 0, df['value'])
pd.DataFrame(df_actual).describe()

Unnamed: 0,0
count,3798.0
mean,6056.9186
std,16196.8627
min,0.0
25%,0.47
50%,46.0
75%,2200.0
max,100000.0


In [22]:
calculate_psi(df_expected, df_actual)

array(0.40838092)