# Data Audit & Heavy-Tails Notebook
*Generated: 2025-01-09*

This notebook performs:
1) Schema checks & parsing of list-like fields  
2) Heavy-tail diagnostics using **Clauset–Shalizi–Newman (2009)** discrete power-law workflow  
3) Overdispersion tests (Poisson) per **Dean–Lawless (1989)** and **Cameron–Trivedi (1990)**  
4) Edge case analysis and guardrails

> References: Clauset et al. (2009, SIAM Review), Alstott et al. (2014, *powerlaw* package), Dean & Lawless (1989), Cameron & Trivedi (1990).


In [1]:
# %% [code] Imports & Config
import os, json, math, logging, pathlib, re, ast, time
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Reproducibility
SEED = 42
np.random.seed(SEED)

# Paths (edit as needed)
DATA_CSV = os.environ.get("GOODREADS_CSV", "../../data/processed/romance_books_main_final.csv")
ARTIFACTS_DIR = pathlib.Path("./audit_artifacts")
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("audit")
log.info(f"Notebook start. CSV: {DATA_CSV}")


ModuleNotFoundError: No module named 'numpy'

In [None]:
# %% [code] Load data & schema checks
df = pd.read_csv(DATA_CSV)
log.info(f"Loaded dataframe with shape {df.shape}")

expected_cols = [
 'work_id','book_id_list_en','title','publication_year','num_pages_median','description','language_codes_en',
 'author_id','author_name','author_average_rating','author_ratings_count','series_id','series_title',
 'ratings_count_sum','text_reviews_count_sum','average_rating_weighted_mean','genres_str','shelves_str','series_works_count_numeric'
]

missing = [c for c in expected_cols if c not in df.columns]
extra = [c for c in df.columns if c not in expected_cols]
schema_ok = len(missing)==0

schema_report = {
    "timestamp": datetime.utcnow().isoformat(),
    "n_rows": int(len(df)),
    "n_cols": int(df.shape[1]),
    "present_cols": list(df.columns),
    "expected_cols": expected_cols,
    "missing_cols": missing,
    "extra_cols": extra,
    "schema_ok": schema_ok
}
with open(ARTIFACTS_DIR / "schema_report.json", "w", encoding="utf-8") as f:
    json.dump(schema_report, f, indent=2)
log.info(f"Schema report written. OK? {schema_ok}")

# Quick dtype summary
dtype_summary = df.dtypes.astype(str).to_dict()
pd.Series(dtype_summary).to_csv(ARTIFACTS_DIR / "dtype_summary.csv")
dtype_summary


In [None]:
# %% [code] Parse list-like fields & simple unit checks

def parse_listlike(s):
    """Robust parser for list-like strings.
    Tries literal_eval (e.g., "['123','456']"), falls back to comma split.
    Returns list[str].
    """
    if pd.isna(s):
        return []
    s = str(s).strip()
    if not s:
        return []
    # Try literal list
    try:
        v = ast.literal_eval(s)
        if isinstance(v, (list, tuple)):
            return [str(x).strip() for x in v]
    except Exception:
        pass
    # Fallback: comma-separated
    return [x.strip() for x in s.split(",") if x.strip()]

for col in ["book_id_list_en","genres_str","shelves_str"]:
    if col in df.columns:
        df[col + "_list"] = df[col].apply(parse_listlike)

# Minimal unit checks
assert df["book_id_list_en_list"].apply(lambda x: isinstance(x, list)).all(), "book_id_list_en_list parse failed"
assert df["genres_str_list"].apply(lambda x: isinstance(x, list)).all(), "genres_str_list parse failed"
assert df["shelves_str_list"].apply(lambda x: isinstance(x, list)).all(), "shelves_str_list parse failed"

# Counts non-negative
for c in ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]:
    if c in df.columns:
        assert (df[c] >= 0).all(), f"Negative counts found in {c}"

# Save small preview
df.head(50).to_csv(ARTIFACTS_DIR / "parsed_preview.csv", index=False)
log.info("Parsing complete and basic unit checks passed.")


In [None]:
# %% [code] Heavy-tail visuals: CCDF and log-binned histograms
from math import ceil

