# Part 2D - CUPED with heavy tails and outliers (ARPU)

This notebook focuses on a very common real-world issue: **ARPU / revenue is heavy-tailed**.

When revenue has outliers, simple t-tests on raw ARPU may behave poorly:
- variance becomes huge,
- confidence intervals are wide,
- empirical power is low at realistic sample sizes.

We compare several practical approaches and show how they interact with CUPED:

- **Raw ARPU**
- **Winsorized ARPU** (cap extreme values)
- **Log-ARPU** (`log1p(revenue)` then interpret in log-units)
- **Trimmed mean ARPU** (remove a small fraction of extremes)

For each approach, we compare:
- base (no CUPED)
- CUPED using pre-period revenue as a covariate

Outputs:
- estimated CUPED theta and variance reduction
- CI widths and p-values
- empirical power / type I error in a small simulation loop


## 0) Imports and setup

Expected repo layout:
- notebooks/
- src/tecore/
- data/synthetic/

If you run in DataLore without the repository mounted, create local modules:
- `tecore/simulate.py`
- `tecore/variance_reduction.py`
(as you already do with `%%writefile`).


In [None]:
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

ALPHA = 0.05
RNG = np.random.default_rng(123)

# If running from repo notebooks/, this makes `src/` importable:
repo_root = Path("..").resolve()
src_path = (repo_root / "src").resolve()
if (src_path / "tecore").exists() and str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))

from tecore.simulate import SyntheticB2CConfig, generate_user_level_data
from tecore.variance_reduction import cuped_split_adjust


## 1) Generate a heavy-tailed dataset

We intentionally increase tail heaviness by:
- increasing `revps_lognorm_sigma`
- increasing `high_spender_share`
- increasing `high_revps_multiplier`

We keep the dataset user-level and aggregated over the post period.


In [None]:
cfg = SyntheticB2CConfig(
    n_users=80_000,
    seed=42,
    effect_type="mixed",
    lift_sessions=0.03,
    lift_rev_per_session=0.03,
    include_binary=False,

    # Make tails heavier:
    revps_lognorm_sigma=1.35,
    high_spender_share=0.05,
    high_revps_multiplier=10.0,

    # Keep decent pre/post correlation for CUPED to work:
    monetization_latent_sigma=0.8,
)

df = generate_user_level_data(cfg)
df.shape, df.head()

## 2) Quick distribution check

We look at:
- histogram of revenue (clipped for visibility)
- log-histogram
- a simple boxplot (showing how extreme the tail is)


In [None]:
rev = df["revenue"].to_numpy(dtype=float)

p99 = np.quantile(rev, 0.99)
p999 = np.quantile(rev, 0.999)

fig = plt.figure(figsize=(10, 4))
plt.hist(np.clip(rev, 0, p99), bins=60)
plt.title("Revenue per user (clipped at p99 for visibility)")
plt.xlabel("revenue")
plt.ylabel("count")
plt.tight_layout()
plt.show()

plt.figure(figsize=(10, 4))
plt.hist(np.log1p(rev), bins=60)
plt.title("log1p(revenue) distribution")
plt.xlabel("log1p(revenue)")
plt.ylabel("count")
plt.tight_layout()
plt.show()

plt.figure(figsize=(8, 3))
plt.boxplot(np.clip(rev, 0, p999), vert=False, showfliers=False)
plt.title("Revenue per user (clipped at p99.9)")
plt.xlabel("revenue")
plt.tight_layout()
plt.show()

p99, p999

## 3) Utility functions

We implement:
- Welch t-test effect + CI
- winsorization and trimming helpers
- a unified runner: base vs CUPED for a chosen transformation


In [None]:
def split_groups(df: pd.DataFrame):
    c = df[df["group"] == "control"].copy()
    t = df[df["group"] == "test"].copy()
    return c, t


