
# 05 — Audit Validation & Gaming-Resistant Defenses (from scratch)

This notebook rebuilds a **clean pipeline** to:
1) Recreate interpretable features (length, richness, POS-ish, readability, prompt–response coupling).  
2) Fit a **Ridge** model with **cross‑validation** and summarize coefficients.  
3) Run **partial‑dependence‑style** sweeps to see how individual features move predicted score.  
4) Do a light **subgroup analysis** by **prompt domain** (policy + math + code + general).  
5) Plot **lift curves**: how average score increases with more targets met.  
6) Prototype **defensive metrics** that reduce gaming: diminishing returns for length and prompt‑coverage checks.

*No external downloads required; spaCy is optional.*


In [1]:

# --- 0) Setup & Data
import os, re, random, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import KFold
from sklearn.metrics import r2_score, mean_squared_error

warnings.filterwarnings("ignore")
SEED = 42
random.seed(SEED); np.random.seed(SEED)

# If df is already defined, reuse it; otherwise try to load train.csv
if 'df' not in globals():
    candidates = [Path('train.csv'), Path('/mnt/data/helpsteer_extracted/train.csv')]
    DATA_PATH = next((p for p in candidates if p.exists()), candidates[0])
    assert DATA_PATH.exists(), f"Could not find {DATA_PATH.resolve()} — place 'train.csv' next to this notebook or define df."
    df = pd.read_csv(DATA_PATH)
    print(f"Loaded: {DATA_PATH}")

# Column detection
def pick(cols, cands):
    for c in cands:
        if c in cols: return c
    return None

cols = df.columns.tolist()
prompt_col   = pick(cols, ["prompt","instruction","question","query","user_input"])
response_col = pick(cols, ["response","response_text","answer","assistant_response","model_output","completion"])
if response_col is None:
    obj = [c for c in cols if df[c].dtype == object]
    response_col = next((c for c in obj if c != prompt_col), None)
assert response_col is not None, "No response-like text column found."

print({"prompt": prompt_col, "response": response_col})

# Build overall_score if missing from common rating columns
if "overall_score" not in df.columns:
    cands = [c for c in ["helpfulness","correctness","coherence","complexity","verbosity","quality","score","label"]
             if c in df.columns and pd.api.types.is_numeric_dtype(df[c])]
    assert len(cands) >= 1, "overall_score missing and no rating columns found to derive it."
    df["overall_score"] = df[cands].astype(float).mean(axis=1)

# Score bins for summaries
if "overall_bin" not in df.columns:
    bins   = [0, 4, 8, 12, 16, 20]
    labels = ["0–4","5–8","9–12","13–16","17–20"]
    df["overall_bin"] = pd.cut(df["overall_score"], bins=bins, labels=labels, include_lowest=True, right=True)


AssertionError: Could not find C:\Users\ethan\Downloads\train.csv — place 'train.csv' next to this notebook or define df.

In [None]:

# --- 1) Tokenization, POS-lite, readability, overlap helpers
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
STOPWORDS = set(ENGLISH_STOP_WORDS) | {
    "im","ive","id","youre","youll","dont","cant","wont","didnt","doesnt","isnt","arent","wasnt","werent",
    "couldnt","shouldnt","wouldnt","thats","theres","heres","whats","lets",
    "ok","okay","yes","yeah","nope","uh","um","uhh","hmm",
    "like","just","really","actually","basically","literally",
    "etc","e.g","eg","i.e","ie","http","https","www","com","net","org"
}

def tokens(text):
    return re.findall(r"[A-Za-z]{2,}", str(text).lower())

VERB_SUFFIXES = ("ing","ed","en","ize","ise","ify")
ADJ_SUFFIXES  = ("ous","ful","ive","less","able","ible","al","ary","ic","ical","y","ish")
NOUN_SUFFIXES = ("tion","sion","ment","ness","ity","ship","ism","ist","ance","ence","ery","or","er")
def is_verbish(t): return t.endswith(VERB_SUFFIXES)
def is_adjish(t):  return t.endswith(ADJ_SUFFIXES)
def is_nounish(t): return t.endswith(NOUN_SUFFIXES)