def plot_ccdf(x, title, fname):
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    x = x[x >= 0]
    x_sorted = np.sort(x)
    n = len(x_sorted)
    if n == 0:
        log.warning(f"No data for {title}")
        return
    y = 1.0 - np.arange(1, n+1)/n
    plt.figure()
    plt.loglog(x_sorted, y, marker='.', linestyle='none')
    plt.xlabel("x")
    plt.ylabel("CCDF P(X>=x)")
    plt.title(title + " — Empirical CCDF (log–log)")
    plt.grid(True, which="both")
    plt.tight_layout()
    plt.savefig(ARTIFACTS_DIR / fname, dpi=150)
    plt.close()

def plot_log_binned_hist(x, title, fname, bins=50):
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    x = x[x > 0]
    if len(x) == 0:
        log.warning(f"No positive data for {title}")
        return
    # Log-spaced bins for visualization ONLY (do not fit on binned data)
    xmin, xmax = x.min(), x.max()
    edges = np.logspace(np.log10(xmin), np.log10(xmax), bins)
    plt.figure()
    plt.hist(x, bins=edges, density=True)
    plt.xscale("log")
    plt.yscale("log")
    plt.xlabel("x (log)")
    plt.ylabel("Density (log)")
    plt.title(title + " — Log-binned histogram (for visualization only)")
    plt.grid(True, which="both")
    plt.tight_layout()
    plt.savefig(ARTIFACTS_DIR / ("hist_" + fname), dpi=150)
    plt.close()

targets = ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]
for var in targets:
    if var in df.columns:
        x = df[var].values
        plot_ccdf(x, f"{var}", f"ccdf_{var}.png")
        plot_log_binned_hist(x, f"{var}", f"{var}.png")
log.info("Heavy-tail visualizations saved.")


In [None]:
# %% [code] CSN (2009) discrete power-law fits & model comparison (via `powerlaw`)
# This cell uses the `powerlaw` package (Alstott et al., 2014).
# If not installed, install in your environment: `pip install powerlaw`

try:
    import powerlaw
    HAVE_POWERLAW = True
except Exception as e:
    HAVE_POWERLAW = False
    log.warning("`powerlaw` not available. Install with `pip install powerlaw` to enable CSN fits.")

def csn_fit_report(x_raw, varname):
    if not HAVE_POWERLAW:
        return None
    x = np.asarray(x_raw, dtype=float)
    x = x[~np.isnan(x)]
    # CSN tail model for counts: use strictly positive support
    x = x[x >= 1]
    if len(x) < 50:
        log.warning(f"{varname}: too few positive observations for tail fitting ({len(x)}).")
        return None
    fit = powerlaw.Fit(x, discrete=True, estimate_discrete=True, verbose=False)
    pl = fit.power_law
    # Likelihood ratio tests vs alternatives
    comps = {}
    for alt in ["lognormal","exponential","truncated_power_law","stretched_exponential"]:
        try:
            R, p = fit.distribution_compare('power_law', alt)
            comps[alt] = {"LR": float(R), "p": float(p)}
        except Exception as e:
            comps[alt] = {"LR": None, "p": None}
    report = {
        "var": varname,
        "n_positive": int(len(x)),
        "xmin": float(pl.xmin),
        "alpha": float(pl.alpha),
        "D_KS": float(fit.D),
        # Note: `powerlaw`'s gof p-value estimation can be computationally heavy; 
        # here we use the default KS; for full GOF bootstrap, see docs.
        "tail_fraction": float((x >= pl.xmin).mean()),
        "comparisons": comps,
    }
    return report

csn_reports = []
for var in ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]:
    if var in df.columns:
        rep = csn_fit_report(df[var].values, var)
        if rep:
            csn_reports.append(rep)

if csn_reports:
    with open(ARTIFACTS_DIR / "csn_powerlaw_reports.json", "w", encoding="utf-8") as f:
        json.dump(csn_reports, f, indent=2)
    log.info("CSN reports saved to csn_powerlaw_reports.json")
else:
    log.info("CSN reports not generated (missing powerlaw or insufficient data).")


In [None]:
# %% [code] Overdispersion tests: Dean–Lawless (Pearson chi-square) & Cameron–Trivedi (auxiliary OLS)
# Requires statsmodels
import statsmodels.api as sm

def fit_poisson(y, X):
    X_ = sm.add_constant(X, has_constant='add')
    model = sm.GLM(y, X_, family=sm.families.Poisson())
    res = model.fit()
    return res

