In [None]:
import os
import json, glob
from dataclasses import dataclass, field
from typing import Dict, Any, Iterator, Optional

import pandas as pd
from jinja2 import Template
from omegaconf import OmegaConf
from pathlib import Path

import arena
import report_model

from signal_noise import signal_to_noise
import json
import numpy as np
import pandas as pd
import scipy.stats as stats

import plotly.express as px
import plotly.graph_objects as go
from jinja2 import Template

import importlib
importlib.reload(arena)

class ReportArgs:
    out_dir: str = "gh-pages/"
    data: str = "data/*.jsonl"
    recompute: bool = True # generate results for all data and summary line
    write_summary: bool = True # use results in out_dir/tmp to generate the summary table
    
    max_diff: float = 0.1 # skip models that are more than max_diff in performance
    sigma_thres: float = 5.0 # how many std to consider as not close
    min_perf: float = 0.05 # too bad for inconlusion, including near 0 models does give some extreme results

def run_arena(args: ReportArgs):
    records = []
    for fname in glob.glob(args.data):
        with open(fname, 'rt') as f:
            records.extend([json.loads(l) for l in f.readlines()])
    eval_results = pd.DataFrame(records)
    benchmarks = set(eval_results['benchmark_id'])
    print(benchmarks)
    return eval_results

args = ReportArgs()
results = run_arena(args)
display(results)

In [None]:
bid = "gsm8k"
bid = "lcb_codegen_v6"
bid = "humaneval+"
bid = "mbpp"
bid = "mgsm_cot"
bid = "math_cot"
bid = "mmlu"
bid = "human_eval_plus"
bid = "math500_cot"
bid = "mmlu_pro_cot"
bid = "mbpp_vllm"
bid = "cruxeval_output_cot"
bid = "swebench-verified"
bid = "CRUXEval-output-T0.8"
selected = results[(results["benchmark_id"] == bid) & (~results["model"].str.contains("_distill_", case=False))].copy()
args.max_diff = 0.1 
importlib.reload(arena)
arena_res: arena.ArenaResult = arena.summarize_benchmark(selected, args)
display(arena_res)
df_model = arena.model_table(selected)
df_example = arena.example_table(selected, df_model)
df_input = selected.copy()

In [None]:
def fig_example_vs_model(df_input, df_model, df_example, use_acc_as_position=False):
    df = df_input[["model", "example_id", "pass1", "N"]].merge(df_example[["example_id", "pass1_of_ex"]], on="example_id")
    model_table = df_model[["model", "pass1"]].rename(columns={"pass1": "pass1_of_model"})
    df = df.merge(model_table, on="model")
    df.sort_values(by=["pass1_of_ex", "example_id", "pass1_of_model", "model"], inplace=True)
    if not use_acc_as_position:
        yid, xid = "example_id", "model"
    else:
        yid, xid = "example_id", "pass1_of_model"

    fig = px.scatter(df, y=yid, x=xid, color="pass1",
                     opacity=0.75,
                     color_continuous_scale=["red", "yellow", "green"],
                     hover_data=["pass1", "pass1_of_ex", "pass1_of_model", "model", "example_id", "N"])
    fig.update_xaxes(autorange="reversed")
    fig.update_traces(marker={"symbol": "square"})
    fig.update_layout(
            width=900, height=1200,
            xaxis = dict(side ="top"),
        )
    return fig

fig = fig_example_vs_model(df_input, df_model, df_example, use_acc_as_position=True)
display(fig)


In [None]:
importlib.reload(report_model)
# fig = report_model.fig_marginals(df_input, df_model, df_example, xkey="rank_of_ex")
# display(fig)
# display(arena_res.summary)
fig = report_model.fig_cov_baseline("test", arena_res.summary, arena_res.input_table)
betas = report_model.show_betas(df_input, df_model, df_example)
display(betas)
df_betas = pd.DataFrame(betas)
df_betas["norm"] = df_betas["expected_std"] / np.sqrt(len(df_example))
fig.add_scatter(x=df_betas["mu"], y=df_betas["norm"])
display(fig)