def welch_ttest_effect(y_c: np.ndarray, y_t: np.ndarray, alpha: float = ALPHA):
    stat, pval = stats.ttest_ind(y_t, y_c, equal_var=False)

    eff = float(np.mean(y_t) - np.mean(y_c))
    se = float(np.sqrt(np.var(y_t, ddof=1)/len(y_t) + np.var(y_c, ddof=1)/len(y_c)))
    z = stats.norm.ppf(1 - alpha/2)
    ci = (eff - z*se, eff + z*se)

    return {"effect": eff, "ci_low": ci[0], "ci_high": ci[1], "p_value": float(pval), "se": se}


def winsorize(x: np.ndarray, lower_q: float = 0.0, upper_q: float = 0.99) -> np.ndarray:
    x = np.asarray(x, dtype=float)
    lo = np.quantile(x, lower_q)
    hi = np.quantile(x, upper_q)
    return np.clip(x, lo, hi)


def trim_mask(x: np.ndarray, lower_q: float = 0.0, upper_q: float = 0.99) -> np.ndarray:
    x = np.asarray(x, dtype=float)
    lo = np.quantile(x, lower_q)
    hi = np.quantile(x, upper_q)
    return (x >= lo) & (x <= hi)


def run_base_vs_cuped(
    y_c: np.ndarray,
    y_t: np.ndarray,
    x_c: np.ndarray,
    x_t: np.ndarray,
    alpha: float = ALPHA,
):
    base = welch_ttest_effect(y_c, y_t, alpha=alpha)
    cup_c, cup_t = cuped_split_adjust(y_c, x_c, y_t, x_t)
    adj = welch_ttest_effect(cup_c.y_adj, cup_t.y_adj, alpha=alpha)

    return {
        "base": base,
        "cuped": {**adj, "theta": cup_c.theta, "var_reduction_control": cup_c.var_reduction},
    }

## 4) Compare approaches for ARPU

Covariate for CUPED:
- pre-period revenue (`revenue_pre`)

Approaches:
1) raw revenue (ARPU)
2) winsorized revenue (cap at p99)
3) log1p(revenue)
4) trimmed mean (drop top 1%)


In [None]:
c, t = split_groups(df)

y_c_raw = c["revenue"].to_numpy(dtype=float)
y_t_raw = t["revenue"].to_numpy(dtype=float)
x_c_raw = c["revenue_pre"].to_numpy(dtype=float)
x_t_raw = t["revenue_pre"].to_numpy(dtype=float)

# 1) Raw
res_raw = run_base_vs_cuped(y_c_raw, y_t_raw, x_c_raw, x_t_raw)

# 2) Winsorized at p99 (apply caps independently per full sample for simplicity)
y_all_win = winsorize(df["revenue"].to_numpy(dtype=float), upper_q=0.99)
x_all_win = winsorize(df["revenue_pre"].to_numpy(dtype=float), upper_q=0.99)

y_c_win = y_all_win[df["group"].to_numpy() == "control"]
y_t_win = y_all_win[df["group"].to_numpy() == "test"]
x_c_win = x_all_win[df["group"].to_numpy() == "control"]
x_t_win = x_all_win[df["group"].to_numpy() == "test"]
res_win = run_base_vs_cuped(y_c_win, y_t_win, x_c_win, x_t_win)

# 3) log1p
y_c_log = np.log1p(y_c_raw); y_t_log = np.log1p(y_t_raw)
x_c_log = np.log1p(x_c_raw); x_t_log = np.log1p(x_t_raw)
res_log = run_base_vs_cuped(y_c_log, y_t_log, x_c_log, x_t_log)

# 4) Trimmed (drop top 1% based on each period separately; keep only users within caps)
mask_y = trim_mask(df["revenue"].to_numpy(dtype=float), upper_q=0.99)
mask_x = trim_mask(df["revenue_pre"].to_numpy(dtype=float), upper_q=0.99)
mask = mask_y & mask_x