def dean_lawless_test(res):
    # Pearson chi-square should have E ≈ df and Var ≈ 2*df under Poisson
    chi2 = res.pearson_chi2
    df = res.df_resid
    z = (chi2 - df) / math.sqrt(2.0*df) if df > 0 else np.nan
    # two-sided p-value from N(0,1)
    from math import erf, sqrt
    p = 2*(1 - 0.5*(1 + erf(abs(z)/sqrt(2))))
    return {"pearson_chi2": chi2, "df": df, "z": z, "p_two_sided": p}

def cameron_trivedi_test(y, mu):
    # Following auxiliary regression: Y* = ((y - mu)^2 - y) / mu  onto mu (no intercept)
    # Reference: Cameron & Trivedi (1990); see also CRAN overdisp vignette
    y = np.asarray(y, dtype=float)
    mu = np.asarray(mu, dtype=float)
    mask = np.isfinite(y) & np.isfinite(mu) & (mu > 0)
    y = y[mask]; mu = mu[mask]
    y_star = ((y - mu)**2 - y) / mu
    X_aux = mu.reshape(-1,1)  # no intercept
    aux_res = sm.OLS(y_star, X_aux).fit()
    alpha_hat = aux_res.params[0]
    t_stat = aux_res.tvalues[0]
    p_val = aux_res.pvalues[0]
    return {"alpha_hat": alpha_hat, "t": t_stat, "p": p_val, "n": int(mask.sum())}

# Example design matrix: simple intercept-only model for baseline
targets = ["ratings_count_sum","text_reviews_count_sum"]
X_baseline = pd.DataFrame({"intercept_only": np.ones(len(df))})

overdisp_results = []
for var in targets:
    if var in df.columns:
        y = df[var].values
        res = fit_poisson(y, X_baseline)
        dl = dean_lawless_test(res)
        ct = cameron_trivedi_test(y, res.fittedvalues)
        overdisp_results.append({"var": var, "dean_lawless": dl, "cameron_trivedi": ct})

with open(ARTIFACTS_DIR / "overdispersion_tests.json", "w", encoding="utf-8") as f:
    json.dump(overdisp_results, f, indent=2)
log.info("Overdispersion tests saved to overdispersion_tests.json")
overdisp_results


## Outputs generated
- `audit_artifacts/schema_report.json` — column presence & dtype summary
- `audit_artifacts/dtype_summary.csv` — dtypes
- `audit_artifacts/parsed_preview.csv` — first 50 rows after parsing
- `audit_artifacts/ccdf_*.png`, `audit_artifacts/hist_*.png` — heavy-tail visuals
- `audit_artifacts/csn_powerlaw_reports.json` — (if `powerlaw` installed) CSN fits
- `audit_artifacts/overdispersion_tests.json` — Dean–Lawless & Cameron–Trivedi test results

> Note: **Do not** fit lines to binned histograms. Use CSN MLE + KS (and LR comparisons) for tail inference.


In [None]:
# %% [code] Edge case analysis and summary

# Analyze zeros and support issues
edge_case_report = {
    "timestamp": datetime.utcnow().isoformat(),
    "zero_analysis": {},
    "support_analysis": {},
    "recommendations": []
}

for var in ["ratings_count_sum", "text_reviews_count_sum", "author_ratings_count"]:
    if var in df.columns:
        x = df[var].values
        zeros = (x == 0).sum()
        total = len(x)
        
        edge_case_report["zero_analysis"][var] = {
            "zero_count": int(zeros),
            "zero_fraction": float(zeros / total),
            "total_observations": int(total)
        }
        
        # Check for potential zero-inflation
        if zeros / total > 0.1:  # More than 10% zeros
            edge_case_report["recommendations"].append(
                f"{var}: High zero fraction ({zeros/total:.1%}) - consider zero-inflated models"
            )

# Summary statistics
summary_stats = {
    "dataset_shape": df.shape,
    "missing_values": df.isnull().sum().to_dict(),
    "edge_case_report": edge_case_report
}

with open(ARTIFACTS_DIR / "edge_case_analysis.json", "w", encoding="utf-8") as f:
    json.dump(summary_stats, f, indent=2)

log.info("Edge case analysis complete.")
print("\n=== AUDIT SUMMARY ===")
print(f"Dataset: {df.shape[0]:,} rows, {df.shape[1]} columns")
print(f"Schema OK: {schema_ok}")
print(f"Artifacts saved to: {ARTIFACTS_DIR}")
if edge_case_report["recommendations"]:
    print("\nRecommendations:")
    for rec in edge_case_report["recommendations"]:
        print(f"  - {rec}")