display(px.scatter(df_betas, x="mu", y="alpha+beta"))

fig = report_model.fig_marginals(df_input, df_model, df_example, xkey="rank")
display(fig)

# fig = px.histogram(df_example, x="pass1_of_ex")
# display(fig)

In [None]:
def trend_df(selected: pd.DataFrame):
    marginals = selected.groupby(["example_id"]).agg({'pass1': 'mean'}).reset_index().sort_values(by="pass1")
    display(marginals)
    m1 = marginals["pass1"].to_numpy().copy()
    def independent_var(p: np.ndarray, alpha=1) -> float:
        """
        calculate the variance of two independent draws from p, X_i, Y_i ~ Bernoulli(p_i), I want E[(X_i - Y_i)**2]
        """
        N = len(p)
        assert np.all(p >= 0) and np.all(p <= 1)
        return np.sqrt(1 / N * np.mean(p * (1 - p)))

    df = pd.DataFrame({"alpha": np.logspace(-5, 5, 1000)})

    df["p_mean"] = df["alpha"].map(lambda alpha: np.power(m1, alpha).mean())
    df["vars"] = df["alpha"].map(lambda alpha: independent_var(np.power(m1, alpha)))
    m2 = np.where((0 < m1) & (m1 < 1), 0.5, m1)
    df["p_mean_const"] = df["alpha"].map(lambda alpha: np.power(m2, alpha).mean())
    df["vars_const"] = df["alpha"].map(lambda alpha: independent_var(np.power(m2, alpha)))
    return df

example_means = selected.groupby(["example_id"]).agg({'pass1': 'mean'}).reset_index().sort_values(by="pass1")
# 
# fig = px.line(y=m1)
m1 = example_means["pass1"].to_numpy().copy()
fig = go.Figure()
for alpha in []:
    fig.add_scatter(y=m1**alpha, mode="lines", name=f'm*{alpha}')

# Calculate which models are in top/bottom half based on overall performance
model_means = selected.groupby(["model"]).agg({'pass1': 'mean'}).reset_index().sort_values(by="pass1")
display(model_means)
nzs = np.sum(example_means["pass1"] == 0)
interval_size = 0.125
for start in np.linspace(0, 1, 8):
    models = model_means[(model_means["pass1"] >= start) & (model_means["pass1"] < start + interval_size)]
    # display(models)
    data_inside = selected[selected['model'].isin(models["model"])]
    if len(data_inside) == 0:
        continue
    data_means = data_inside.groupby(["example_id"]).agg({'pass1': 'mean'}).reset_index()
    # display(data_means, model_means)
    # Merge with original marginals to ensure same sorting
    data_means_sorted = example_means[['example_id']].merge(data_means, on="example_id", how="left")
    data_means_sorted = data_means_sorted.sort_values("pass1")
    # wsz = 1
    # smoothed = data_means_sorted["pass1"].rolling(window=1, center=True).mean()
    smoothed = data_means_sorted["pass1"]
    mu = data_means_sorted["pass1"].mean()
    n = len(models)
    fig.add_scatter(x=smoothed, y=np.arange(len(smoothed))/len(smoothed), name=f"{start:.2f}-{start+interval_size:.2f} ({n=}, {mu=:.3f})", mode='lines')

    x = np.linspace(0, 1, 100)
    alpha = smoothed.mean() / (1 - nzs / len(smoothed)) 
    beta_param = 1 - alpha
    from scipy.stats import beta
    s = 1
    cdf_values = beta.cdf(x, s*alpha, s*beta_param)
    beta_mean = alpha * (1 - nzs / len(smoothed))
    fig.add_trace(go.Scatter(
        x=x, 
        y= nzs/len(smoothed) + cdf_values * (1 - nzs/len(smoothed)), 
        # y= cdf_values * (len(smoothed)), 
        mode='lines',
        name=f'Beta({alpha:.2f}, {beta_param:.2f}) {beta_mean=:.3f}',
        line=dict(dash='dash')
    ))
