# ДЗ_8: проверка изменений распределений еженедельных метрик (drift) на логах сервиса

## Метрики
Нужно 1 непрерывную и 1 дискретную (можно бинарную).

- **Непрерывная:** `share_high_week` = доля оценок ≥ 4 за неделю (в диапазоне [0, 1]).  
  Для пользователей без событий в неделю ставим 0 — так получаем метрику для всей аудитории.

- **Дискретная (бинарная):** `did_high_week` = 1, если за неделю была хотя бы 1 оценка ≥ 4, иначе 0.

## Drift-проверка
Сравниваем распределения **неделя 0 (база)** vs каждая следующая неделя:
- `share_high_week`: KS-test, Mann–Whitney, Wasserstein distance, PSI
- `did_high_week`: χ² / Fisher для 2×2 таблицы

Плюс: поправка на множественные сравнения (Holm).


**0. Импорт и настройки**


In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt

pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 140)

RANDOM_STATE = 42
rng = np.random.default_rng(RANDOM_STATE)


**1. Загрузка логов MovieLens 10M + сэмплирование событий**

В Colab лучше не брать все 10M событий сразу: берём подвыборку (сохраняя большую аудиторию пользователей).


In [None]:
import io, zipfile, requests

MOVIELENS_URL = "https://files.grouplens.org/datasets/movielens/ml-10m.zip"

resp = requests.get(MOVIELENS_URL, stream=True)
resp.raise_for_status()

z = zipfile.ZipFile(io.BytesIO(resp.content))
names = z.namelist()

ratings_path = [p for p in names if p.endswith("/ratings.dat")][0]
with z.open(ratings_path) as f:
    ratings = pd.read_csv(
        f, sep="::", engine="python", header=None,
        names=["userId","movieId","rating","timestamp"],
        dtype={"userId":"int32","movieId":"int32","rating":"float32","timestamp":"int64"},
    )

N_EVENTS = 1_000_000
ratings = ratings.sample(n=N_EVENTS, random_state=RANDOM_STATE).reset_index(drop=True)

ratings.head(), ratings.shape


**2. Формируем недели (12 недель наблюдения)**

- `week = floor((timestamp - t0) / 7 дней)`
- оставляем недели `0..11` (12 недель ≈ 3 месяца)