# Data Audit & Heavy-Tails Notebook
*Generated: 2025-09-25 22:16:03*

This notebook performs:
1) Schema checks & parsing of list-like fields  
2) Heavy-tail diagnostics using **Clauset–Shalizi–Newman (2009)** discrete power-law workflow  
3) Overdispersion tests (Poisson) per **Dean–Lawless (1989)** and **Cameron–Trivedi (1990)**  
4) Leakage filters for non-content shelves

> References: Clauset et al. (2009, SIAM Review), Alstott et al. (2014, *powerlaw* package), Dean & Lawless (1989), Cameron & Trivedi (1990).


In [None]:
# %% [code] Imports & Config
import os, json, math, logging, pathlib, re, ast, time
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Reproducibility
SEED = 42
np.random.seed(SEED)

# Paths (edit as needed)
DATA_CSV = os.environ.get("GOODREADS_CSV", "../../data/processed/romance_books_main_final.csv")
ARTIFACTS_DIR = pathlib.Path("./audit_artifacts")
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("audit")
log.info(f"Notebook start. CSV: {DATA_CSV}")


In [None]:
# %% [code] Load data & schema checks
df = pd.read_csv(DATA_CSV)
log.info(f"Loaded dataframe with shape {df.shape}")

expected_cols = [
 'work_id','book_id_list_en','title','publication_year','num_pages_median','description','language_codes_en',
 'author_id','author_name','author_average_rating','author_ratings_count','series_id','series_title',
 'ratings_count_sum','text_reviews_count_sum','average_rating_weighted_mean','genres_str','shelves_str','series_works_count_numeric'
]

missing = [c for c in expected_cols if c not in df.columns]
extra = [c for c in df.columns if c not in expected_cols]
schema_ok = len(missing)==0

schema_report = {
    "timestamp": datetime.utcnow().isoformat(),
    "n_rows": int(len(df)),
    "n_cols": int(df.shape[1]),
    "present_cols": list(df.columns),
    "expected_cols": expected_cols,
    "missing_cols": missing,
    "extra_cols": extra,
    "schema_ok": schema_ok
}
with open(ARTIFACTS_DIR / "schema_report.json", "w", encoding="utf-8") as f:
    json.dump(schema_report, f, indent=2)
log.info(f"Schema report written. OK? {schema_ok}")

# Quick dtype summary
dtype_summary = df.dtypes.astype(str).to_dict()
pd.Series(dtype_summary).to_csv(ARTIFACTS_DIR / "dtype_summary.csv")
dtype_summary


In [None]:
# %% [code] Parse list-like fields & simple unit checks

def parse_listlike(s):
    """Robust parser for list-like strings.
    Tries literal_eval (e.g., "['123','456']"), falls back to comma split.
    Returns list[str].
    """
    if pd.isna(s):
        return []
    s = str(s).strip()
    if not s:
        return []
    # Try literal list
    try:
        v = ast.literal_eval(s)
        if isinstance(v, (list, tuple)):
            return [str(x).strip() for x in v]
    except Exception:
        pass
    # Fallback: comma-separated
    return [x.strip() for x in s.split(",") if x.strip()]

for col in ["book_id_list_en","genres_str","shelves_str"]:
    if col in df.columns:
        df[col + "_list"] = df[col].apply(parse_listlike)

# Minimal unit checks
assert df["book_id_list_en_list"].apply(lambda x: isinstance(x, list)).all(), "book_id_list_en_list parse failed"
assert df["genres_str_list"].apply(lambda x: isinstance(x, list)).all(), "genres_str_list parse failed"
assert df["shelves_str_list"].apply(lambda x: isinstance(x, list)).all(), "shelves_str_list parse failed"

# Counts non-negative
for c in ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]:
    if c in df.columns:
        assert (df[c] >= 0).all(), f"Negative counts found in {c}"

# Save small preview
df.head(50).to_csv(ARTIFACTS_DIR / "parsed_preview.csv", index=False)
log.info("Parsing complete and basic unit checks passed.")


In [None]:
# %% [code] Heavy-tail visuals: CCDF and log-binned histograms
from math import ceil