# Add the first 3 model curves to the plot
# for i, (model_name, curve_data) in enumerate(list(model_curves.items())[:10]):
#     fig.add_scatter(y=curve_data, name=model_name, mode='lines')
fig.update_layout(title=bid)
fig.update_layout(
    width=800,
    height=600
)

display(fig)

In [None]:
model_means = selected.groupby(["model"]).agg({'pass1': 'mean'}).reset_index().sort_values(by="pass1")
# display(model_means)
nzs = np.sum(example_means["pass1"] == 0)

bottom_models = model_means[:10]["model"]
top_models = model_means[-10:]["model"]
mid_models = model_means[-20:-10]["model"]


def get_mean(models):
    data_inside = selected[selected['model'].isin(models)]
    data_means = data_inside.groupby(["example_id"]).agg({'pass1': 'mean'}).reset_index()
    data_means_sorted = example_means[['example_id']].merge(data_means, on="example_id", how="left")
    data_means_sorted = data_means_sorted.sort_values("pass1")
    return data_means_sorted, data_means_sorted["pass1"].mean()

data_means_bot, bot_mean = get_mean(bottom_models)
data_means_top, top_mean  = get_mean(top_models)
data_means_mid, mid_mean = get_mean(mid_models)


# mid_mean = alpha * top_mean + (1-alpha) * bot_mean
alpha = (mid_mean - bot_mean) / (top_mean - bot_mean)
fig = go.Figure()
cdf_bot = stats.ecdf(data_means_bot["pass1"])
cdf_top = stats.ecdf(data_means_top["pass1"])
fig.add_scatter(x=data_means_bot["pass1"], name="bot")
x = np.linspace(0, 1, 1000)
fig.add_scatter(x=x, y=len(data_means_top)*cdf_bot.cdf.evaluate(x), name="bot_cdf")
print(alpha)
fig.add_scatter(x=x, y=beta.cdf(x, alpha, 1-alpha)*len(data_means_bot), name="mid_beta")
fig.add_scatter(x=data_means_top["pass1"], name="top")
fig.add_scatter(x=data_means_mid["pass1"], name="mid_empirical")
mid_paired = alpha*(data_means_top["pass1"]) + (1-alpha)*(data_means_bot["pass1"])
mid_unpaired = alpha*(data_means_top["pass1"].to_numpy()) + (1-alpha)*(data_means_bot["pass1"].to_numpy())
mid_paired = mid_paired.sort_values()
fig.add_scatter(x=mid_unpaired, name="mid_unpaired")
fig.add_scatter(x=mid_paired, name="mid_paired")

# alpha = 0.5
mid_cdf = alpha*(cdf_top.cdf.evaluate(x)) + (1-alpha)*(cdf_bot.cdf.evaluate(x))
fig.add_scatter(x=x, y=len(data_means_top)*mid_cdf, name="mid_cdf")
print(cdf_bot.cdf.probabilities)
def expectation_ecdf_from_steps(cdf, f):
    probs = np.diff(np.concatenate(([0.0], cdf.quantiles)))
    print(probs)
    return np.sum(f(cdf.quantiles) * probs)

evar = expectation_ecdf_from_steps(cdf_bot.cdf, lambda x: x*(1-x))
print(evar, 0.25*0.75)

fig.update_layout(
    width=800,
    height=600,
    title=bid
)
display(fig)