df_trim = df.loc[mask].copy()
c2, t2 = split_groups(df_trim)

res_trim = run_base_vs_cuped(
    c2["revenue"].to_numpy(dtype=float),
    t2["revenue"].to_numpy(dtype=float),
    c2["revenue_pre"].to_numpy(dtype=float),
    t2["revenue_pre"].to_numpy(dtype=float),
)

def to_row(name: str, res: dict):
    return [
        {"approach": name, "variant": "base", **res["base"]},
        {"approach": name, "variant": "CUPED", **res["cuped"]},
    ]

rows = []
rows += to_row("raw_ARPU", res_raw)
rows += to_row("winsorized_p99", res_win)
rows += to_row("log1p_ARPU", res_log)
rows += to_row("trimmed_top1pct", res_trim)

out = pd.DataFrame(rows)
out

## 5) CI width comparison

A quick way to communicate practical impact is CI width reduction.


In [None]:
def ci_width(ci_low: float, ci_high: float) -> float:
    return float(ci_high - ci_low)

wide = []
for appr in out["approach"].unique():
    base = out[(out["approach"] == appr) & (out["variant"] == "base")].iloc[0]
    cup = out[(out["approach"] == appr) & (out["variant"] == "CUPED")].iloc[0]
    w0 = ci_width(base["ci_low"], base["ci_high"])
    w1 = ci_width(cup["ci_low"], cup["ci_high"])
    wide.append({"approach": appr, "ci_width_base": w0, "ci_width_cuped": w1, "reduction": 1 - w1 / w0})

ci_tbl = pd.DataFrame(wide).sort_values("reduction", ascending=False)
ci_tbl

In [None]:
plt.figure(figsize=(10, 4))
x = np.arange(len(ci_tbl))
plt.bar(x - 0.2, ci_tbl["ci_width_base"], width=0.4, label="base")
plt.bar(x + 0.2, ci_tbl["ci_width_cuped"], width=0.4, label="CUPED", alpha=0.7)
plt.xticks(x, ci_tbl["approach"], rotation=20, ha="right")
plt.ylabel("CI width (in approach units)")
plt.title("CI width: base vs CUPED under heavy tails")
plt.legend()
plt.tight_layout()
plt.show()

## 6) Empirical power and type I error (small simulation)

We estimate rejection rates at fixed sample size.

To keep runtime reasonable:
- start with N_ITER = 25
- no bootstrap inside the loop

We compare:
- raw ARPU (base vs CUPED)
- winsorized p99 (base vs CUPED)
- log1p ARPU (base vs CUPED)


In [None]:
def pvals_raw_base_cuped(df: pd.DataFrame, alpha: float = ALPHA):
    c, t = split_groups(df)

    y_c = c["revenue"].to_numpy(float); y_t = t["revenue"].to_numpy(float)
    x_c = c["revenue_pre"].to_numpy(float); x_t = t["revenue_pre"].to_numpy(float)

    base = welch_ttest_effect(y_c, y_t, alpha=alpha)["p_value"]
    cup_c, cup_t = cuped_split_adjust(y_c, x_c, y_t, x_t)
    cup = welch_ttest_effect(cup_c.y_adj, cup_t.y_adj, alpha=alpha)["p_value"]

    return base, cup


def pvals_winsor_base_cuped(df: pd.DataFrame, alpha: float = ALPHA):
    y_all = winsorize(df["revenue"].to_numpy(float), upper_q=0.99)
    x_all = winsorize(df["revenue_pre"].to_numpy(float), upper_q=0.99)
    g = df["group"].to_numpy()

    y_c = y_all[g == "control"]; y_t = y_all[g == "test"]
    x_c = x_all[g == "control"]; x_t = x_all[g == "test"]

    base = welch_ttest_effect(y_c, y_t, alpha=alpha)["p_value"]
    cup_c, cup_t = cuped_split_adjust(y_c, x_c, y_t, x_t)
    cup = welch_ttest_effect(cup_c.y_adj, cup_t.y_adj, alpha=alpha)["p_value"]

    return base, cup