def plot_ccdf(x, title, fname):
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    x = x[x >= 0]
    x_sorted = np.sort(x)
    n = len(x_sorted)
    if n == 0:
        log.warning(f"No data for {title}")
        return
    y = 1.0 - np.arange(1, n+1)/n
    plt.figure()
    plt.loglog(x_sorted, y, marker='.', linestyle='none')
    plt.xlabel("x")
    plt.ylabel("CCDF P(X>=x)")
    plt.title(title + " — Empirical CCDF (log–log)")
    plt.grid(True, which="both")
    plt.tight_layout()
    plt.savefig(ARTIFACTS_DIR / fname, dpi=150)
    plt.close()

def plot_log_binned_hist(x, title, fname, bins=50):
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    x = x[x > 0]
    if len(x) == 0:
        log.warning(f"No positive data for {title}")
        return
    # Log-spaced bins for visualization ONLY (do not fit on binned data)
    xmin, xmax = x.min(), x.max()
    edges = np.logspace(np.log10(xmin), np.log10(xmax), bins)
    plt.figure()
    plt.hist(x, bins=edges, density=True)
    plt.xscale("log")
    plt.yscale("log")
    plt.xlabel("x (log)")
    plt.ylabel("Density (log)")
    plt.title(title + " — Log-binned histogram (for visualization only)")
    plt.grid(True, which="both")
    plt.tight_layout()
    plt.savefig(ARTIFACTS_DIR / ("hist_" + fname), dpi=150)
    plt.close()

targets = ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]
for var in targets:
    if var in df.columns:
        x = df[var].values
        plot_ccdf(x, f"{var}", f"ccdf_{var}.png")
        plot_log_binned_hist(x, f"{var}", f"{var}.png")
log.info("Heavy-tail visualizations saved.")


In [None]:
# %% [code] CSN (2009) discrete power-law fits & model comparison (via `powerlaw`)
# This cell uses the `powerlaw` package (Alstott et al., 2014).
# If not installed, install in your environment: `pip install powerlaw`

try:
    import powerlaw
    HAVE_POWERLAW = True
except Exception as e:
    HAVE_POWERLAW = False
    log.warning("`powerlaw` not available. Install with `pip install powerlaw` to enable CSN fits.")

def csn_fit_report(x_raw, varname):
    if not HAVE_POWERLAW:
        return None
    x = np.asarray(x_raw, dtype=float)
    x = x[~np.isnan(x)]
    # CSN tail model for counts: use strictly positive support
    x = x[x >= 1]
    if len(x) < 50:
        log.warning(f"{varname}: too few positive observations for tail fitting ({len(x)}).")
        return None
    fit = powerlaw.Fit(x, discrete=True, estimate_discrete=True, verbose=False)
    pl = fit.power_law
    # Likelihood ratio tests vs alternatives
    comps = {}
    for alt in ["lognormal","exponential","truncated_power_law","stretched_exponential"]:
        try:
            R, p = fit.distribution_compare('power_law', alt)
            comps[alt] = {"LR": float(R), "p": float(p)}
        except Exception as e:
            comps[alt] = {"LR": None, "p": None}
    report = {
        "var": varname,
        "n_positive": int(len(x)),
        "xmin": float(pl.xmin),
        "alpha": float(pl.alpha),
        "D_KS": float(fit.D),
        # Note: `powerlaw`'s gof p-value estimation can be computationally heavy; 
        # here we use the default KS; for full GOF bootstrap, see docs.
        "tail_fraction": float((x >= pl.xmin).mean()),
        "comparisons": comps,
    }
    return report

csn_reports = []
for var in ["ratings_count_sum","text_reviews_count_sum","author_ratings_count"]:
    if var in df.columns:
        rep = csn_fit_report(df[var].values, var)
        if rep:
            csn_reports.append(rep)

if csn_reports:
    with open(ARTIFACTS_DIR / "csn_powerlaw_reports.json", "w", encoding="utf-8") as f:
        json.dump(csn_reports, f, indent=2)
    log.info("CSN reports saved to csn_powerlaw_reports.json")
else:
    log.info("CSN reports not generated (missing powerlaw or insufficient data).")


In [None]:
# %% [code] Overdispersion tests: Dean–Lawless (Pearson chi-square) & Cameron–Trivedi (auxiliary OLS)
# Requires statsmodels
import statsmodels.api as sm