In [None]:
def fig_cov_baseline(bmname: str, diffvsum: pd.DataFrame, dfmodel):
    df = diffvsum
    # df["is_close"] = np.where(df["sum(A-B)"].abs() < df["total"] / 20, "close", "not_close")
    df = df[df["accA"] >= df["accB"]]
    df["is_close"] = np.where(np.abs(df["accA"] - df["accB"]) / df["std(A-B)"] <= 3, "close: ≤3σ", "not close: >3σ")
    color_map = {
        "close: ≤3σ": "blue",      # Bright red
        "not close: >3σ": "#CCCCCC"     # Light gray
    } 
    figs = px.scatter(df,
                    x=np.maximum(df["accB"], df["accA"]), y="std(A-B)",
                    color="is_close",
                    color_discrete_map=color_map,
                    custom_data=["model_a", "model_b", "sum(A!=B)", "sum(A-B)", "pvalue", "std(A-B)", "accA", "accB", "corr(A,B)"])
    figs.for_each_trace(lambda trace: trace.update(opacity=0.5) 
                   if trace.name == "not close: >3σ" else None)
    
    figs.update_traces(hovertemplate=
        "<br>".join([
        "Model A: %{customdata[0]} (acc: %{customdata[6]:.1%})",
        "Model B: %{customdata[1]} (acc: %{customdata[7]:.1%})", 
        "total A≠B: %{customdata[2]:.1f}",
        "total A-B: %{customdata[3]:.1f}", 
        "std(A-B): %{customdata[5]:.2%}", 
        "p-value: %{customdata[4]:.3g}",
        "corr(A,B): %{customdata[8]:.3g}",
        ])  + "<extra></extra>")

    figs.update_traces(
        marker=dict(
            size=3,
            opacity=0.5, 
        )
    )

    data_sz = diffvsum.iloc[0]["total"]
    x = np.linspace(0, 1, 100)
    y = np.sqrt(x*(1-x) / data_sz)

    figl = go.Figure()

    figl.add_trace(go.Scatter(
        x=x, y=y, name="σ(acc)",
        # hoverinfo="skip",
        line=dict(color="lightgreen")
    ))

    figl.add_trace(go.Scatter(
        x=x, y=np.sqrt(2)*y, name="sqrt(2) σ(acc)",
        # hoverinfo="skip",
        line=dict(color="darkgreen")
    ))

    figl.add_trace(go.Scatter(
        x=dfmodel["p_mean"], y=dfmodel["vars"],
        # hoverinfo="skip",
        line=dict(color="red"),
        name="exp"
    ))
    figl.add_trace(go.Scatter(
        x=dfmodel["p_mean_const"], y=dfmodel["vars_const"],
        # hoverinfo="skip",
        line=dict(color="pink"),
        name="vars_const"
    ))

    fig = go.Figure(data=figl.data + figs.data)
    fig.update_layout(
        width=800, height=600, title=bmname,
        xaxis_title="mean(acc(A), acc(B))",
        yaxis_title="σ(A-B)"
    )
    return fig

display(summary)
display(fig_cov_baseline(bid, summary, trend_df(selected)))

In [None]:
def summary_stats(s, f=2, percent=True):
    return f"{s['mean']:.2f}±{s['std']:.2f} | [{s['min']:.2f}--{s['max']:.2f}] | n={s['count']} "

def format_stats_badge(s):
    s_percent = dict(s)
    for st in ["mean", "std", "min", "max"]:
        s_percent[st] = 100 * s[st]
    summary = summary_stats(s_percent)
    mean = 100*s["mean"]
    return f'<span title="{summary}">{mean:.2f}</span>'

