# Снижение дисперсии на реальных данных

## 📋 Оглавление

1. [**Загрузка данных**](#DATA) - подготовка датасета для эксперимента
2. [**Модели снижения дисперсии**](#model):
   - [AutoCUPAC](#autocupac) - наш улучшенный метод с автоматическим выбором признаков
3. [**Результаты**](#RESULTS) - сводная таблица с показателями снижения дисперсии
4. [**Размер выборки для пилота**](#pilot-sample) - расчет необходимого количества участников
5. [**Резюме**](#resume) - итоговые результаты и выводы

## DATA
Используем реальные данные для демонстрации эффективности различных методов снижения дисперсии.

In [None]:
import pandas as pd
pd.set_option(
    'display.max_rows', None,
    'display.max_columns', None,
    'display.float_format', '{:.2f}'.format
)
import numpy as np
import time
from functools import wraps

In [None]:
df.head()

In [23]:
# Назначаем treatment случайным образом
df['treatment_flg'] = np.random.choice([0, 1], size=df.shape[0])

Необходимо заполнить `имена соответсвующих колонок` как снизу в примере

In [24]:
CUPAC_FEATURE = df.filter(like='_lag').columns.tolist() 
TREATMENT = 'treatment_flg'
TARGET = 'cltv'

In [25]:
GLOBAL_RESULTS = pd.DataFrame(columns=[
    "experiment", "df",
    "control mean", "test mean", "control means diff", "test means diff",
    "var_red (%)", "execution time (sec)"
])

def log_execution_time(func):
    @wraps(func)
    def wrapper(self, df, *args, **kwargs):
        start = time.time()
        result = func(self, df, *args, **kwargs)
        self.execution_time = time.time() - start
        return result
    return wrapper

<a id="model"></a>
## Модель

<a id="autocupac"></a>
### AutoCUPAC

автоматический выбор признаков и модели для максимального снижения дисперсии. Использует ML-подходы для оптимальной комбинации контрольных переменных.

In [26]:
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from catboost import CatBoostRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import r2_score
from typing import Optional, Dict, List, Union


class CUPACTransformer:
    """
    Улучшенный CUPAC трансформер с расширенной отчетностью
    """

    def __init__(
        self,
        target_col: str,
        lag_suffix: str = "_lag",
        target_counterfactual_suffix="0",
        models: Optional[Dict] = None,
        n_folds: int = 5,
        random_state: Optional[int] = None,
    ):
        self.target_col = target_col
        self.target_counterfactual_suffix = target_counterfactual_suffix
        self.lag_suffix = lag_suffix
        self.n_folds = n_folds
        self.random_state = random_state

        self.models = models or {
            "Linear": LinearRegression(),
            "Ridge": Ridge(alpha=0.5),
            "Lasso": Lasso(alpha=0.01, max_iter=10000),
            "CatBoost": CatBoostRegressor(
                iterations=100,
                depth=4,
                learning_rate=0.1,
                silent=True,
                random_state=random_state,
                allow_writing_files=False,
            ),
        }

        self.best_model = None
        self.best_model_name = None
        self.best_score = -np.inf
        self.variance_reduction = None
        self.lag_features = None
        self.current_features = None
        self.is_fitted = False
        self.model_results_ = {}
        self.feature_importances_ = None

    def _prepare_train_data(self, df: pd.DataFrame) -> tuple:
        """Подготовка данных для обучения"""
        target_counterfactual_name = (
            f"{self.target_col}{self.target_counterfactual_suffix}{self.lag_suffix}"
        )

        self.lag_features = [
            col
            for col in df.columns
            if col.endswith(self.lag_suffix)
            and col != f"{self.target_col}{self.lag_suffix}"
        ]

        if not self.lag_features:
            raise ValueError("Не найдены лаговые признаки для обучения")

        self.current_features = [
            col.replace(self.lag_suffix, "") for col in self.lag_features
        ]

        self.current_features.append(f"{target_counterfactual_name}_1")

        return df[self.lag_features], df[f"{target_counterfactual_name}_1"]

    def _prepare_inference_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Подготовка данных для применения"""
        if not self.current_features:
            raise RuntimeError("Сначала обучите модель (fit())")

        missing = [col for col in self.current_features if col not in df.columns]
        if missing:
            raise ValueError(f"Отсутствуют признаки: {missing}")

        self.current_features = ['X1_lag', 'X2_lag']

        return df[self.current_features].rename(
            columns=dict(zip(self.current_features, self.lag_features))
        )

    def _calculate_variance_reduction(self, y: pd.Series, pred: pd.Series) -> float:
        """Расчет снижения дисперсии"""
        pred_centered = pred - pred.mean()
        if pred_centered.var() < 1e-10:
            return 0.0

        theta = np.cov(y, pred_centered)[0, 1] / pred_centered.var()
        y_adj = y - theta * pred_centered
        return max(0, (1 - y_adj.var() / y.var()) * 100)

    def fit(self, df: pd.DataFrame) -> "CUPACTransformer":
        """Обучение модели на исторических данных"""
        X, y = df[CUPAC_FEATURE], df[TARGET]

        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=self.random_state)
        results = {}

        for name, model in self.models.items():
            fold_scores = []
            fold_var_reductions = []
            status = "success"

            try:
                for train_idx, val_idx in kf.split(X):
                    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
                    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

                    if name == "CatBoost":
                        m = CatBoostRegressor(**model.get_params())
                        m.fit(X_train, y_train, verbose=False)
                    else:
                        m = model.__class__(**model.get_params())
                        m.fit(X_train, y_train)

                    pred = m.predict(X_val)
                    fold_scores.append(r2_score(y_val, pred))
                    fold_var_reductions.append(
                        self._calculate_variance_reduction(y_val, pred)
                    )

                results[name] = {
                    "r2": np.nanmean(fold_scores),
                    "var_reduction": np.nanmean(fold_var_reductions),
                    "status": status,
                }

            except Exception as e:
                error_msg = f"{type(e).__name__}: {str(e)}"
                results[name] = {
                    "r2": None,
                    "var_reduction": None,
                    "status": f"failed: {error_msg}",
                }
                print(f"Ошибка в {name}: {error_msg}")

        self.model_results_ = results

        successful_models = {
            k: v for k, v in results.items() if v["status"] == "success"
        }
        if not successful_models:
            raise RuntimeError("Все модели завершились с ошибкой")

        self.best_model_name = max(
            successful_models, key=lambda x: successful_models[x]["var_reduction"]
        )
        self.best_score = successful_models[self.best_model_name]["r2"]
        self.variance_reduction = successful_models[self.best_model_name][
            "var_reduction"
        ]

        X, y = df[CUPAC_FEATURE], df[TARGET]
        best_model_params = self.models[self.best_model_name].get_params()

        if self.best_model_name == "CatBoost":
            self.best_model = CatBoostRegressor(**best_model_params)
            self.best_model.fit(X, y, verbose=False)
            self.feature_importances_ = dict(
                zip(X.columns, self.best_model.get_feature_importance())
            )
        else:
            self.best_model = self.models[self.best_model_name].__class__(
                **best_model_params
            )
            self.best_model.fit(X, y)
            if hasattr(self.best_model, "coef_"):
                self.feature_importances_ = dict(zip(X.columns, self.best_model.coef_))
            else:
                self.feature_importances_ = None

        self.is_fitted = True
        return self

    def transform(self, df: pd.DataFrame, inplace: bool = False) -> pd.DataFrame:
        """Применение модели к новым данным"""
        if not self.is_fitted:
            raise RuntimeError("Сначала вызовите fit()")

        X, y = df[CUPAC_FEATURE], df[TARGET]
        pred = self.best_model.predict(X)

        y_adj = y - pred + y.mean()

        if inplace:
            df[f"{self.target_col}_cupac"] = y_adj
            return df
        return df.assign(**{f"{self.target_col}_cupac": y_adj})

    def get_report(self) -> str:
        """Генерация расширенного отчета"""
        if not self.is_fitted:
            return "Модель не обучена. Сначала вызовите fit()."

        sorted_features = (
            sorted(
                self.feature_importances_.items(), key=lambda x: abs(x[1]), reverse=True
            )[:10]
            if self.feature_importances_
            else []
        )

        model_comparison = []
        for name, data in self.model_results_.items():
            if data["status"] != "success":
                line = f"{name}: {data['status']}"
            else:
                line = (
                    f"{name}: R²={data['r2']:.3f}, "
                    f"Var.Red.={data['var_reduction']:.1f}%"
                )
            model_comparison.append(line)

        feature_analysis = []
        if sorted_features:
            max_coef = max(abs(v) for _, v in sorted_features)
            for feat, coef in sorted_features:
                rel_impact = abs(coef) / max_coef if max_coef != 0 else 0
                feature_analysis.append(
                    f"- {feat:<25} {coef:>7.3f} {'▇'*int(10*rel_impact)}"
                )

        report = [
            "Расширенный CUPAC Report",
            "=" * 40,
            "Сравнение моделей:",
            *model_comparison,
            "",
            f"Лучшая модель: {self.best_model_name}",
            f"Снижение дисперсии: {self.variance_reduction:.1f}%",
            f"Качество предсказания (R²): {self.best_score:.3f}",
            "",
            "Топ-10 значимых признаков:",
            *(
                feature_analysis
                if feature_analysis
                else ["Нет данных о важности признаков"]
            ),
            "",
            "Интерпретация:",
            "▇▇▇▇▇▇▇▇▇▇ - максимальное влияние",
            "Коэффициенты > 0: положительная связь с целевой переменной",
            "Коэффициенты < 0: отрицательная связь",
        ]
        return "\n".join(report)

    def fit_transform(
        self,
        df_train: pd.DataFrame,
        df_apply: Optional[pd.DataFrame] = None,
        inplace: bool = False,
    ) -> pd.DataFrame:
        self.fit(df_train)
        df_apply = df_train if df_apply is None else df_apply
        return self.transform(df_apply, inplace=inplace)

    def get_feature_mapping(self) -> Dict[str, str]:
        return dict(zip(self.lag_features, self.current_features))

In [27]:
class Experiment:
    name = None
    counter = 1
    transformated_column = None
    execution_time = None

    def __init__(self):
        self.name = 'BaseLine'

    @log_execution_time
    def execute(self, df: pd.DataFrame):
        self.transformated_column = df[TARGET]

    def save_result(self, df: pd.DataFrame):
        global GLOBAL_RESULTS
        if self.transformated_column is None:
            return
        
        df = df.copy()
        df['y_transform'] = self.transformated_column
        control = df[df[TREATMENT] == 0]
        test = df[df[TREATMENT] == 1]

        control_mean_original = control[TARGET].mean()
        test_mean_original = test[TARGET].mean()

        control_mean_transformed = control['y_transform'].mean()
        test_mean_transformed = test['y_transform'].mean()

        var_original = df[TARGET].var()
        var_transformed = df['y_transform'].var()

        var_reduction = (var_original - var_transformed) / var_original if var_original != 0 else None

        control_means_diff = control_mean_transformed - control_mean_original
        test_means_diff = test_mean_transformed - test_mean_original

        summary = {
            "experiment": self.name,
            "df": f"df{self.counter}",
            "control mean": control_mean_original,
            "test mean": test_mean_original,
            "var_red (%)": round(var_reduction * 100, 2) if var_reduction is not None else None,
            "control means diff": control_means_diff,
            "test means diff": test_means_diff,
            "execution time (sec)": self.execution_time
        }

        GLOBAL_RESULTS = pd.concat([GLOBAL_RESULTS, pd.DataFrame([summary])], ignore_index=True)
        self.counter += 1
    
    def execute_and_save(self, df):
        self.execute(df)
        self.save_result(df)

In [28]:
class AutoCupacExperiment(Experiment):
    def __init__(self):
        self.name = 'AutoCupac'
        self.report = None

    @log_execution_time
    def execute(self, df: pd.DataFrame):
        transformer = CUPACTransformer(target_col=TARGET)
        transformer.fit(df)
        transformed_data = transformer.transform(df)
        
        self.report = transformer.get_report()
        self.transformated_column = transformed_data[f'{TARGET}_cupac']

## RESULTS

**Сводная таблица результатов** - эффективность метода CUPAC по снижению дисперсии и времени выполнения.

In [None]:
experiment = AutoCupacExperiment()
experiment.execute_and_save(df)

In [30]:
GLOBAL_RESULTS

Unnamed: 0,experiment,df,control mean,test mean,control means diff,test means diff,var_red (%),execution time (sec)
0,AutoCupac,df1,31379.96,31428.34,-62.45,64.16,34.12,9573.43


In [31]:
print(experiment.report)

Расширенный CUPAC Report
Сравнение моделей:
Linear: R²=0.130, Var.Red.=16.7%
Ridge: R²=0.038, Var.Red.=12.9%
Lasso: R²=0.192, Var.Red.=21.8%
CatBoost: R²=0.286, Var.Red.=28.6%

Лучшая модель: CatBoost
Снижение дисперсии: 28.6%
Качество предсказания (R²): 0.286

Топ-10 значимых признаков:
- cltv_lag2                  22.091 ▇▇▇▇▇▇▇▇▇▇
- cltv_lag                   18.753 ▇▇▇▇▇▇▇▇
- rsdl_npv_lag                9.276 ▇▇▇▇
- rsdl_npv_lag2               6.924 ▇▇▇
- avg_pos_cc_lag2             5.797 ▇▇
- avg_pos_cc_lag              3.918 ▇
- avg_pl_broker_service_lag   2.738 ▇
- avg_pl_broker_service_lag2   2.605 ▇
- lifecycle_segment_lag_11. полный отток   2.237 ▇
- sum_pl_val_lag2             1.888 

Интерпретация:
▇▇▇▇▇▇▇▇▇▇ - максимальное влияние
Коэффициенты > 0: положительная связь с целевой переменной
Коэффициенты < 0: отрицательная связь


<a id="pilot-sample"></a>
## Сколько клиентов нужно для пилота?

In [116]:
from scipy import stats

In [113]:
def calculate_sample_size(
    baseline: float,
    std: float,
    mde: float = 0.01, 
    alpha: float = 0.05, 
    beta: float = 0.2, 
    s: float = 0.5
):
    """
    Расчёт требуемого размера выборки при заданных уровнях значимости (alpha) и мощности (1-beta)
    
    :std: Стандартное отклонение целевой метрики в генеральной совокупности
    :baseline: Среднее значение метрики в генеральной совокупности в предпилотный период
    :mde: Минимальный ожидаемый эффект пилота в процентах от baseline-среднего
    :alpha: Уровень значимости (вероятность ошибка первого рода)
    :beta: Вероятность ошибки второго рода
    
    :return: Размер выборки
    """
    
    # Расчёт размера выборки по формуле для t-критерия
    sample_size = (stats.norm.ppf(1 - alpha/2) + stats.norm.ppf(1 - beta))**2*(std**2/s + std**2/(1-s))/((mde*baseline)**2)
    
    return np.ceil(sample_size).astype(int)

In [None]:
# кол-во клиентов до применения cupac
calculate_sample_size(
    baseline=df_final['cltv'].mean(),
    std=df_final['cltv'].std(),
    mde=0.01, 
    alpha=0.05, 
    beta=0.2, 
    s=0.5
)

2943926

In [121]:
# кол-во клиентов после применением cupac
calculate_sample_size(
    baseline=df_final['cltv_cupac'].mean(),
    std=df_final['cltv_cupac'].std(),
    mde=0.01, 
    alpha=0.05, 
    beta=0.2, 
    s=0.5
)

1939389

<a id="resume"></a>
## Резюме
### 📊 Снижение дисперсии

**Итоговое снижение дисперсии CUPAC – 34%** 

**Значительно лучше классического CUPED (-9%)**

### 👥 Влияние на размер выборки для пилота

| Метод | Размер выборки | Снижение |
|-------|---------------|----------|
| **Без применения методов снижения дисперсии** | 2943926 | - |
| **С применением CUPAC** | 1939389 | **34%** |