<center> Data Drifts

# PSI visualization

### Принятые границы значений PSI:
- `< 0.1`: Нет значимого дрейфа (данные стабильны)
- `0.1 – 0.25`: Возможен дрейф
- `> 0.25`: Сильный дрейф

In [1]:
import numpy as np
from scipy.stats import skewnorm
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from ipywidgets import interact, FloatSlider, IntSlider, Layout, Dropdown
from typing import Literal

In [2]:
def feature_generator (mean_: float, std_: float, size: int, distribution: Literal["normal", "skewed"] = "normal", **kwargs):
    """ 
    Генератор ряда значений фичи
    """
    np.random.seed(0)
    if distribution == "normal":
        return np.random.normal(loc=mean_, scale=std_, size=size)
    
    elif distribution == "skewed":
        return np.array(skewnorm.rvs(a=kwargs.get("skew"), loc=mean_, scale=std_, size=size))

# Population Stability Index

In [3]:
class PSI:
    def __init__(
        self,
        distribution: Literal["normal", "skewed"],
        n_batches: int,
        train_mean: float,
        train_std: float,
        train_size: int,
        current_mean: float,
        current_std: float,
        current_size: int,
        **kwargs
    ):
        self.distribution = distribution
        self.n_batches = n_batches
        self.train_mean = train_mean
        self.train_std = train_std
        self.train_size = train_size
        self.current_mean = current_mean
        self.current_std = current_std
        self.current_size = current_size
        self.train = feature_generator (self.train_mean, self.train_std, self.train_size)
        self.current = feature_generator (self.current_mean, self.current_std, self.current_size, self.distribution, **kwargs)
        self.bins = np.linspace (
            start = min(self.train),
            stop = max(self.train),
            num = self.n_batches + 1
        )

        self.train_ratio = self.calculate_ratio (self.train, self.bins)
        self.current_ratio = self.calculate_ratio (self.current, self.bins)
        self.value = self.calculate_psi(self.train_ratio, self.current_ratio)

    def __call__(self) -> float:
        return self.value
    
    def calculate_ratio (self, data: np.ndarray, bins: np.ndarray) -> np.ndarray:
        """ 
        Вычисляет долю наблюдений в каждом бине с защитой от нулевых значений.
        Параметры:
        ----------
        - data: Одномерный массив данных, для которого нужно рассчитать доли вхождений по бинам.
        - bins: массив бинов (интервалов) для разбиения диапазона значений.

        Возвращает:
        ----------
        - массив нормированных долей по каждому бину, в котором нули заменены на 1e-6 (защита от логарифма нуля).
        """
        counts, _ = np.histogram(data, bins= bins, density=False)
        train_ratio = counts / len(data)
        zero_protected = np.clip(train_ratio, 1e-6, None)
        return zero_protected  

    @staticmethod
    def calculate_psi (train_ratio, current_ratio) -> float:
        """ 
        Векторный расчет PSI

        Параметры:
        ---------
        - train_ration: массив нормированных долей по каждому бину в ОБУЧАЮЩЕЙ ВЫБОРКЕ. Число элементов в массиве определено числом бинов
        - current_ratio: массив нормированных долей по каждому бину в ТЕКУЩЕЙ ВЫБОРКЕ. Число элементов в массиве определено числом бинов
        """
        psi_values = (train_ratio - current_ratio) * np.log(train_ratio / current_ratio)
        return np.sum(psi_values)



# Wasserstein distance

In [4]:
class Wasserstein:
    def __init__(
            self,
            distribution: Literal["normal", "skewed"],
            train_mean: float,
            train_std: float,
            train_size: int,
            current_mean: float,
            current_std: float,
            current_size: int,
            **kwargs
        ):
            self.distribution = distribution
            self.train_mean = train_mean
            self.train_std = train_std
            self.train_size = train_size
            self.current_mean = current_mean
            self.current_std = current_std
            self.current_size = current_size

            self.train = feature_generator (self.train_mean, self.train_std, self.train_size)
            self.current = feature_generator (self.current_mean, self.current_std, self.current_size, self.distribution, **kwargs)
            self.common_x = self.make_common_space()

            # Эмпирическая функция распределения -> ЕCDF
            self.train_iecdf = self.make_iecdf(self.train, self.common_x)
            self.current_iecdf = self.make_iecdf(self.current, self.common_x)
            self.value = self.wasserstein()
            self.value_norm = self.wasserstein_norm()
            
    def __call__(self) -> float:
        return self.value_norm
    
    def make_common_space (self):
        """ 
        Возвращает массив равномерно распределенных точек в диапазоне от 0 до 1.
        Число элементов в массиве соответствует наибольшей выборке (из двух) 
        """
        return np.linspace(0, 1, max(self.current_size, self.train_size))

    @staticmethod
    def make_iecdf (data: np.ndarray, common_x: np.ndarray):
        """ 
        Возвращает значения квантильной функции (inverse ECDF (empirical cumulative distribution function)).
        - В условиях нашей задачи это нужно, чтобы "достроить" недостающие значения обеим выборкам так, чтобы они легли на одну OX и полностью ее заполнили.
        - Это даст 2 IECDF, по которым можно будет посчитать интеграл разницы.

        Логика:
        - np.interp: функция интерполяции. Принимает:
            - x: массив значений аргумента, ДЛЯ КОТОРЫХ нужно вернуть значения функции
            - xp: массив значений аргумента, для которых ИЗВЕСТНЫ значения функции
            - fp: массив ИЗВЕСТНЫХ значений функции
        """
        return np.interp(
            x = common_x, 
            xp = np.linspace(0, 1, len(data), endpoint=False),
            fp = np.sort(data)
            )
        
    def wasserstein (self):
        """ 
        Благодаря тому, что на предыдущем шаге рассчитаны обратные CDF, теперь есть 2 массива квантилей от 0 до 1 с соответствующими значениями.
        Т.к. площадь считается на всем интервале вероятностей (от 0 до 1), то интегрирование сводится к рассчету среднего значения по OY, 
            что в приближении равно просто среднему значению разницы квантильных функций двух распределений. 
        """
        return np.mean(np.abs(self.train_iecdf - self.current_iecdf))

    def wasserstein_norm (self):
        """ 
        Нормализует расстояние по диапазону значений фичи
        """
        scale = max(self.train.max(), self.current.max()) - min(self.train.min(), self.current.min())
        return self.value / scale if scale else 0.0