def write_summary_table(summary_count: pd.DataFrame, output_path: Path):
    summary_count = summary_count.sort_values(by='benchmark_id')

    def link_detail(bid):
        l1 = f"""by <a href="model_{bid}.html">models </a> """
        l2 = f"""<a href="ex_{bid}.html"> examples </a>"""
        l3 = f"""<a href="ex_v_model_{bid}.html"> data </a>"""
        return l1 + '|' + l2 + '|' + l3
    summary_count['link to details'] = summary_count['benchmark_id'].apply(link_detail)

    def normalize(counts, includes):
        percent = counts.copy(deep=True)
        for c in includes:
            percent[c] = percent[c] / percent['size']
        return percent

    includes_cols = ['benchmark_id', 'size',  'std(A-B)', 'corr_ab', 'p5_min', 'p5_max', 'no_solve', 'tau-', 'sig_noise','link to details']
    percent_cols = ['p5_min', 'p5_max', 'no_solve', 'tau-']
    summary_percent = normalize(summary_count, percent_cols)

    display(summary_percent)
    template_path = r"templates/summary.html"

    with open(output_path, "w", encoding="utf-8") as output_file:
        with open(template_path) as template_file:
            j2_template = Template(template_file.read())
            output_file.write(j2_template.render({
                'count_table': summary_count[includes_cols].to_html(escape=False, index=False),
                'percent_table': summary_percent[includes_cols].to_html(
                    escape=False,
                    index=False,
                    formatters={
                        "std(A-B)": lambda x: format_stats_badge(x),
                        "corr_ab": lambda x: format_stats_badge(x),
                        'p5_min': lambda x: f'{x*100:.2f}',
                        'p5_max': lambda x: f'{x*100:.2f}',
                        'min_dist': '{:.2}'.format,
                        'no_solve': '{:.2}'.format,
                        'tau-': '{:.2}'.format,
                        'sig_noise': '{:.2f}'.format,
                    }),
            }))
            
def generate_summary(args: ReportArgs):
    tmp_dir = Path(args.out_dir) / 'tmp'
    os.makedirs(tmp_dir, exist_ok=True)
    
    if args.write_summary:
        records = []
        for fname in glob.glob(f'{tmp_dir}/summary-*.jsonl'):
            with open(fname, 'rt') as f:
                records.extend([json.loads(l) for l in f.readlines()])
        print(records)
        write_summary_table(pd.DataFrame(records), Path(args.out_dir) / 'index.html')

generate_summary(args)

In [None]:
import numpy as np
from scipy.stats import beta

import plotly.graph_objects as go

# Create a range of alpha values to explore
alphas = [0.1, 0.3, 0.5]

# Create x values for plotting
x = np.linspace(0, 1, 1000)

fig = go.Figure()

for alpha in alphas:
    # For beta(alpha, 1-alpha), we need alpha > 0 and 1-alpha > 0, so 0 < alpha < 1
    if 0 < alpha < 1:
        beta_param = 1 - alpha
        # Calculate CDF
        cdf_values = beta.pdf(x, alpha, beta_param)
        fig.add_trace(go.Scatter(
            x=x, 
            y=cdf_values, 
            mode='lines',
            name=f'Beta({alpha}, {beta_param:.1f})'
        ))

fig.update_layout(
    title='Cumulative Distribution Function of Beta(α, 1-α)',
    xaxis_title='x',
    yaxis_title='CDF(x)',
    width=800,
    height=600
)

fig.show()

In [None]:
for alpha in np.linspace(0, 1, 20):
    beta_modelpred = beta(2*alpha+1, 2*(1-alpha)+1) / beta(alpha, (1-alpha))
    var = alpha * (1-alpha)
    print(beta_modelpred, var)

In [None]:
from scipy.special import beta
def plot_betapdf(a=0.5, b=0.5):    
    # Define the integrand
    def integrand(x):
        return x**(a-1) * (1-x)**(b-1) / beta(a, b)

    x = np.linspace(0, 1, 100)
    return px.scatter(x=x, y=integrand(x))

alpha = 0.3
fig = plot_betapdf(a=1, b=4)
display(fig)


In [None]:

import torch
from torch import mean, var
A = torch.randn([100, 30], dtype=torch.float64) > 0
A = A.to(torch.float32)
def var(A, dim=None):
    return torch.var(A, dim=dim, unbiased=False)
# print(A)
print(f"{var(A)=}")
print(f"{var(A, dim=0)=}")
print(f"{mean(A)=}")
print(f"{mean(var(A, dim=0))=}")
print(f"{var(mean(A, dim=0))=}")