def syllable_count(word):
    w = word.lower()
    vowels = "aeiouy"; count = 0; prev = False
    for ch in w:
        v = ch in vowels
        if v and not prev: count += 1
        prev = v
    if w.endswith("e") and count > 1: count -= 1
    return max(1, count)

def flesch_reading_ease(text):
    toks = tokens(text); 
    if not toks: return 0.0
    n_words = len(toks)
    n_sents = max(1, len(re.findall(r"[.!?]+", str(text))))
    n_syll  = sum(syllable_count(w) for w in toks)
    return 206.835 - 1.015*(n_words/n_sents) - 84.6*(n_syll/n_words)

def avg_word_len(text):
    toks = tokens(text)
    return (sum(len(t) for t in toks)/len(toks)) if toks else 0.0

def jaccard_overlap(a_text, b_text):
    A = set([t for t in tokens(a_text) if t not in STOPWORDS])
    B = set([t for t in tokens(b_text) if t not in STOPWORDS])
    if not A and not B: return 0.0
    return len(A & B) / max(1, len(A | B))

def pos_counts(text):
    toks = tokens(text)
    v = sum(1 for t in toks if is_verbish(t))
    a = sum(1 for t in toks if is_adjish(t))
    n = sum(1 for t in toks if is_nounish(t))
    return len(toks), v, a, n


In [None]:

# --- 2) Feature extraction (response + prompt + relational)
RESP = response_col
PR   = prompt_col  # may be None

def extract_all(text):
    toks = tokens(text); n_tok = len(toks)
    n_sent = max(1, len(re.findall(r"[.!?]+", str(text))))
    total, v, a, n = pos_counts(text)
    return {
        "resp_word_len": n_tok,
        "resp_avg_word_len": avg_word_len(text),
        "resp_tokens_per_sentence": (n_tok/n_sent) if n_sent else 0.0,
        "resp_type_token_ratio": (len(set(toks))/n_tok) if n_tok else 0.0,
        "resp_flesch_readability": flesch_reading_ease(text),
        "resp_adj_count": a,
        "resp_verb_ratio": (v/n_tok) if n_tok else 0.0,
        "resp_adj_ratio": (a/n_tok) if n_tok else 0.0,
        "resp_noun_count": n,
        "resp_noun_ratio": (n/n_tok) if n_tok else 0.0,
        "resp_punct_density": (len(re.findall(r"[,:;—-]", str(text))) / max(1, len(str(text)))),
    }

resp_feat = df[RESP].astype(str).apply(extract_all).apply(pd.Series)

if PR is not None and PR in df.columns:
    prm_feat_raw = df[PR].astype(str).apply(extract_all).apply(pd.Series)
    # lexical density from prompt
    def lex_density_prompt(s):
        toks = tokens(s)
        if not toks: return 0.0
        stop = sum(1 for t in toks if t in STOPWORDS)
        return (len(toks)-stop)/len(toks)
    prm_feat = pd.DataFrame({
        "pr_word_len":        prm_feat_raw["resp_word_len"],
        "pr_lexical_density": df[PR].astype(str).apply(lex_density_prompt),
        "pr_hapax_ratio":     df[PR].astype(str).apply(lambda s: (lambda t: (sum(1 for w in set(t) if t.count(w)==1)/len(t) if len(t)>0 else 0.0))(tokens(s))),
        "pr_punct_density":   df[PR].astype(str).apply(lambda s: len(re.findall(r"[,:;—-]", str(s))) / max(1, len(str(s)))),
    })
else:
    prm_feat = pd.DataFrame(index=df.index, columns=["pr_word_len","pr_lexical_density","pr_hapax_ratio","pr_punct_density"]).fillna(0.0)

# Relational
if PR is not None and PR in df.columns:
    pr_lens = df[PR].astype(str).apply(lambda s: len(tokens(s))).replace(0, np.nan)
    rel = pd.DataFrame({
        "prr_len_ratio": (resp_feat["resp_word_len"] / pr_lens).fillna(0.0),
        "prr_avg_wordlen_diff": resp_feat["resp_avg_word_len"] - df[PR].astype(str).apply(avg_word_len),
        "prr_overlap_jaccard": [jaccard_overlap(p, r) for p, r in zip(df[PR].astype(str), df[RESP].astype(str))],
    })