# Функция визуализации

In [15]:

class Visualizer:
    def __init__(self, **kwargs):
        self.psi = PSI(**kwargs)
        self.wass = Wasserstein(**kwargs) 

    def plot_psi(self, ax: Axes):
        psi_value = self.psi()
        ax.bar(["PSI"], [psi_value], color='skyblue')
        ax.set_ylim(-0.5, 50)
        ax.axhline(0.25, color='red', linestyle='--', label='PSI = 0.25 - порог значительного дрифта')
        ax.text(0, psi_value + 0.2, f'{psi_value:.4f}', ha='center', va='bottom', fontsize=12, fontweight='bold', color='black')
        ax.set_title("PSI (Population Stability Index)")
        ax.set_ylim(0, 20)
        ax.legend()

    def plot_psi_bins (self, ax: Axes):
        bin_centers = (self.psi.bins[:-1] + self.psi.bins[1:]) / 2
        width = (self.psi.bins[1] - self.psi.bins[0]) * 0.4  # ширина столбиков
        ax.bar(bin_centers - width / 2, self.psi.train_ratio, width=width, label='Train', alpha=0.7, color='skyblue')
        ax.bar(bin_centers + width / 2, self.psi.current_ratio, width=width, label='Current', alpha=0.7, color='orange')
        ax.set_title("Доли по бинам (для PSI)")
        ax.set_xlabel("Значения признака (по бинам)")
        ax.set_ylabel("Доля наблюдений")
        ax.grid(True, linestyle='--', alpha=0.3)
        ax.legend()

    def plot_wasserstein (self,  ax: Axes):
        value = self.wass()
        ax.bar(["W1"], [value], color='lightgreen')
        ax.axhline(0, color='gray', linestyle='--')
        ax.axhline(0.1, color='red', linestyle='--', label='EMD = 0.1 - порог значительного дрифта')
        ax.text(0, value + 0.02, f'{value:.4f}', ha='center', va='bottom', fontsize=12, fontweight='bold', color='black')
        ax.set_title("W1 (Wasserstein distance) / EMD (Earth Mover's Distance)")
        ax.set_ylim(0, 1)
        ax.legend()
    
    def plot_wasserstein_geom (self, ax: Axes,**kwargs):       
        ax.plot(self.wass.common_x, self.wass.train_iecdf, label='Train IECDF', color='blue')
        ax.plot(self.wass.common_x, self.wass.current_iecdf, label='Current IECDF', color='orange')
        ax.fill_between(self.wass.common_x, self.wass.train_iecdf, self.wass.current_iecdf, color='lightcoral', alpha=0.4, label='|Train - Current|')
        ax.set_title("Разность IECDF двух выборок")
        ax.set_xlabel("Probability (quantiles)")
        ax.set_ylabel("Feature value")
        ax.grid(True)
        ax.legend()


    def visualize (self, **kwargs):
        """ 
        Функция визуализации
        """
        _, axs = plt.subplots(2, 2, figsize=(20, 8), gridspec_kw={"width_ratios": [1, 2]})

        # Верхний ряд: PSI
        ax1, ax2 = axs[0]
        self.plot_psi(ax1)
        # self.plot_psi_dependency(ax2, **kwargs)
        self.plot_psi_bins(ax2)

        # Нижний ряд: Wasserstein
        ax3, ax4 = axs[1]
        self.plot_wasserstein(ax3)
        self.plot_wasserstein_geom(ax4)

        plt.tight_layout()
        plt.show()


In [16]:
l = Layout(width='1000px')
style = {'description_width': '200px'}

def run_visualizer (**kwargs):
    viz = Visualizer(**kwargs)
    return viz.visualize(**kwargs)

interact(
    run_visualizer,
    distribution = Dropdown(options=['normal', 'skewed'],value='normal', description='Распределение:'),
    skew = IntSlider(value=10, min=-10, max=10, step=1, description='Смещение распределения', layout = l, style = style),

    n_batches = IntSlider(value=10, min=5, max=100, step=1, description='ЧИСЛО БИНОВ', layout = l, style = style),
    
    train_size = IntSlider(value=1000, min=1000, max=100000, step=100, description='TRAIN: размер выборки', layout = l, style = style),
    current_size = IntSlider(value=1000, min=1000, max=100000, step=100, description='CURRENT: размер выборки', layout = l, style = style),
    
    train_mean = FloatSlider(value=1.0, min=0, max=100.0, step=1.0, description='TRAIN: среднее', layout = l, style = style),
    current_mean = FloatSlider(value=1.0, min=0, max=100.0, step=1.0, description='CURRENT: среднее', layout = l, style = style),
    
    train_std = FloatSlider(value=1.0, min=0.01, max=10.0, step=0.1, description='TRAIN: стандартное отклонение', layout = l, style = style),
    current_std = FloatSlider(value=1.0, min=0.01, max=10.0, step=0.1, description='CURRENT: стандартное отклонение', layout = l, style = style),
)


interactive(children=(Dropdown(description='Распределение:', options=('normal', 'skewed'), value='normal'), In…

<function __main__.run_visualizer(**kwargs)>

# DEV