from numpy import mean, var
A = A.numpy()
print(f"{var(A)=}")
print(f"{var(var(A, axis=0))=}")
# print(f"{var(A, axis=0)=}")
# print(f"{mean(A)=}")
print(f"{mean(var(A, axis=0))=}")
print(f"{var(mean(A, axis=0))=}")
print(f"{mean(var(A, axis=1))=}")
print(f"{var(mean(A, axis=1))=}")

In [None]:

res = []
N = 100
K = 2
import numpy as np
from numpy import mean, var, std

def cov(A, B, ddof=0):
    # okay, is there ever a time to need ddof=1?
    return np.cov(A, B, ddof=ddof)[0, 1]

class Paired:
    @staticmethod
    def sample_vars(A: np.ndarray, B: np.ndarray, dof=0) -> dict:
        assert A.shape[0] == B.shape[0], "should be paired" 
        return {
            "var(E(A-B))": var(mean(A-B, axis=1)),
            "E(var(A-B))": mean(var(A, axis=1) + var(B, axis=1)),
            "var(A-B)": var(A) + var(B) - 2 * cov(mean(A, axis=1), mean(B, axis=1)),
            "cov(A,B)": cov(mean(A, axis=1), mean(B, axis=1)),
            "var(A) + var(B)": var(A) + var(B),
            # "_var(A-B)": mean(A**2 + B**2) - 2 * mean(mean(A, axis=1) * mean(B, axis=1)) - mean(A-B)**2,
        }
    
    @staticmethod
    def sample_vars_unbiased(A: np.ndarray, B: np.ndarray, dof=0) -> dict:
        assert A.shape[0] == B.shape[0] # paired data
        kA = A.shape[1]
        kB = A.shape[1]
        return {
            "var(E(A-B))": var(mean(A-B, axis=1)) - mean(var(A, axis=1)/(kA-1) + var(B, axis=1)/(kA-1)) ,
            "E(var(A-B))": mean(var(A, axis=1)* (1 + 1/(kA-1)) + var(B, axis=1) * (1 + 1/(kB-1))),
            "var(A-B)": var(A) + var(B) - 2 * cov(mean(A, axis=1), mean(B, axis=1)), # actually this is slightly biased too, but we ignore it
            # "_var(A-B)": mean(A**2 + B**2) - 2 * mean(mean(A, axis=1) * mean(B, axis=1)) - mean(A-B)**2,
        }
    
    @staticmethod
    def bernoulli_sample_vars(A: np.ndarray, B: np.ndarray, dof=0) -> dict:
        ...

    @staticmethod
    def bernoulli_p_vars(pA: np.ndarray, pB: np.ndarray) -> dict:
        assert pA.shape[0] == pB.shape[0]
        assert pA.shape[1] == pB.shape[1] == 1
        pA = pA.flatten()
        pB = pB.flatten()
        return {
            "var(E(A-B))": var(pA - pB),
            "E(var(A-B))": mean(pA*(1-pA) + pB*(1-pB)),
            "var(A-B)": mean(pA)*(1-mean(pA)) + mean(pB)*(1-mean(pB)) - 2*cov(pA, pB),
            "cov(A,B)": cov(pA, pB),
            "var(A) + var(B)": mean(pA)*(1-mean(pA)) + mean(pB)*(1-mean(pB)),
            "_var(A-B)": mean(pA + pB - 2*pA*pB) - mean(pA - pB)**2,
        }
    
    @staticmethod
    def bernoulli_self(ph: np.ndarray, K: np.ndarray) -> dict:
        ph = ph.flatten()
        pB = ph
        mu = mean(ph)
        covAA = mean((ph*ph - 1/K*ph)*(1+1/(K-1)))  - mu * mu
        return {
            "var(E(A-B))": var(ph - pB),
            "E(var(A-B))": mean(ph*(1-ph) + pB*(1-pB)),
            "var(A-B)": mean(ph)*(1-mean(ph)) + mean(pB)*(1-mean(pB)) - 2*covAA,
            "cov(A,B)": covAA,
            "var(A) + var(B)": mean(ph)*(1-mean(ph)) + mean(pB)*(1-mean(pB)),
            "_var(A-B)": mean(ph + pB - 2*ph*pB) - mean(ph - pB)**2,
        }