else:
    rel = pd.DataFrame({"prr_len_ratio": 0.0, "prr_avg_wordlen_diff": resp_feat["resp_avg_word_len"], "prr_overlap_jaccard": 0.0}, index=df.index)

features = pd.concat([resp_feat, prm_feat, rel], axis=1)
data = pd.concat([df.reset_index(drop=True), features.reset_index(drop=True)], axis=1)

print("Engineered feature columns:", len(features.columns))
data.head(3)


## Cross‑validation: Ridge performance & stable coefficients

In [None]:

# --- 3) KFold CV on Ridge
X = features.astype(float).fillna(0.0).values
y = data["overall_score"].astype(float).values
feat_names = list(features.columns)

kf = KFold(n_splits=5, shuffle=True, random_state=SEED)
r2s, rmses, coefs = [], [], []

for tr, te in kf.split(X):
    model = make_pipeline(StandardScaler(with_mean=True, with_std=True), Ridge(alpha=2.0, random_state=SEED))
    model.fit(X[tr], y[tr])
    pred = model.predict(X[te])
    r2s.append(r2_score(y[te], pred))
    rmses.append(np.sqrt(mean_squared_error(y[te], pred)))
    coefs.append(model.named_steps["ridge"].coef_)

print("CV R^2  (mean ± sd):", f"{np.mean(r2s):.3f} ± {np.std(r2s):.3f}")
print("CV RMSE (mean ± sd):", f"{np.mean(rmses):.3f} ± {np.std(rmses):.3f}")

coef_mean = np.mean(np.stack(coefs, axis=0), axis=0)
coef_df = pd.DataFrame({"feature": feat_names, "coef": coef_mean}).sort_values("coef", ascending=False)
print("\nTop + coefficients (mean across folds):")
display(coef_df.head(15))
print("Top - coefficients (mean across folds):")
display(coef_df.tail(15))


## Partial‑dependence‑style sweeps

In [None]:

# --- 4) Sweep one feature across percentiles while holding others at dataset mean
model_full = make_pipeline(StandardScaler(with_mean=True, with_std=True), Ridge(alpha=2.0, random_state=SEED))
model_full.fit(X, y)

def partial_dependence_like(feature_name, n_points=30):
    assert feature_name in features.columns, f"{feature_name} not found."
    X_ref = features.astype(float).fillna(0.0).mean(axis=0).values.reshape(1, -1)
    idx = feat_names.index(feature_name)
    vals = np.quantile(features[feature_name].astype(float), np.linspace(0.01, 0.99, n_points))
    preds = []
    for v in vals:
        X_tmp = X_ref.copy()
        X_tmp[0, idx] = v
        preds.append(model_full.predict(X_tmp)[0])
    return vals, np.array(preds)

to_sweep = coef_df.head(6)["feature"].tolist()
plt.figure(figsize=(10,6))
for f in to_sweep:
    xs, ys = partial_dependence_like(f)
    plt.plot(xs, ys, label=f)
plt.legend()
plt.title("Partial‑dependence‑style curves (one feature at a time)")
plt.xlabel("feature value"); plt.ylabel("predicted score")
plt.tight_layout(); plt.show()


## Subgroup analysis by prompt domain

In [None]:

# --- 5) Heuristic domain tags using keywords in prompt (fallbacks to 'general')
def tag_domain(p):
    p = str(p).lower()
    if any(k in p for k in ["election","policy","govern","vote","senate","congress","law"]): return "policy"
    if any(k in p for k in ["math","algebra","equation","solve","probability","integral","derivative"]): return "math"
    if any(k in p for k in ["code","python","function","bug","error","compile","algorithm"]): return "code"
    return "general"

domains = df[prompt_col].apply(tag_domain) if prompt_col else pd.Series(["general"]*len(df))
data["domain"] = domains

