In [7]:
import numpy as np


def _psi(expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10) -> float:
    """Calculate PSI metric for two arrays.
    
    Parameters
    ----------
        expected : list-like
            Array of expected values
        actual : list-like
            Array of actual values
        bucket_type : str
            Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins': input arrays are splitted into bins with equal
                and fixed steps based on 'expected' array
            'quantiles': input arrays are binned according to 'expected' array
                with given number of n_bins
        n_bins : int
            Number of buckets for binning. Defaults to 10.

    Returns
    -------
        A single float number
    """
    breakpoints = np.arange(0, n_bins + 1) / (n_bins) * 100
    if bucket_type == "bins":
        breakpoints = np.histogram(expected, n_bins)[1]
    elif bucket_type == "quantiles":
        breakpoints = np.percentile(expected, breakpoints)

    # Calculate frequencies
    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
    # Clip freaquencies to avoid zero division
    expected_percents = np.clip(expected_percents, a_min=0.0001, a_max=None)
    actual_percents = np.clip(actual_percents, a_min=0.0001, a_max=None)
    # Calculate PSI
    psi_value = (expected_percents - actual_percents) * np.log(expected_percents / actual_percents)
    psi_value = sum(psi_value)

    return psi_value


def calculate_psi(
        expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10, axis: int = 0
) -> np.ndarray:
    if len(expected.shape) == 1:
        return _psi(expected, actual, bucket_type, n_bins)
    if axis not in (0, 1):
        raise ValueError("axis must be 0 or 1")
    n_features = expected.shape[1] if axis == 0 else expected.shape[0]
    psi_values = np.empty(n_features)
    for i in range(n_features):
        if axis == 0:
            exp_feature = expected[:, i]
            act_feature = actual[:, i]
        else:
            exp_feature = expected[i, :]
            act_feature = actual[i, :]
        psi_values[i] = _psi(exp_feature, act_feature, bucket_type, n_bins)
    return psi_values
    
np.random.seed(44)
SAMPLE_SIZE = 100
data_control = -np.random.normal(1, 1, SAMPLE_SIZE)
data_pilot = -np.random.normal(1.2, 1, SAMPLE_SIZE)
data_pilot_anomalous = np.concatenate([data_pilot, [10.0, 15.0, -5.0, 20.0, -15.0]])
psi_anomalous = calculate_psi(data_control, data_pilot_anomalous, bucket_type="bins", n_bins=10, axis=0)
print("PSI with anomalies:", psi_anomalous)
print(np.isclose(calculate_psi(data_control, data_pilot, bucket_type="bins", n_bins=10, axis=0), 0.2315847887596773))


PSI with anomalies: 0.23422282715385404
True