pA = np.random.rand(N, 1)
pB = (pA + 1*(np.random.rand(N, 1)-0.5)).clip(0, 1)
pA = pB
for i in range(100):
    A = np.random.rand(N, K)
    A = np.where(A < pA, 1, 0)

    B = np.random.rand(N, K)
    B = np.where(B < pB, 1, 0)

    delta = (A-B).mean() 
    vars = {
        "E(A-B)": delta,
        # "var(A-B)":  np.mean(np.mean(A*A + B*B, axis=1) - 2 * A.mean(axis=1) * B.mean(axis=1)) - delta * delta,
        # "var(A-B)": mean(A*A + B*B) - 2*mean(mean(A, axis=1) * mean(B, axis=1)) - delta * delta,
        
    }
    def total_variance_test(v: dict):
        assert np.allclose(v["var(A-B)"], v["E(var(A-B))"] + v["var(E(A-B))"]), v

    def relative_error(v1, v2):
        # for k in "var(A-B)", "E(var(A-B))", "var(E(A-B))"
        ... 

    v = Paired.sample_vars(A, B)
    total_variance_test(v) 

    vstar = Paired.bernoulli_p_vars(pA, pB)
    total_variance_test(vstar)

    B = np.random.rand(N, K)
    pA_hat = A.mean(axis=1, keepdims=True)
    B = np.where(B < pA, 1, 0)

    pA_hat = A.mean(axis=1, keepdims=True)
    C = np.random.rand(N, K)
    C = np.where(C < pA, 1, 0)
    vresamp = Paired.bernoulli_self(A.mean(axis=1, keepdims=True), K)
    vresamp2 = Paired.bernoulli_p_vars(A.mean(axis=1, keepdims=True), A.mean(axis=1, keepdims=True))
    # total_variance_test(vresamp)
    
    # v_unb = Paired.sample_vars_unbiased(A, A)
    # total_variance_test(v_unb) 

    res.append({
        **{("star", k): v for k, v in vstar.items() if k=="var(A-B)"},
        **{("resamp", k): v for k, v in vresamp.items() if k=="var(A-B)"},
        **{("resamp2", k): v for k, v in vresamp2.items() if k=="var(A-B)"},
        **{("hat", k): v for k, v in v.items() if k=="var(A-B)"},
        # **{("unb", k): v for k, v in v_unb.items()},
    })

df = pd.DataFrame(res)
# df["diff"] = df["var(A-B)"] - df["E(var(A-B))"] - df["var(E(A-B))"]
# df["diff2"] = df["_var(A-B)"] - df["E(var(A-B))"] - df["var(E(A-B))"]
# df["diff_star"] = df["var(A-B)_star"] - df["E(var(A-B))_star"] - df["var(E(A-B))_star"]
# px.scatter(df, x="x", y="y"
display(df.describe())
df["diff"] = df[("resamp", "var(A-B)")] / df[("star", "var(A-B)")] - 1
df["diff2"] = df[("resamp2", "var(A-B)")] / df[("star", "var(A-B)")] - 1
df["diff3"] = df[("hat", "var(A-B)")] / df[("star", "var(A-B)")] - 1
display(df)
display(df.describe())

def rmse(diff):
    return np.sqrt(np.mean(diff**2))

print("MSE", rmse(df["diff"].to_numpy()))
print("MSE2", rmse(df["diff2"].to_numpy()))
print("MSE3", rmse(df["diff3"].to_numpy()))