for dom in ["policy","math","code","general"]:
    m = (data["domain"] == dom).values
    if m.sum() < 50: 
        print(f"Skip domain {dom} (n={m.sum()})")
        continue
    Xm = features[m].astype(float).fillna(0.0).values
    ym = data.loc[m, "overall_score"].astype(float).values
    model = make_pipeline(StandardScaler(with_mean=True, with_std=True), Ridge(alpha=2.0, random_state=SEED))
    model.fit(Xm, ym)
    r2 = r2_score(ym, model.predict(Xm))
    coef_dom = pd.DataFrame({"feature": feat_names, "coef": model.named_steps["ridge"].coef_}).sort_values("coef", ascending=False).head(8)
    print(f"\nDomain: {dom} | n={m.sum()} | in-sample R^2={r2:.3f}")
    display(coef_dom)


## Lift curves: average score vs. # of targets met

In [None]:

# --- 6) Build μ + kσ targets for top positive features and plot lift by #hits
TOPK = 10; K_STD = 1.0
top_pos = coef_df.head(TOPK)["feature"].tolist()

def targets_for(df_feat, feat_list, k=K_STD):
    rows = []
    for f in feat_list:
        s = pd.to_numeric(df_feat[f], errors="coerce")
        mu, sd = s.mean(), s.std(ddof=0)
        rows.append((f, mu + k*sd))
    return dict(rows)

targets = targets_for(features, top_pos, K_STD)

M = []
for f, t in targets.items():
    M.append((pd.to_numeric(features[f], errors="coerce") >= t).values)
M = np.vstack(M).T  # (n_rows, n_features)
hits = M.sum(axis=1)

tmp = pd.DataFrame({"hits": hits, "score": data["overall_score"].astype(float)})
lift = tmp.groupby("hits")["score"].mean()

plt.figure(figsize=(6,4))
plt.plot(lift.index, lift.values, marker="o")
plt.title(f"Mean score vs #targets met (top {TOPK}, μ+{K_STD}σ)")
plt.xlabel("# of targets met"); plt.ylabel("mean overall_score")
plt.tight_layout(); plt.show()


## Gaming‑resistant defenses (prototype)

In [None]:

# --- 7) Propose a more robust audit score that reduces gaming
# (A) Diminishing returns on length using a log transform
length = features["resp_word_len"].astype(float).clip(lower=1.0)
len_diminish = np.log1p(length)  # concave

# (B) Prompt coverage: combine Jaccard overlap + coverage of rare prompt words
from collections import Counter
def rare_weighted_overlap(prompt, response):
    p = [t for t in tokens(prompt) if t not in STOPWORDS]
    r = set([t for t in tokens(response) if t not in STOPWORDS])
    if not p: return 0.0
    counts = Counter(p)
    total = 0.0
    hit = 0.0
    for w,c in counts.items():
        total += 1.0 / c
        if w in r: hit += 1.0 / c
    return hit / total if total>0 else 0.0

if prompt_col:
    rw_overlap = [rare_weighted_overlap(p, r) for p, r in zip(df[prompt_col].astype(str), df[response_col].astype(str))]
else:
    rw_overlap = [0.0]*len(df)

# (C) Clarity guardrail for extreme sentence length
tokens_per_sent = features["resp_tokens_per_sentence"].astype(float)
clarity_bonus = np.exp(-np.maximum(0, tokens_per_sent - tokens_per_sent.median()) / (tokens_per_sent.std(ddof=0)+1e-6))

# Combine into a prototype "robust_score" (normalize components first)
def z(x): 
    x = np.asarray(x, float)
    return (x - np.nanmean(x)) / (np.nanstd(x)+1e-9)

robust_score = z(len_diminish) + z(features["prr_overlap_jaccard"]) + z(rw_overlap) + z(features["resp_type_token_ratio"]) + z(features["resp_flesch_readability"]) + z(clarity_bonus)

out = pd.DataFrame({
    "overall_score": data["overall_score"].astype(float),
    "robust_score": robust_score,
    "resp_word_len": features["resp_word_len"],
    "prr_overlap_jaccard": features["prr_overlap_jaccard"]
})
print("Correlation (overall vs robust):", np.corrcoef(out["overall_score"], out["robust_score"])[0,1].round(3))
out.head(5)