def fit_poisson(y, X):
    X_ = sm.add_constant(X, has_constant='add')
    model = sm.GLM(y, X_, family=sm.families.Poisson())
    res = model.fit()
    return res

def dean_lawless_test(res):
    # Pearson chi-square should have E ≈ df and Var ≈ 2*df under Poisson
    chi2 = res.pearson_chi2
    df = res.df_resid
    z = (chi2 - df) / math.sqrt(2.0*df) if df > 0 else np.nan
    # two-sided p-value from N(0,1)
    from math import erf, sqrt
    p = 2*(1 - 0.5*(1 + erf(abs(z)/sqrt(2))))
    return {"pearson_chi2": chi2, "df": df, "z": z, "p_two_sided": p}

def cameron_trivedi_test(y, mu):
    # Following auxiliary regression: Y* = ((y - mu)^2 - y) / mu  onto mu (no intercept)
    # Reference: Cameron & Trivedi (1990); see also CRAN overdisp vignette
    y = np.asarray(y, dtype=float)
    mu = np.asarray(mu, dtype=float)
    mask = np.isfinite(y) & np.isfinite(mu) & (mu > 0)
    y = y[mask]; mu = mu[mask]
    y_star = ((y - mu)**2 - y) / mu
    X_aux = mu.reshape(-1,1)  # no intercept
    aux_res = sm.OLS(y_star, X_aux).fit()
    alpha_hat = aux_res.params[0]
    t_stat = aux_res.tvalues[0]
    p_val = aux_res.pvalues[0]
    return {"alpha_hat": alpha_hat, "t": t_stat, "p": p_val, "n": int(mask.sum())}

# Example design matrix: simple intercept-only model for baseline
targets = ["ratings_count_sum","text_reviews_count_sum"]
X_baseline = pd.DataFrame({"intercept_only": np.ones(len(df))})

overdisp_results = []
for var in targets:
    if var in df.columns:
        y = df[var].values
        res = fit_poisson(y, X_baseline)
        dl = dean_lawless_test(res)
        ct = cameron_trivedi_test(y, res.fittedvalues)
        overdisp_results.append({"var": var, "dean_lawless": dl, "cameron_trivedi": ct})

with open(ARTIFACTS_DIR / "overdispersion_tests.json", "w", encoding="utf-8") as f:
    json.dump(overdisp_results, f, indent=2)
log.info("Overdispersion tests saved to overdispersion_tests.json")
overdisp_results


## Outputs generated
- `audit_artifacts/schema_report.json` — column presence & dtype summary
- `audit_artifacts/dtype_summary.csv` — dtypes
- `audit_artifacts/parsed_preview.csv` — first 50 rows after parsing
- `audit_artifacts/ccdf_*.png`, `audit_artifacts/hist_*.png` — heavy-tail visuals
- `audit_artifacts/csn_powerlaw_reports.json` — (if `powerlaw` installed) CSN fits
- `audit_artifacts/overdispersion_tests.json` — Dean–Lawless & Cameron–Trivedi test results
- `audit_artifacts/excluded_shelves.csv` — shelves flagged by leakage filters

> Note: **Do not** fit lines to binned histograms. Use CSN MLE + KS (and LR comparisons) for tail inference.


In [None]:
# %% [code] Apply leakage filters (regexes) to shelves
import yaml, re

filters = {}
with open("shelf_filters.yml", "r", encoding="utf-8") as f:
    filters = yaml.safe_load(f)

patterns = []
for group, exprs in filters.items():
    for e in exprs:
        try:
            patterns.append((group, re.compile(e, flags=re.IGNORECASE)))
        except re.error as ex:
            log.warning(f"Invalid regex in group {group}: {e} ({ex})")

def is_excluded_shelf(s):
    s = str(s).strip()
    for g, pat in patterns:
        if pat.search(s):
            return True, g
    return False, None

shelves = df["shelves_str_list"].explode().dropna().astype(str).unique().tolist()
rows = []
for s in shelves:
    exc, group = is_excluded_shelf(s)
    if exc:
        rows.append({"shelf": s, "group": group})

excluded_df = pd.DataFrame(rows).sort_values(["group","shelf"])
excluded_df.to_csv(ARTIFACTS_DIR / "excluded_shelves.csv", index=False)
log.info(f"Excluded {len(excluded_df)} unique shelves (see excluded_shelves.csv).")
excluded_df.head(20)