In [None]:
t0 = int(ratings["timestamp"].min())
ratings["week"] = ((ratings["timestamp"] - t0) // (7 * 24 * 3600)).astype("int16")

W = 12
ratings = ratings[(ratings["week"] >= 0) & (ratings["week"] < W)].copy()

ratings["week"].value_counts().sort_index(), ratings.shape


**3. Еженедельные метрики на уровне user-week**

Считаем:
- `n_events_week` — сколько событий (оценок) в неделю
- `n_high_week` — сколько оценок ≥ 4
- `share_high_week = n_high_week / n_events_week` (если `n_events_week = 0` → 0)
- `did_high_week = 1(n_high_week > 0)` — бинарная

Чтобы сравнивать распределения по неделям для **одной и той же аудитории**, строим **полную сетку** `user × week` и заполняем пропуски нулями.


In [None]:
uw = ratings.groupby(["userId","week"], sort=False).agg(
    n_events_week=("movieId","size"),
    n_high_week=("rating", lambda x: int(np.sum(x >= 4.0))),
).reset_index()

users = uw["userId"].unique()
all_index = pd.MultiIndex.from_product([users, np.arange(W, dtype=int)], names=["userId","week"])

uw_full = uw.set_index(["userId","week"]).reindex(all_index).fillna(0).reset_index()
uw_full["n_events_week"] = uw_full["n_events_week"].astype("int32")
uw_full["n_high_week"] = uw_full["n_high_week"].astype("int32")

uw_full["share_high_week"] = np.where(
    uw_full["n_events_week"] > 0,
    uw_full["n_high_week"] / uw_full["n_events_week"],
    0.0
).astype("float32")

uw_full["did_high_week"] = (uw_full["n_high_week"] > 0).astype("int8")

uw_full.head(), uw_full.shape


**Sanity-check: размер аудитории**


In [None]:
uw_full["userId"].nunique(), uw_full["week"].nunique()


---

## Визуализация динамики метрик


In [None]:
weekly_summary = uw_full.groupby("week").agg(
    users=("userId","nunique"),
    mean_share=("share_high_week","mean"),
    median_share=("share_high_week","median"),
    p_did_high=("did_high_week","mean"),
    mean_events=("n_events_week","mean"),
).reset_index()

weekly_summary


In [None]:
plt.figure(figsize=(8,4))
plt.plot(weekly_summary["week"], weekly_summary["mean_share"], marker="o", label="mean(share_high_week)")
plt.plot(weekly_summary["week"], weekly_summary["median_share"], marker="o", label="median(share_high_week)")
plt.title("Непрерывная метрика: доля оценок >=4 по неделям")
plt.xlabel("week")
plt.ylabel("share_high_week")
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()


In [None]:
plt.figure(figsize=(8,4))
plt.plot(weekly_summary["week"], weekly_summary["p_did_high"], marker="o")
plt.title("Бинарная метрика: доля пользователей с >=1 оценкой >=4")
plt.xlabel("week")
plt.ylabel("P(did_high_week=1)")
plt.grid(True, alpha=0.3)
plt.show()


---

## Проверка drift распределений: week 0 vs week w

- **share_high_week (непрерывная):** KS, Mann–Whitney, Wasserstein
- **did_high_week (бинарная):** χ² или Fisher (если ожидаемые частоты < 5)

Поправка на множественные сравнения: Holm.


In [None]:
from scipy.stats import wasserstein_distance

def holm_adjust(pvals):
    pvals = np.asarray(pvals, dtype=float)
    m = len(pvals)
    order = np.argsort(pvals)
    adj = np.empty(m, dtype=float)
    for k, idx in enumerate(order):
        adj[idx] = (m - k) * pvals[idx]
    # monotone in sorted order
    adj_sorted = np.minimum.accumulate(adj[order][::-1])[::-1]
    adj[order] = np.clip(adj_sorted, 0, 1)
    return adj

base_w = 0
base_cont = uw_full.loc[uw_full["week"]==base_w, "share_high_week"].to_numpy()
base_bin  = uw_full.loc[uw_full["week"]==base_w, "did_high_week"].to_numpy()

rows = []
for w in range(1, W):
    cur_cont = uw_full.loc[uw_full["week"]==w, "share_high_week"].to_numpy()
    cur_bin  = uw_full.loc[uw_full["week"]==w, "did_high_week"].to_numpy()

    ks = stats.ks_2samp(base_cont, cur_cont, alternative="two-sided", method="asymp")
    mw = stats.mannwhitneyu(base_cont, cur_cont, alternative="two-sided", method="asymptotic")
    wd = wasserstein_distance(base_cont, cur_cont)

    a1 = int(np.sum(base_bin==1)); a0 = int(np.sum(base_bin==0))
    b1 = int(np.sum(cur_bin==1));  b0 = int(np.sum(cur_bin==0))
    table = np.array([[a1, a0],[b1, b0]], dtype=int)

    expected = stats.contingency.expected_freq(table)
    if np.any(expected < 5):
        _, p_bin = stats.fisher_exact(table, alternative="two-sided")
        test_name = "Fisher"
    else:
        _, p_bin, _, _ = stats.chi2_contingency(table, correction=False)
        test_name = "Chi2"

    rows.append({
        "week": w,
        "KS_p": float(ks.pvalue),
        "MW_p": float(mw.pvalue),
        "Wasserstein": float(wd),
        "bin_test": test_name,
        "bin_p": float(p_bin),
        "p_base": float(np.mean(base_bin)),
        "p_week": float(np.mean(cur_bin)),
    })

res = pd.DataFrame(rows)
res["KS_p_holm"] = holm_adjust(res["KS_p"].values)
res["MW_p_holm"] = holm_adjust(res["MW_p"].values)
res["bin_p_holm"] = holm_adjust(res["bin_p"].values)

res


### Графики p-value (Holm) по неделям


In [None]:
plt.figure(figsize=(9,4))
plt.plot(res["week"], res["KS_p_holm"], marker="o", label="KS p (Holm)")
plt.plot(res["week"], res["MW_p_holm"], marker="o", label="MW p (Holm)")
plt.axhline(0.05, linestyle="--")
plt.title("Непрерывная метрика: p-value (Holm) vs week 0")
plt.xlabel("week")
plt.ylabel("adjusted p-value")
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()


In [None]:
plt.figure(figsize=(9,4))
plt.plot(res["week"], res["bin_p_holm"], marker="o")
plt.axhline(0.05, linestyle="--")
plt.title("Бинарная метрика: p-value (Holm) vs week 0")
plt.xlabel("week")
plt.ylabel("adjusted p-value")
plt.grid(True, alpha=0.3)
plt.show()


## Дополнительная техника: PSI (Population Stability Index) для `share_high_week`

PSI считают по бинам: если метрика стала по-другому распределяться по “корзинам”, PSI растет.


In [None]:
def psi(expected, actual, bins=10, eps=1e-6):
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)

    qs = np.quantile(expected, np.linspace(0, 1, bins+1))
    qs[0] = -np.inf
    qs[-1] = np.inf

    e_hist, _ = np.histogram(expected, bins=qs)
    a_hist, _ = np.histogram(actual, bins=qs)

    e_pct = e_hist / max(e_hist.sum(), 1)
    a_pct = a_hist / max(a_hist.sum(), 1)

    e_pct = np.clip(e_pct, eps, 1)
    a_pct = np.clip(a_pct, eps, 1)

    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

psi_rows = []
for w in range(1, W):
    cur = uw_full.loc[uw_full["week"]==w, "share_high_week"].to_numpy()
    psi_rows.append({"week": w, "PSI": psi(base_cont, cur, bins=10)})

psi_df = pd.DataFrame(psi_rows)
psi_df


In [None]:
plt.figure(figsize=(8,4))
plt.plot(psi_df["week"], psi_df["PSI"], marker="o")
plt.title("PSI для share_high_week: week 0 vs week w")
plt.xlabel("week")
plt.ylabel("PSI")
plt.grid(True, alpha=0.3)
plt.show()


---

## Итоговые выводы

1) **Непрерывная метрика (`share_high_week`)**  
- если `KS_p_holm < 0.05` → распределение изменилось (в целом, CDF).  
- если `MW_p_holm < 0.05` → есть сдвиг распределения по рангам.  
- смотрим на **Wasserstein** и **PSI**, чтобы оценить **размер** изменений.

2) **Бинарная метрика (`did_high_week`)**  
- если `bin_p_holm < 0.05` → доля пользователей с событием (≥1 high) изменилась относительно базы.

3) При больших выборках p-value может быть маленьким даже при небольшом эффекте, поэтому в выводах обязательно упоминать **эффект** (PSI/Wasserstein/разница долей).