def pvals_log_base_cuped(df: pd.DataFrame, alpha: float = ALPHA):
    c, t = split_groups(df)

    y_c = np.log1p(c["revenue"].to_numpy(float)); y_t = np.log1p(t["revenue"].to_numpy(float))
    x_c = np.log1p(c["revenue_pre"].to_numpy(float)); x_t = np.log1p(t["revenue_pre"].to_numpy(float))

    base = welch_ttest_effect(y_c, y_t, alpha=alpha)["p_value"]
    cup_c, cup_t = cuped_split_adjust(y_c, x_c, y_t, x_t)
    cup = welch_ttest_effect(cup_c.y_adj, cup_t.y_adj, alpha=alpha)["p_value"]

    return base, cup


def rejection_rates(cfg: SyntheticB2CConfig, n_iter: int, alpha: float, seed0: int):
    rej = {
        "raw_base": 0, "raw_cuped": 0,
        "win_base": 0, "win_cuped": 0,
        "log_base": 0, "log_cuped": 0,
    }

    for i in range(n_iter):
        cfg_i = SyntheticB2CConfig(**{**cfg.__dict__, "seed": seed0 + i})
        df_i = generate_user_level_data(cfg_i)

        p0, p1 = pvals_raw_base_cuped(df_i, alpha=alpha)
        rej["raw_base"] += (p0 < alpha); rej["raw_cuped"] += (p1 < alpha)

        p0, p1 = pvals_winsor_base_cuped(df_i, alpha=alpha)
        rej["win_base"] += (p0 < alpha); rej["win_cuped"] += (p1 < alpha)

        p0, p1 = pvals_log_base_cuped(df_i, alpha=alpha)
        rej["log_base"] += (p0 < alpha); rej["log_cuped"] += (p1 < alpha)

    for k in rej:
        rej[k] = rej[k] / n_iter

    return rej


N_ITER = 25

cfg_eff = SyntheticB2CConfig(
    n_users=60_000,
    seed=10,
    effect_type="mixed",
    lift_sessions=0.02,
    lift_rev_per_session=0.02,
    include_binary=False,
    revps_lognorm_sigma=1.35,
    high_spender_share=0.05,
    high_revps_multiplier=10.0,
    monetization_latent_sigma=0.8,
)

cfg_null = SyntheticB2CConfig(
    n_users=60_000,
    seed=999,
    effect_type="none",
    lift_sessions=0.0,
    lift_rev_per_session=0.0,
    include_binary=False,
    revps_lognorm_sigma=1.35,
    high_spender_share=0.05,
    high_revps_multiplier=10.0,
    monetization_latent_sigma=0.8,
)

power = rejection_rates(cfg_eff, n_iter=N_ITER, alpha=ALPHA, seed0=5000)
type1 = rejection_rates(cfg_null, n_iter=N_ITER, alpha=ALPHA, seed0=6000)

pd.DataFrame([
    {"case": "power (effect)", **power},
    {"case": "type I error (null)", **type1},
])

## Notes on interpretation

- **Raw ARPU** may have low power under heavy tails because variance explodes.
- **Winsorization** often improves stability at the cost of introducing a small bias in the estimand
  (you are no longer estimating the true mean, but a capped mean).
- **log1p transform** changes the estimand. You test differences in expected log-revenue, which is closer
  to relative changes and is often more robust.
- **CUPED** can stack with winsorization/log transforms: you reduce variance via both
  (a) a better-behaved metric and (b) correlated pre-period information.

In production experiments, choose the approach based on:
- your official KPI definition (do you need the mean, or a robust proxy?)
- sensitivity vs interpretability trade-offs
- stability of pre-period covariate and absence of treatment leakage into pre-period data