In [None]:
class Single:
    @staticmethod
    def sample_vars(A: np.ndarray, dof=0) -> dict:
        return {
            "var(E(A))": var(mean(A, axis=1)),
            "E(var(A))": mean(var(A, axis=1)),
            "var(A)": var(A),
        } 
    
    @staticmethod
    def sample_vars_unbiased2(A: np.ndarray) -> dict:
        kA = A.shape[1]
        N = A.shape[0]
        return {
            "var(E(A))": var(mean(A, axis=1)) - 1/(kA-1) * mean(var(A, axis=1)) + var(A)*1/(N*kA - 1), 
            "E(var(A))": mean(var(A, axis=1, ddof=1)), 
            "var(A)": var(A, ddof=1),  # empirically maybe still < 1% too large  
        }
    
    @staticmethod
    def sample_vars_unbiased(A: np.ndarray) -> dict:
        kA = A.shape[1]
        N = A.shape[0]
        return {
            "var(E(A))": var(mean(A, axis=1)) - 1/(kA-1) * mean(var(A, axis=1)) + var(A)*1/(N*kA - 1), 
            "E(var(A))": mean(var(A, axis=1)) * (1 + 1/(kA-1)), 
            "var(A)": var(A) * (1 + 1/(N*kA - 1)), # empirically maybe still < 1% too large 
        }

    @staticmethod
    def bootstrap_SE(A: np.ndarray, boots_size=100) -> dict:
        kA = A.shape[1]
        N = A.shape[0]
        means = []
        for _ in range(boots_size):
            inds = np.random.choice(N*kA, size=N, replace=True)
            samples = A.flatten()[inds]
            means.append(samples.mean())

        SE = np.std(means)
        return {
            "var(A)": SE**2*N
        }
    
    @staticmethod
    def bernoulli_p_vars(pA: np.ndarray) -> dict:
        return {
            "var(E(A))": var(pA),
            "E(var(A))": mean(pA*(1-pA)),
            "var(A)": mean(pA)*(1-mean(pA))
        }
    
N, K = 1000, 10
ph = np.random.rand(N, 1)
# pA = 0.5 * np.ones((N, 1))
# pA[:N//2] = 0
res = []
for i in range(1000):
    A = np.random.rand(N, K)
    A = np.where(A < ph, 1, 0)
    # A = pA + 0.5 * np.random.randn(N, K)

    def total_variance_test(v: dict):
        assert np.isclose(v["var(A)"], v["E(var(A))"] + v["var(E(A))"]), v

    def relative_error(v1, v2):
        # for k in "var(A-B)", "E(var(A-B))", "var(E(A-B))"
        ... 

    v = Single.sample_vars(A)
    total_variance_test(v) 

    vstar = Single.bernoulli_p_vars(ph)
    total_variance_test(vstar)

    v_unb = Single.sample_vars_unbiased(A)
    # total_variance_test(v_unb)

    v_unb2 = Single.sample_vars_unbiased2(A)
    total_variance_test(v_unb2)
    for k in v_unb:
        assert np.isclose(v_unb[k], v_unb2[k])

    se_boots = Single.bootstrap_SE(A, 10)

    res.append({
        # **{("star", k): v for k, v in vstar.items()},
        **{("hat", k): (v - vstar[k]) / vstar[k] for k, v in v.items()},
        **{("unb", k): (v - vstar[k]) / vstar[k] for k, v in v_unb.items()},
        **{("boots", k): (v - vstar[k]) / vstar[k] for k, v in se_boots.items()},
    })

df = pd.DataFrame(res)
# df["diff"] = df["var(A-B)"] - df["E(var(A-B))"] - df["var(E(A-B))"]
# df["diff2"] = df["_var(A-B)"] - df["E(var(A-B))"] - df["var(E(A-B))"]
# df["diff_star"] = df["var(A-B)_star"] - df["E(var(A-B))_star"] - df["var(E(A-B))_star"]
display(df.aggregate(lambda x:
    pd.Series({
        "std": std(x),
        "mse": np.sqrt(np.mean(x*x)),
        "mean": np.mean(x),
})))
# px.scatter(df, x="x", y="y")
display(df.describe())