# Moore & Gordon–style ART pre-scoring (run before factor analysis)

This notebook computes Moore & Gordon ART-style scores and item statistics (no IRT). It detects author/foil items via the code row of the cleaned ART file, enforces binary responses, performs scoring + diagnostics, and saves outputs to `data/processed/results/01_pre_scoring_moore_gordon` (timestamped run folder).


In [None]:
# ============================================
# CELL 1: Setup environment, paths, results dir
# ============================================

import os
import re
import json
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
from difflib import SequenceMatcher

# Relative location of the cleaned ART export; its presence is also what marks
# a directory as the project root.
_DATA_FILE = "data/processed/art_cleaned/ART_pretest_merged_EN_cleaned.csv"

# Climb from the working directory towards the filesystem root until a folder
# containing the data file is found (covers cwd = repo root or scripts/analysis).
PROJECT_ROOT = os.path.abspath(os.getcwd())
while True:
    candidate = os.path.join(PROJECT_ROOT, _DATA_FILE)
    if os.path.isfile(candidate):
        break
    _parent = os.path.dirname(PROJECT_ROOT)
    if _parent != PROJECT_ROOT:
        PROJECT_ROOT = _parent
        continue
    # dirname() no longer changes the path: nothing was found on the way up,
    # so fall back to the working directory.
    PROJECT_ROOT = os.path.abspath(os.getcwd())
    candidate = os.path.join(PROJECT_ROOT, _DATA_FILE)
    break

# Absolute path of the input file
DATA_PATH = os.path.join(PROJECT_ROOT, _DATA_FILE)

# Each run writes into its own timestamped folder below the base results directory
BASE_RESULTS_DIR = os.path.join(
    PROJECT_ROOT, "data", "processed", "results", "01_pre_scoring_moore_gordon"
)
RUN_ID = f"{datetime.now():%Y%m%d_%H%M%S}"
RESULTS_DIR = os.path.join(BASE_RESULTS_DIR, "run_" + RUN_ID)
os.makedirs(RESULTS_DIR, exist_ok=True)

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATA_PATH:", DATA_PATH)
print("RESULTS_DIR:", RESULTS_DIR)


In [None]:
# ============================================
# CELL 2: Load dataset and basic checks
# ============================================

# Read the cleaned export as-is; row 0 of the file body carries the item codes.
df_raw = pd.read_csv(DATA_PATH)

# Split the code row from the participant responses that follow it
codes_row = df_raw.iloc[0].fillna("").astype(str).str.strip()
df = df_raw.iloc[1:].reset_index(drop=True)

print("Loaded dataset (responses only after removing code row).")
print("Shape (rows, cols):", df.shape)
print("First 5 columns:", list(df.columns[:5]))
print("Last 5 columns:", list(df.columns[-5:]))

# Ten columns with the highest share of missing cells
na_rate = df.isna().mean().sort_values(ascending=False).head(10)
print("\nTop-10 columns by missing rate:")
print(na_rate)

# Persist basic run metadata next to the other outputs
meta = {
    "data_path": DATA_PATH,
    "n_rows": int(len(df)),
    "n_cols": int(len(df.columns)),
    "run_id": RUN_ID,
}
with open(os.path.join(RESULTS_DIR, "metadata.json"), "w") as f:
    f.write(json.dumps(meta, indent=2))

df.head()


In [None]:
# =========================================================
# CELL 3: Identify real-author and foil item columns
# =========================================================
# Auto: use code row prefixes (authors: cla/det/fan/mod/sci/sfi/rom/soc; foils: fill*)
# Manual override: populate AUTHOR_COLS_MANUAL / FOIL_COLS_MANUAL if needed.

# Code-row prefixes that classify a column as a real-author item or a foil item.
ALLOWED_AUTHOR_PREFIXES = ("cla", "det", "fan", "mod", "sci", "sfi", "rom", "soc")
FOIL_PREFIX = "fill"

# =======================
# PATCH 1: Code-row audit
# =======================

# Build an auditable column↔code mapping. `code_norm` (trimmed, lower-cased,
# all whitespace removed) is the one normalized form used for BOTH the sanity
# counts and the item detection below, so the saved map always explains the
# detected item lists.
codes_row = codes_row.astype(str)  # ensure string
colmap = pd.DataFrame({
    "column_name": df_raw.columns,
    "code_row_value": codes_row.values,
})
colmap["code_norm"] = (
    colmap["code_row_value"]
    .str.strip().str.lower()
    .str.replace(r"\s+", "", regex=True)
)

# Save mapping for later verification
colmap_path = os.path.join(RESULTS_DIR, "column_code_map.csv")
colmap.to_csv(colmap_path, index=False)
print("Saved column↔code map:", colmap_path)

# Sanity-check that the code row looks like a code row (not participant data)
n_author_like = int(colmap["code_norm"].str.startswith(ALLOWED_AUTHOR_PREFIXES).sum())
n_foil_like = int(colmap["code_norm"].str.startswith(FOIL_PREFIX).sum())

print("Sanity counts from code row:")
print("  author-coded columns:", n_author_like)
print("  foil-coded columns  :", n_foil_like)

# Fail-fast thresholds (tune if needed, but do not remove)
if n_author_like < 20 or n_foil_like < 20:
    raise ValueError(
        "Code-row classification looks wrong (too few author/foil-coded columns). "
        "Check whether row 0 is truly the code row and whether columns are aligned."
    )

# Kept under its original name and definition in case other cells read it.
code_series = codes_row.str.lower().str.strip()

# Detect items from the audited, fully normalized codes (indexed by column name).
code_norm_by_col = pd.Series(colmap["code_norm"].to_numpy(), index=codes_row.index)
author_cols_auto = code_norm_by_col[
    code_norm_by_col.str.startswith(ALLOWED_AUTHOR_PREFIXES)
].index.tolist()
foil_cols_auto = code_norm_by_col[
    code_norm_by_col.str.startswith(FOIL_PREFIX)
].index.tolist()

print("AUTO DETECTION:")
print("# author cols detected:", len(author_cols_auto))
print("# foil cols detected:", len(foil_cols_auto))

# Manual overrides (leave empty to use auto)
AUTHOR_COLS_MANUAL = []
FOIL_COLS_MANUAL = []

author_cols = AUTHOR_COLS_MANUAL if len(AUTHOR_COLS_MANUAL) > 0 else author_cols_auto
foil_cols = FOIL_COLS_MANUAL if len(FOIL_COLS_MANUAL) > 0 else foil_cols_auto

# A manually listed column that does not exist would otherwise only surface as
# a KeyError when the response matrix is extracted; report it here instead.
unknown_item_cols = [c for c in list(author_cols) + list(foil_cols) if c not in df.columns]
if unknown_item_cols:
    raise ValueError(
        f"Item columns not present in the dataset: {unknown_item_cols[:20]}"
    )

# =========================================
# Exclude late-added items (not administered to full sample / high missingness)
# =========================================
ITEMS_EXCLUDED_FROM_ANALYSIS = [
    # Late-added / not administered to full sample
    "Yuri Tsypkin",
    "Victor Khlystun fill 92",
    "Andrea Segre fill 93",
    "Natalya Shagaida fill 94",
    "Ivan Buzdalov fill 95",
    "Ivan Ushachev fill 96",
    "Holger Magel fill 97",
    "Vasily Uzun fill 98",
    "Sergey Siptits fill 99",
    "Valentina Shirokova fill 100",
    # Top missing proportion among remainder (per review)
    "Ian Fleming",
    "Gerrit HoogenbuM fill1",
    "Lawrense Stern",
    "Yakushkina Gilyan fill 83",
]
_excluded_lookup = set(ITEMS_EXCLUDED_FROM_ANALYSIS)

# Exclusion works by exact column name, so a misspelled entry would silently
# exclude nothing; surface such entries instead of ignoring them.
unmatched_exclusions = sorted(_excluded_lookup - set(author_cols) - set(foil_cols))
if unmatched_exclusions:
    print("WARNING: exclusion names that match no detected item column (check spelling):")
    print(" ", unmatched_exclusions)

author_cols = [c for c in author_cols if c not in _excluded_lookup]
foil_cols = [c for c in foil_cols if c not in _excluded_lookup]
print("Excluded from analysis (late-added / high missingness):", len(ITEMS_EXCLUDED_FROM_ANALYSIS), "items")
print("  Authors after exclusion:", len(author_cols))
print("  Foils after exclusion:", len(foil_cols))

# =========================================
# PATCH 2: Ensure author/foil sets disjoint
# =========================================
author_set = set(author_cols)
foil_set = set(foil_cols)
overlap = sorted(author_set & foil_set)
if overlap:
    raise ValueError(f"Author/Foil column overlap detected (should be impossible): {overlap[:20]}")

if len(author_cols) == 0 or len(foil_cols) == 0:
    raise ValueError(
        "MISSING item columns: Could not identify author/foil columns from the code row. "
        "Check the code-row prefixes (authors: cla/det/fan/mod/sci/sfi/rom/soc; foils: fill*) "
        "or fill AUTHOR_COLS_MANUAL/FOIL_COLS_MANUAL."
    )

print("\nFINAL ITEM COUNTS:")
print("Authors:", len(author_cols))
print("Foils:", len(foil_cols))
print("Total items:", len(author_cols) + len(foil_cols))

# Save item lists
pd.Series(author_cols, name="author_item").to_csv(
    os.path.join(RESULTS_DIR, "author_item_list.csv"), index=False
)
pd.Series(foil_cols, name="foil_item").to_csv(
    os.path.join(RESULTS_DIR, "foil_item_list.csv"), index=False
)
print("\nSaved item lists to results.")


In [None]:
# =========================================================
# CELL 4: Extract and validate binary response matrix
# =========================================================

# Build the participant × item response matrix (authors first, then foils),
# coerce it to numeric, report missingness, and reject any value other than 0/1.
item_cols = author_cols + foil_cols
X = df[item_cols].copy()

# ======================================================
# PATCH 3: Strict binary validation + missingness tracing
# ======================================================

# Keep an untouched copy (X_raw) so unparseable values can be inspected after
# coercion; errors="coerce" turns anything non-numeric into NaN.
X_raw = X.copy()
X_num = X_raw.apply(pd.to_numeric, errors="coerce")

# Per-item share of NaN cells after coercion. This includes cells that were
# already empty as well as cells that failed to parse.
# NOTE(review): the missingness-diagnostics cell (CELL B1) later writes a file
# with this same name into RESULTS_DIR, replacing this report — confirm intended.
na_by_col = X_num.isna().mean().sort_values(ascending=False)
na_report_path = os.path.join(RESULTS_DIR, "missingness_by_item_column.csv")
na_by_col.to_csv(na_report_path, header=["missing_rate"])
print("Saved missingness report:", na_report_path)

# Print one worked example: the item with the highest NaN share
cols_with_na = na_by_col[na_by_col > 0].index.tolist()
if cols_with_na:
    sample_col = cols_with_na[0]
    print("Example column with NaNs after coercion:", sample_col)
    print("Raw values (first 20):", X_raw[sample_col].head(20).tolist())

    # Show where the NaNs actually are and what the raw values look like
    nan_idx = X_num[sample_col].isna()
    print(f"NaNs in {sample_col}: {nan_idx.sum()} rows")
    if nan_idx.any():
        print("Example rows with NaN in this column (first 10 indices):", nan_idx[nan_idx].index[:10].tolist())
        print("Raw values at those rows:", X_raw.loc[nan_idx, sample_col].head(10).tolist())

# --- Enhanced missingness + coercion logging ---
# new_na > 0 means some non-empty raw cells could not be parsed as numbers.
before_na = int(X_raw.isna().sum().sum())
missing_count = int(X_num.isna().sum().sum())
new_na = missing_count - before_na

print(f"Missing cells before numeric coercion: {before_na}")
print(f"Missing cells after numeric coercion:  {missing_count}")
print(f"New NaNs introduced by coercion:       {new_na}")

# Preserve NaN-containing numeric matrix for downstream missingness analysis (Cells 16-17)
# (an independent copy; X below is only an alias of X_num)
X_num_prefill = X_num.copy()

# --- Strict binary validation (on non-NaN values only) ---
# True where a cell holds a number that is neither 0 nor 1
nonbinary = ~X_num.isin([0, 1]) & X_num.notna()
if nonbinary.any().any():
    offenders = X_num[nonbinary].stack().value_counts()
    print("Non-binary values found (will raise error):")
    print(offenders.head(20))
    raise ValueError(
        f"Non-binary values present in item matrix. Fix upstream cleaning."
    )

# --- Strict NaN handling: do NOT fill missing with 0 ---
# Scores will be NaN for any participant with missing item(s).
X = X_num  # numeric matrix holding only 0, 1, or NaN (validated above)
n_participants_any_missing = int(X.isna().any(axis=1).sum())
print(f"\nBinary matrix (NaN-preserving): {X.shape}")
print(f"Total missing cells: {missing_count}")
print(f"Participants with any missing item: {n_participants_any_missing} / {X.shape[0]}")


In [None]:
# =========================================================
# CELL 5: Compute scoring per Moore & Gordon definitions
#   Strict NaN + prorated-with-threshold option
# =========================================================

# Author and foil sub-matrices of the validated response matrix X.
Xa = X[author_cols]
Xf = X[foil_cols]

# Strict NaN: score = NaN if any item in the group is missing.
# min_count equal to the number of items makes sum() return NaN as soon as a
# single item in the row is missing.
hits = Xa.sum(axis=1, min_count=len(author_cols))
false_alarms = Xf.sum(axis=1, min_count=len(foil_cols))
standard_score = hits - false_alarms
name_score = hits

# Prorate-with-threshold: scale the observed sum up to full test length, but
# only for participants who answered at least min_frac of each item group.
min_frac = 0.975
nA = Xa.notna().sum(axis=1)
nF = Xf.notna().sum(axis=1)

hits_raw = Xa.sum(axis=1, skipna=True)
fa_raw = Xf.sum(axis=1, skipna=True)

# replace(0, NaN) keeps rows without any observed item from dividing by zero
hits_pr = hits_raw * (len(author_cols) / nA.replace(0, np.nan))
fa_pr = fa_raw * (len(foil_cols) / nF.replace(0, np.nan))

ok = (nA >= min_frac * len(author_cols)) & (nF >= min_frac * len(foil_cols))

# Participants below the completeness threshold get NaN prorated scores
hits_pr = hits_pr.where(ok)
fa_pr = fa_pr.where(ok)
name_score_pr = hits_pr
standard_score_pr = hits_pr - fa_pr

scores = pd.DataFrame({
    "hits": hits,
    "false_alarms": false_alarms,
    "standard_score": standard_score,
    "name_score": name_score,
})

scores_prorated = pd.DataFrame({
    "hits": hits_pr,
    "false_alarms": fa_pr,
    "standard_score": standard_score_pr,
    "name_score": name_score_pr,
})

n_nan_scores = int(scores["standard_score"].isna().sum())
n_valid_scores = int(scores["standard_score"].notna().sum())
n_valid_prorated = int(scores_prorated["standard_score"].notna().sum())

print(f"Participants with valid strict scores: {n_valid_scores}")
print(f"Participants with NaN scores (missing item data): {n_nan_scores}")
print(f"Participants with valid prorated scores (min_frac={min_frac}): {n_valid_prorated}")

# Identity check: mean errors equals sum of foil probs (strict only).
# Restricted to participants whose strict false-alarm count is defined.
valid = false_alarms.notna()
foil_probs = X.loc[valid, foil_cols].mean(axis=0)
mean_errors_from_scores = float(false_alarms[valid].mean())
mean_errors_from_probs = float(foil_probs.sum())

print(f"\nIdentity check (on {int(valid.sum())} complete-case participants):")
print("Mean false alarms per participant (from sums):", mean_errors_from_scores)
print("Mean false alarms per participant (sum of foil probs):", mean_errors_from_probs)

if not valid.any():
    # With zero rows the mean is NaN and any comparison against NaN is False,
    # so the check below would "pass" without having compared anything.
    print("WARNING: no participant has complete foil data; identity check skipped.")
elif abs(mean_errors_from_scores - mean_errors_from_probs) > 1e-8:
    raise ValueError(
        "Mismatch: mean false alarms per participant != sum of foil endorsement probabilities. "
        "This indicates a logic/column-selection problem."
    )

print("Score columns computed.")
scores.describe().T


In [None]:
# =========================================================
# CELL 6: Scoring integrity checks (required)
# =========================================================

# Sizes used by the checks below
N = len(scores)
n_auth, n_foil = len(author_cols), len(foil_cols)

# 1) Spot-check standard_score against hits − false_alarms on a fixed-seed
#    random sample of participants (a NaN pair counts as a match).
random.seed(12345)
check_idx = random.sample(range(N), k=min(10, N))
check_df = scores.iloc[check_idx].copy()
check_df["recomputed"] = check_df["hits"] - check_df["false_alarms"]
check_df["matches"] = (
    (check_df["standard_score"].isna() & check_df["recomputed"].isna())
    | (check_df["standard_score"] == check_df["recomputed"])  # False whenever either side is NaN
)

print("Random participant scoring checks (10 or fewer):")
print(check_df)

check_df.to_csv(os.path.join(RESULTS_DIR, "scoring_random_checks.csv"))

# 2) Every defined standard score must lie in [-n_foil, n_auth]
min_allowed, max_allowed = -n_foil, n_auth
valid_std = scores["standard_score"].dropna()
viol = ~valid_std.between(min_allowed, max_allowed)

print(f"\nBounds check on {len(valid_std)} valid scores: [{min_allowed}, {max_allowed}]")
print(f"Impossible standard_score cases: {int(viol.sum())}")

if viol.any():
    # Save the offending rows before aborting so they can be inspected
    bad_ids = valid_std[viol].index[:20].tolist()
    impossible = scores.loc[valid_std[viol].index]
    impossible.to_csv(os.path.join(RESULTS_DIR, "impossible_standard_score_cases.csv"))
    raise ValueError(f"Standard score bound violation for first cases: {bad_ids}")

# 3) Record the assumptions this run was scored under
n_nan = int(scores["standard_score"].isna().sum())
assumptions = {
    "missing_cells_in_item_matrix": int(missing_count),
    "missing_handling": "strict_nan_if_any_item_missing",
    "binary_value_enforcement": True,
    "n_participants_total": int(N),
    "n_participants_with_nan_scores": n_nan,
    "n_participants_with_valid_scores": int(N - n_nan),
    "items_excluded_from_analysis": ITEMS_EXCLUDED_FROM_ANALYSIS,
    "n_authors_after_exclusion": len(author_cols),
    "n_foils_after_exclusion": len(foil_cols),
}
with open(os.path.join(RESULTS_DIR, "scoring_assumptions.json"), "w") as f:
    f.write(json.dumps(assumptions, indent=2))

print("\nSaved integrity outputs + assumptions.")


In [None]:
# =========================================================
# CELL 7: Score distributions (Moore & Gordon-style)
# =========================================================

def summarize_series(s: pd.Series):
    """Return mean, sample SD (ddof=1), min, max and skewness of *s* as a Series."""
    labels = ["mean", "sd", "min", "max", "skewness"]
    values = [s.mean(), s.std(ddof=1), s.min(), s.max(), s.skew()]
    return pd.Series(values, index=labels)

# One row of descriptive statistics per score column
score_summary = pd.DataFrame({
    col: summarize_series(scores[col])
    for col in ("standard_score", "name_score", "hits", "false_alarms")
}).T

print("Score summary:")
print(score_summary)

score_summary.to_csv(os.path.join(RESULTS_DIR, "score_summary.csv"))


In [None]:
# =========================================================
# CELL 8: Plots (histograms) + save
# =========================================================

def save_hist(series, title, filename, bins=30):
    """Plot a histogram of the non-missing values of *series*, save it, and show it.

    *title* is used as both the plot title and the x-axis label; the PNG is
    written to RESULTS_DIR/*filename*. Dropped NaN values are reported.
    """
    observed = series.dropna()
    n_missing = len(series) - len(observed)
    if n_missing > 0:
        print(f"[{title}] Dropping {n_missing} NaN values before plotting.")

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.hist(observed, bins=bins)
    ax.set_title(title)
    ax.set_xlabel(title)
    ax.set_ylabel("Count")
    fig.tight_layout()

    outpath = os.path.join(RESULTS_DIR, filename)
    fig.savefig(outpath, dpi=200)
    plt.show()
    print("Saved:", outpath)


# Histograms of the three strict scores
save_hist(scores["standard_score"], "Standard ART score (Hits - False Alarms)", "hist_standard_score.png")
save_hist(scores["name_score"], "Name score (Hits)", "hist_name_score.png")
save_hist(scores["false_alarms"], "False alarms (Foils selected)", "hist_false_alarms.png")


In [None]:
# =========================================================
# CELL 9: Real-author item selection rates (observed denominator)
# =========================================================

# All scored items (authors followed by foils) from the validated matrix
X_items = X[author_cols + foil_cols]

def selection_rates(cols):
    """Per-item selection statistics for the columns in *cols*, read from X_items.

    Returns a DataFrame indexed by item with the percentage selected among
    observed responses, the number of observed responses, and the proportion
    of missing responses, sorted by percentage selected (highest first).
    """
    block = X_items[cols]
    observed = block.count()  # non-missing responses per item
    table = pd.DataFrame({
        "pct_selected": block.mean() * 100,  # NaNs are skipped, so the denominator is `observed`
        "n_observed": observed,
        "missing_prop": 1 - (observed / len(block)),
    })
    return table.sort_values("pct_selected", ascending=False)

# Selection statistics for the real-author items, highest rate first
author_rates = selection_rates(author_cols).rename_axis("author_item")

mean_author_sel = author_rates["pct_selected"].mean()
max_author = author_rates.iloc[0]    # first row = highest rate
min_author = author_rates.iloc[-1]   # last row = lowest rate

print("Author selection rates summary (observed-denominator):")
print("Mean author selection rate (%):", mean_author_sel)
print("Highest-selected author:", author_rates.index[0], "=", float(max_author["pct_selected"]))
print("Lowest-selected author:", author_rates.index[-1], "=", float(min_author["pct_selected"]))

top10_authors = author_rates.head(10)
bottom10_authors = author_rates.tail(10)

print("\nTop 10 authors by selection rate:")
print(top10_authors)

print("\nBottom 10 authors by selection rate:")
print(bottom10_authors)

# Write the full table plus both extremes
author_rates.to_csv(os.path.join(RESULTS_DIR, "author_selection_rates_computed.csv"))
top10_authors.to_csv(os.path.join(RESULTS_DIR, "top10_authors_selection.csv"))
bottom10_authors.to_csv(os.path.join(RESULTS_DIR, "bottom10_authors_selection.csv"))


In [None]:
# =========================================================
# CELL 10: Plot top/bottom author rates + save
# =========================================================

def plot_bar(df_rates, title, filename):
    """Draw a bar chart of `pct_selected` per item in *df_rates*, save it, and show it.

    Bars are labelled with the index of *df_rates*; the PNG is written to
    RESULTS_DIR/*filename*.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.bar(df_rates.index.astype(str), df_rates["pct_selected"].values)
    ax.set_title(title)
    ax.set_ylabel("% selected")
    plt.xticks(rotation=75, ha="right")  # item names are long; slant them
    fig.tight_layout()

    outpath = os.path.join(RESULTS_DIR, filename)
    fig.savefig(outpath, dpi=200)
    plt.show()
    print("Saved:", outpath)


plot_bar(top10_authors, "Top 10 author items by selection rate", "bar_top10_authors.png")
plot_bar(bottom10_authors, "Bottom 10 author items by selection rate", "bar_bottom10_authors.png")


In [None]:
# =========================================================
# CELL 11: Foil selection rates and “alluring foils”
# =========================================================

# Selection statistics for the foil items, highest rate first
foil_rates = selection_rates(foil_cols).rename_axis("foil_item")

# Participant-level false-alarm statistics (strict scores; NaNs are skipped)
mean_false_alarms_per_person = scores["false_alarms"].mean()
sd_false_alarms_per_person = scores["false_alarms"].std(ddof=1)

never_selected = foil_rates.loc[foil_rates["pct_selected"].eq(0.0)]
most_selected_foil_name = foil_rates.index[0]
most_selected_foil_rate = float(foil_rates["pct_selected"].iloc[0])

# "High-rate" foils: selected at least 2.5 times as often as the average foil
mean_foil_item_rate = foil_rates["pct_selected"].mean()
threshold_25x = 2.5 * mean_foil_item_rate
high_rate_foils = foil_rates.loc[foil_rates["pct_selected"].ge(threshold_25x)]

print("Foil summary (observed-denominator):")
print("Mean false alarms per participant:", mean_false_alarms_per_person)
print("SD false alarms per participant:", sd_false_alarms_per_person)
print("Foils never selected (count):", never_selected.shape[0])
print("Most selected foil:", most_selected_foil_name, "=", most_selected_foil_rate)
print("Mean foil item selection rate (%):", mean_foil_item_rate)
print("2.5× mean foil item rate threshold (%):", threshold_25x)

print("\nFoils never selected (names):")
print(list(never_selected.index))

print("\nFoils selected ≥ 2.5× mean foil item rate:")
print(high_rate_foils)

# Write the full table and both subsets
foil_rates.to_csv(os.path.join(RESULTS_DIR, "foil_selection_rates_computed.csv"))
never_selected.to_csv(os.path.join(RESULTS_DIR, "foils_never_selected.csv"))
high_rate_foils.to_csv(os.path.join(RESULTS_DIR, "foils_ge_2p5x_mean_rate.csv"))


In [None]:
# =========================================================
# CELL A1: Load high-rate foils (≥ 2.5× mean foil item rate)
# =========================================================

import os
import pandas as pd
import numpy as np
from difflib import SequenceMatcher
import re

# Read the high-rate foil table back from this run's results folder.
HIGH_RATE_FOILS_PATH = os.path.join(RESULTS_DIR, "foils_ge_2p5x_mean_rate.csv")
high_rate_foils_df = pd.read_csv(HIGH_RATE_FOILS_PATH)

# The foil names sit in the saved index column: prefer its explicit name, then
# pandas' placeholder for an unnamed index, and finally just the first column.
_name_col = next(
    (c for c in ("foil_item", "Unnamed: 0") if c in high_rate_foils_df.columns),
    None,
)
if _name_col is None:
    high_rate_foil_names = high_rate_foils_df.iloc[:, 0].astype(str).tolist()
else:
    high_rate_foil_names = high_rate_foils_df[_name_col].astype(str).tolist()

print("Loaded high-rate foils count:", len(high_rate_foil_names))
print("First 10 high-rate foils:", high_rate_foil_names[:10])

# Every loaded name should be one of the current foil columns
missing_from_foils = sorted(set(high_rate_foil_names) - set(foil_cols))
if missing_from_foils:
    print("WARNING: These high-rate foils are not in foil_cols (check naming):")
    print(missing_from_foils[:50])
else:
    print("All high-rate foils match foil_cols.")


In [None]:
# =========================================================
# CELL A2: Alluring-foil audit (string diagnostics only)
# =========================================================

def normalize_name(s: str) -> str:
    """Return *s* as text, trimmed, lower-cased, with whitespace runs collapsed to one space."""
    lowered = str(s).strip().lower()
    return re.sub(r"\s+", " ", lowered)

def split_tokens(s: str):
    """Split the normalized form of *s* on whitespace, '-', '_', ',' and '.'; drop empty pieces."""
    pieces = re.split(r"[\s\-_,.]+", normalize_name(s))
    return list(filter(None, pieces))

def seq_sim(a: str, b: str) -> float:
    """Return difflib's SequenceMatcher similarity ratio between *a* and *b*."""
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()

# Author item names in three parallel forms (raw text, normalized, tokenized)
# plus the set of every token that occurs in any author name.
author_names = list(map(str, author_cols))
author_norm = list(map(normalize_name, author_names))
author_toks = list(map(split_tokens, author_names))
author_token_set = {tok for toks in author_toks for tok in toks}

def closest_author(foil_norm: str):
    """Return (similarity, author name) for the author most similar to *foil_norm*.

    Similarity is computed against the normalized author names; on ties the
    earliest author in the list wins.
    """
    similarities = [seq_sim(foil_norm, candidate) for candidate in author_norm]
    best = int(np.argmax(similarities))
    return float(similarities[best]), author_names[best]

# (first token, last token) of every author name that has at least two tokens;
# used to flag foils sharing both with some author.
author_first_last = {
    (toks[0], toks[-1])
    for toks in (split_tokens(a) for a in author_names)
    if len(toks) >= 2
}

# Fixed column layout so the table (and its CSV header) exists even when no
# foil qualifies; without it, sorting an empty frame raises KeyError.
audit_columns = [
    "foil_item", "foil_pct_selected",
    "max_string_similarity_to_any_author", "closest_author_by_string_similarity",
    "foil_tokens", "overlapping_tokens_with_author_list",
    "any_token_overlap",
    "first_token_matches_any_author_token", "last_token_matches_any_author_token",
    "first_last_pair_matches_an_author",
    "note",
]

rows = []
for foil_item in high_rate_foil_names:
    f_norm = normalize_name(foil_item)
    f_toks = split_tokens(foil_item)

    max_sim, close_auth = closest_author(f_norm)

    # Foil tokens that also occur somewhere in the author list
    overlap_tokens = [t for t in f_toks if t in author_token_set]

    first_tok = f_toks[0] if f_toks else ""
    last_tok = f_toks[-1] if f_toks else ""

    first_match = bool(first_tok) and first_tok in author_token_set
    last_match = bool(last_tok) and last_tok in author_token_set

    # A pair match needs two distinct token positions
    pair_match = len(f_toks) >= 2 and (first_tok, last_tok) in author_first_last

    # Grab foil selection rate (%); NaN if the name is not in the rate table
    foil_pct = float(foil_rates.loc[foil_item, "pct_selected"]) if foil_item in foil_rates.index else np.nan

    rows.append({
        "foil_item": foil_item,
        "foil_pct_selected": foil_pct,
        "max_string_similarity_to_any_author": max_sim,
        "closest_author_by_string_similarity": close_auth,
        "foil_tokens": " ".join(f_toks),
        "overlapping_tokens_with_author_list": " ".join(overlap_tokens),
        "any_token_overlap": bool(overlap_tokens),
        "first_token_matches_any_author_token": bool(first_match),
        "last_token_matches_any_author_token": bool(last_match),
        "first_last_pair_matches_an_author": bool(pair_match),
        "note": "Text-only diagnostics; any real-world identity confusion requires web and is UNVERIFIED."
    })

audit_df = pd.DataFrame(rows, columns=audit_columns).sort_values("foil_pct_selected", ascending=False)

if audit_df.empty:
    print("No foils met the >= 2.5x mean rate threshold; audit table is empty.")

print("Alluring foil audit (high-rate foils only):")
print(audit_df)

audit_path = os.path.join(RESULTS_DIR, "alluring_foil_audit_high_rate_only.csv")
audit_df.to_csv(audit_path, index=False)
print("Saved:", audit_path)


In [None]:
# =========================================================
# CELL A3: Rescore excluding high-rate foils
# =========================================================

# Define the foil set to drop (only names that really are foil columns)
_foil_lookup = set(foil_cols)
drop_foils = [f for f in high_rate_foil_names if f in _foil_lookup]
_drop_lookup = set(drop_foils)
keep_foils = [f for f in foil_cols if f not in _drop_lookup]

print("Foils to DROP (count):", len(drop_foils))
print("Foils to KEEP (count):", len(keep_foils))

# X preserves NaN for missing responses. A plain sum would silently count a
# missing item as 0, unlike the strict scores this table is compared against,
# so apply the same strict rule: NaN as soon as one item in the group is missing.
false_alarms_keep = X[keep_foils].sum(axis=1, min_count=len(keep_foils))

# Hits unchanged (still all authors, same strict rule as the main scoring)
hits_same = X[author_cols].sum(axis=1, min_count=len(author_cols))

standard_score_keep = hits_same - false_alarms_keep
name_score_same = hits_same

scores_excl = pd.DataFrame({
    "hits": hits_same,
    "false_alarms_excluding_high_rate_foils": false_alarms_keep,
    "standard_score_excluding_high_rate_foils": standard_score_keep,
    "name_score": name_score_same
})

# Compare original vs revised at summary level
def summarize(s):
    """Return mean, sample SD (ddof=1), min, max and skew of *s* as a labelled Series."""
    labels = ["mean", "sd", "min", "max", "skew"]
    values = [s.mean(), s.std(ddof=1), s.min(), s.max(), s.skew()]
    return pd.Series(values, index=labels)

# Series to compare, keyed by the row label they get in the summary table
_comparison_inputs = {
    "original_false_alarms": scores["false_alarms"],
    "revised_false_alarms": scores_excl["false_alarms_excluding_high_rate_foils"],
    "original_standard_score": scores["standard_score"],
    "revised_standard_score": scores_excl["standard_score_excluding_high_rate_foils"],
}
comparison = pd.DataFrame(
    {label: summarize(values) for label, values in _comparison_inputs.items()}
).T

print("\nSummary comparison (original vs excluding high-rate foils):")
print(comparison)

# Participant-level rescored table (no index column) and the summary table
scores_excl.to_csv(
    os.path.join(RESULTS_DIR, "scores_by_participant_excluding_high_rate_foils.csv"),
    index=False,
)
comparison.to_csv(os.path.join(RESULTS_DIR, "score_summary_excluding_high_rate_foils.csv"))

print("\nSaved rescored participant table and rescored summary.")


In [None]:
# =========================================================
# CELL A4: Plots comparing original vs revised scoring
# =========================================================

import matplotlib.pyplot as plt

def hist_compare(a, b, title_a, title_b, filename_prefix, bins=30):
    """Overlay histograms of two score series, save the figure, and show it.

    Missing values are dropped (and reported) before plotting, as in
    `save_hist`. Both series are binned on the same edges, computed from their
    pooled values, so the overlaid bars are directly comparable. The PNG is
    written to RESULTS_DIR/<filename_prefix>.png.
    """
    a_clean = pd.Series(a).dropna()
    b_clean = pd.Series(b).dropna()
    for label, raw, clean in ((title_a, a, a_clean), (title_b, b, b_clean)):
        n_dropped = len(raw) - len(clean)
        if n_dropped > 0:
            print(f"[{label}] Dropping {n_dropped} NaN values before plotting.")

    # Shared bin edges; with nothing to pool, let matplotlib handle `bins` itself.
    pooled = pd.concat([a_clean, b_clean])
    shared_bins = np.histogram_bin_edges(pooled.to_numpy(dtype=float), bins=bins) if len(pooled) else bins

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.hist(a_clean, bins=shared_bins, alpha=0.6, label=title_a)
    ax.hist(b_clean, bins=shared_bins, alpha=0.6, label=title_b)
    ax.set_title(f"{title_a} vs {title_b}")
    ax.set_xlabel("Value")
    ax.set_ylabel("Count")
    ax.legend()
    fig.tight_layout()
    outpath = os.path.join(RESULTS_DIR, f"{filename_prefix}.png")
    fig.savefig(outpath, dpi=200)
    plt.show()
    plt.close(fig)  # release the figure when running outside an inline backend
    print("Saved:", outpath)

hist_compare(scores["false_alarms"],
             scores_excl["false_alarms_excluding_high_rate_foils"],
             "False alarms (all foils)",
             "False alarms (excluding high-rate foils)",
             "hist_false_alarms_original_vs_revised")

hist_compare(scores["standard_score"],
             scores_excl["standard_score_excluding_high_rate_foils"],
             "Standard score (all foils)",
             "Standard score (excluding high-rate foils)",
             "hist_standard_score_original_vs_revised")


In [None]:
# =========================================================
# CELL B1: Missingness diagnostics (items + participants)
# =========================================================

# Item block exactly as loaded (values not coerced to numeric), so only cells
# that are empty in the data count as missing here.
# NOTE(review): the binary-validation cell (CELL 4) already wrote a file named
# "missingness_by_item_column.csv" into RESULTS_DIR; the save below replaces it
# with this table (column "missing_prop") — confirm that is intended.
X_raw = df[item_cols].copy()

# Proportion of missing cells per item (highest first) and per participant
missing_by_item = X_raw.isna().mean().sort_values(ascending=False)
missing_by_person = X_raw.isna().mean(axis=1)

missing_item_df = missing_by_item.to_frame(name="missing_prop").rename_axis("item")

print("Participants with any missing in item block:", int(missing_by_person.gt(0).sum()))
print("Max missing proportion within a participant:", float(missing_by_person.max()))

# Write both tables
missing_item_df.to_csv(os.path.join(RESULTS_DIR, "missingness_by_item_column.csv"))
missing_by_person.rename("missing_prop").to_csv(
    os.path.join(RESULTS_DIR, "missingness_by_participant.csv"), index=False
)

# Histogram of participant-level missingness
fig, ax = plt.subplots(figsize=(8, 5))
ax.hist(missing_by_person, bins=30)
ax.set_title("Participant-level missingness (proportion missing in item block)")
ax.set_xlabel("Missing proportion")
ax.set_ylabel("Count")
fig.tight_layout()
outpath = os.path.join(RESULTS_DIR, "hist_missingness_by_participant.png")
fig.savefig(outpath, dpi=200)
plt.show()
print("Saved:", outpath)


In [None]:
# =========================================================
# CELL B2: Sensitivity scoring
#   A) missing -> 0 (sensitivity alternative; the main scoring in CELL 5 is strict-NaN)
#   B) complete-case participants only
# =========================================================

def compute_scores(X_bin: pd.DataFrame, author_cols, foil_cols):
    """Score a response matrix: row sums over author items (hits) and foil items.

    Returns a DataFrame with `hits`, `false_alarms`, `standard_score`
    (hits minus false alarms) and `name_score` (same as hits), one row per
    row of *X_bin*.
    """
    n_hits = X_bin.loc[:, author_cols].sum(axis="columns")
    n_false = X_bin.loc[:, foil_cols].sum(axis="columns")
    table = {"hits": n_hits, "false_alarms": n_false}
    table["standard_score"] = n_hits - n_false
    table["name_score"] = n_hits
    return pd.DataFrame(table)

# Coerce once; a cell that cannot be parsed as a number becomes NaN and is
# treated as missing under both policies.
_X_sens_num = X_raw.apply(pd.to_numeric, errors="coerce")

# Policy A: Missing -> 0 (sensitivity alternative; the main scoring keeps NaN)
X_A = _X_sens_num.fillna(0).astype(int)
scores_A = compute_scores(X_A, author_cols, foil_cols)

# Policy B: Complete-case participants only (drop any row with any missing in item block).
# The mask is taken from the coerced matrix: a row holding an unparseable value
# would otherwise pass a raw isna() test and then make astype(int) fail on NaN.
complete_mask = _X_sens_num.notna().all(axis=1)
X_B = _X_sens_num.loc[complete_mask].astype(int)
scores_B = compute_scores(X_B, author_cols, foil_cols)

def summarize_scores(scores_df, label):
    """Return a one-row DataFrame: *label*, row count, and mean / sample SD of each score column."""
    row = {
        "label": label,
        "N": scores_df.shape[0],
        "hits_mean": scores_df["hits"].mean(),
        "hits_sd": scores_df["hits"].std(ddof=1),
        "fa_mean": scores_df["false_alarms"].mean(),
        "fa_sd": scores_df["false_alarms"].std(ddof=1),
        "standard_mean": scores_df["standard_score"].mean(),
        "standard_sd": scores_df["standard_score"].std(ddof=1),
        "name_mean": scores_df["name_score"].mean(),
        "name_sd": scores_df["name_score"].std(ddof=1),
    }
    return pd.DataFrame([row])

# Stack the one-row summaries of both policies into a single table
sens = pd.concat(
    [
        summarize_scores(table, tag)
        for table, tag in (
            (scores_A, "Policy_A_missing_to_0"),
            (scores_B, "Policy_B_complete_case_only"),
        )
    ],
    ignore_index=True,
)

print("Missingness sensitivity summary:")
print(sens)

# Write the summary and the complete-case participant scores (no index column)
sens.to_csv(
    os.path.join(RESULTS_DIR, "missingness_sensitivity_score_summary.csv"), index=False
)
scores_B.to_csv(
    os.path.join(RESULTS_DIR, "scores_by_participant_complete_case_only.csv"), index=False
)
print("Saved sensitivity outputs.")


In [None]:
# =========================================================
# CELL 12: Plot top foils by selection rate
# =========================================================

# foil_rates is already sorted by selection rate, so the head is the top-k
top_k = min(20, len(foil_rates))
top_foils = foil_rates.head(top_k)

fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(top_foils.index.astype(str), top_foils["pct_selected"].values)
ax.set_title(f"Top {top_k} foils by selection rate")
ax.set_ylabel("% selected")
plt.xticks(rotation=75, ha="right")  # item names are long; slant them
fig.tight_layout()
outpath = os.path.join(RESULTS_DIR, f"bar_top{top_k}_foils.png")
fig.savefig(outpath, dpi=200)
plt.show()
print("Saved:", outpath)


In [None]:
# =========================================================
# CELL 13: Enhanced alluring-foil audit (text-based only)
# =========================================================
# For each high-rate foil (>= 2.5× mean foil item rate):
#   - max string similarity to any author item
#   - token overlap flags (any, first, last)
#   - first+last token pair match against authors

# Take the foil names straight from the in-memory high_rate_foils table
# (its index holds the item names), so no CSV has to be re-read.
high_rate_foil_names = high_rate_foils.index.tolist()

def normalize_name(s: str) -> str:
    """Return *s* as text: stripped, lowercased, whitespace runs collapsed to one space."""
    trimmed_lower = str(s).strip().lower()
    return re.sub(r"\s+", " ", trimmed_lower)

def split_tokens(s: str):
    """Split the normalized form of *s* into non-empty tokens.

    Separators are runs of whitespace, hyphens, underscores, commas and periods.
    """
    pieces = re.split(r"[\s\-_,.]+", normalize_name(s))
    # re.split yields empty strings at leading/trailing separators; drop them
    return list(filter(None, pieces))

def seq_sim(a: str, b: str) -> float:
    """Return difflib's SequenceMatcher similarity ratio for *a* vs *b* (no junk filter)."""
    matcher = SequenceMatcher(isjunk=None, a=a, b=b)
    return matcher.ratio()

# Author item labels in three parallel forms (same order as author_cols):
# raw strings, normalized strings, and token lists.
author_names = list(map(str, author_cols))
author_norm = list(map(normalize_name, author_names))
author_toks = list(map(split_tokens, author_names))
# Every token that occurs in any author label, for fast membership checks
author_token_set = {tok for toks in author_toks for tok in toks}

def closest_author(foil_norm: str):
    """Find the author label most similar to an already-normalized foil string.

    Returns a ``(similarity, author_name)`` tuple, where the similarity is the
    highest seq_sim ratio against the entries of author_norm and the name is
    the matching entry of author_names (first one on ties, per np.argmax).
    """
    ratios = [seq_sim(foil_norm, candidate) for candidate in author_norm]
    best_idx = int(np.argmax(ratios))
    best_ratio = float(ratios[best_idx])
    return best_ratio, author_names[best_idx]

# Collect (first_token, last_token) pairs from author labels for the pair-match check
author_first_last = set()
for a in author_names:
    toks = split_tokens(a)
    if len(toks) < 2:
        continue  # a single-token label cannot form a first/last pair
    author_first_last.add((toks[0], toks[-1]))

# Column layout of the audit table; used directly for the empty-table case and
# mirrored by the keys of the per-foil records below.
_AUDIT_COLUMNS = [
    "foil_item", "foil_pct_selected",
    "max_string_similarity_to_any_author", "closest_author_by_string_similarity",
    "foil_tokens", "overlapping_tokens_with_author_list",
    "any_token_overlap",
    "first_token_matches_any_author_token", "last_token_matches_any_author_token",
    "first_last_pair_matches_an_author",
    "note",
]


def _audit_row(foil_item):
    """Build the text-similarity audit record (one dict) for a single foil item."""
    f_toks = split_tokens(foil_item)
    max_sim, close_auth = closest_author(normalize_name(foil_item))

    # Foil tokens that also occur somewhere in the author labels
    overlap_tokens = [t for t in f_toks if t in author_token_set]

    first_tok, last_tok = (f_toks[0], f_toks[-1]) if f_toks else ("", "")
    first_match = bool(first_tok) and first_tok in author_token_set
    last_match = bool(last_tok) and last_tok in author_token_set

    # A pair match needs at least two tokens and the exact (first, last) pair
    pair_match = bool(
        first_tok
        and last_tok
        and len(f_toks) >= 2
        and (first_tok, last_tok) in author_first_last
    )

    # Selection rate is NaN when the foil is not present in foil_rates
    if foil_item in foil_rates.index:
        foil_pct = float(foil_rates.loc[foil_item, "pct_selected"])
    else:
        foil_pct = np.nan

    return {
        "foil_item": foil_item,
        "foil_pct_selected": foil_pct,
        "max_string_similarity_to_any_author": max_sim,
        "closest_author_by_string_similarity": close_auth,
        "foil_tokens": " ".join(f_toks),
        "overlapping_tokens_with_author_list": " ".join(overlap_tokens),
        "any_token_overlap": bool(overlap_tokens),
        "first_token_matches_any_author_token": bool(first_match),
        "last_token_matches_any_author_token": bool(last_match),
        "first_last_pair_matches_an_author": bool(pair_match),
        "note": "Text-only diagnostics; any real-world identity confusion requires web and is UNVERIFIED.",
    }


if not high_rate_foil_names:
    # Nothing to audit: still produce a correctly-shaped (empty) table
    audit_df = pd.DataFrame(columns=_AUDIT_COLUMNS)
    print("No foils met the >= 2.5x mean rate threshold; audit table is empty.")
else:
    audit_rows = [_audit_row(foil_item) for foil_item in high_rate_foil_names]
    # Most-selected foils first
    audit_df = pd.DataFrame(audit_rows).sort_values("foil_pct_selected", ascending=False)
    print("Alluring-foil audit (high-rate foils only):")
    print(audit_df)

# The CSV is written in both cases
audit_path = os.path.join(RESULTS_DIR, "alluring_foil_audit_high_rate_only.csv")
audit_df.to_csv(audit_path, index=False)
print("Saved:", audit_path)


In [None]:
# =========================================================
# CELL 14: Rescore excluding high-rate foils
# =========================================================
# Recompute false alarms and standard scores using only foils
# that are NOT in the high-rate set (>= 2.5x mean foil item rate).

# High-rate foils that are actual foil columns get dropped; the rest are kept
drop_foils = [f for f in high_rate_foil_names if f in foil_cols]
_dropped = set(drop_foils)
keep_foils = [f for f in foil_cols if f not in _dropped]

print("Foils to DROP (count):", len(drop_foils))
print("Foils to KEEP (count):", len(keep_foils))

# min_count = number of summed columns, so a row's sum is NaN unless every
# item in the set is non-missing.
hits_same = X[author_cols].sum(axis=1, min_count=len(author_cols))
false_alarms_keep = X[keep_foils].sum(axis=1, min_count=len(keep_foils))

# Hits are unaffected by the foil exclusion; only the false-alarm side changes
name_score_same = hits_same
standard_score_keep = hits_same - false_alarms_keep

scores_excl = pd.DataFrame({
    "hits": hits_same,
    "false_alarms_excluding_high_rate_foils": false_alarms_keep,
    "standard_score_excluding_high_rate_foils": standard_score_keep,
    "name_score": name_score_same,
})

# Summary comparison: original vs revised
def summarize(s):
    """Return mean, sample SD (ddof=1), min, max and skew of *s* as a labelled Series."""
    stats = {}
    stats["mean"] = s.mean()
    stats["sd"] = s.std(ddof=1)
    stats["min"] = s.min()
    stats["max"] = s.max()
    stats["skew"] = s.skew()
    return pd.Series(stats)

# One row per score variant (original vs revised), one column per statistic
_summary_sources = {
    "original_false_alarms": scores["false_alarms"],
    "revised_false_alarms": scores_excl["false_alarms_excluding_high_rate_foils"],
    "original_standard_score": scores["standard_score"],
    "revised_standard_score": scores_excl["standard_score_excluding_high_rate_foils"],
}
comparison = pd.DataFrame(
    {row_label: summarize(series) for row_label, series in _summary_sources.items()}
).transpose()

print("\nSummary comparison (original vs excluding high-rate foils):")
print(comparison)

# Participant table is written without its index; the summary keeps its row labels
_scores_excl_csv = os.path.join(RESULTS_DIR, "scores_by_participant_excluding_high_rate_foils.csv")
_comparison_csv = os.path.join(RESULTS_DIR, "score_summary_excluding_high_rate_foils.csv")
scores_excl.to_csv(_scores_excl_csv, index=False)
comparison.to_csv(_comparison_csv, index=True)
print("\nSaved rescored participant table and summary.")

In [None]:
# =========================================================
# CELL 15: Comparison histograms — original vs revised scoring
# =========================================================

def hist_compare(a, b, title_a, title_b, filename_prefix, bins=30):
    """Overlay histograms of two score series and save the figure as a PNG.

    Missing values are dropped from each series independently. Both histograms
    are drawn on one shared set of bin edges computed from the pooled values,
    so the bars of the two series line up and their heights are comparable.

    Parameters
    ----------
    a, b : objects with ``dropna()`` (the callers pass pandas Series)
        The two sets of values to compare.
    title_a, title_b : str
        Legend labels; also combined into the figure title.
    filename_prefix : str
        Output file name without extension; the PNG is written to RESULTS_DIR.
    bins : int, str or sequence, default 30
        Bin specification forwarded to ``np.histogram_bin_edges``.
    """
    a_clean, b_clean = a.dropna(), b.dropna()

    # Passing the same integer `bins` to two separate plt.hist calls bins each
    # series over its own min..max range, giving different bin widths/edges.
    # Compute the edges once from the pooled data and reuse them for both.
    pooled = np.concatenate([
        np.asarray(a_clean, dtype=float),
        np.asarray(b_clean, dtype=float),
    ])
    shared_edges = np.histogram_bin_edges(pooled, bins=bins)

    plt.figure(figsize=(8, 5))
    plt.hist(a_clean, bins=shared_edges, alpha=0.6, label=title_a)
    plt.hist(b_clean, bins=shared_edges, alpha=0.6, label=title_b)
    plt.title(f"{title_a} vs {title_b}")
    plt.xlabel("Value")
    plt.ylabel("Count")
    plt.legend()
    plt.tight_layout()
    outpath = os.path.join(RESULTS_DIR, f"{filename_prefix}.png")
    plt.savefig(outpath, dpi=200)
    plt.show()
    print("Saved:", outpath)

hist_compare(
    scores["false_alarms"],
    scores_excl["false_alarms_excluding_high_rate_foils"],
    "False alarms (all foils)",
    "False alarms (excl. high-rate foils)",
    "hist_false_alarms_original_vs_revised",
)

hist_compare(
    scores["standard_score"],
    scores_excl["standard_score_excluding_high_rate_foils"],
    "Standard score (all foils)",
    "Standard score (excl. high-rate foils)",
    "hist_standard_score_original_vs_revised",
)

In [None]:
# =========================================================
# CELL 16: Missingness diagnostics (items + participants)
# =========================================================
# Uses X_num_prefill (numeric-coerced, NaN-preserving) from Cell 4.

# Boolean missingness mask over the item block; its means are missing proportions
_is_missing = X_num_prefill.isna()
missing_by_item = _is_missing.mean().sort_values(ascending=False)
missing_by_person = _is_missing.mean(axis=1)

missing_item_df = missing_by_item.to_frame(name="missing_prop")
missing_item_df.index.name = "item"

n_persons_any_missing = int(missing_by_person.gt(0).sum())
max_person_missing = float(missing_by_person.max())
print(f"Participants with any missing in item block: {n_persons_any_missing}")
print(f"Max missing proportion within a participant: {max_person_missing:.4f}")

# Item table keeps its "item" index; the participant file is written without
# an index, so its rows are identified only by position.
_missing_item_csv = os.path.join(RESULTS_DIR, "missingness_by_item_detailed.csv")
_missing_person_csv = os.path.join(RESULTS_DIR, "missingness_by_participant.csv")
missing_item_df.to_csv(_missing_item_csv, index=True)
missing_by_person.rename("missing_prop").to_csv(_missing_person_csv, index=False)

plt.figure(figsize=(8, 5))
plt.hist(missing_by_person, bins=30)
plt.title("Participant-level missingness (proportion missing in item block)")
plt.xlabel("Missing proportion")
plt.ylabel("Count")
plt.tight_layout()

outpath = os.path.join(RESULTS_DIR, "hist_missingness_by_participant.png")
plt.savefig(outpath, dpi=200)
plt.show()
print("Saved:", outpath)

In [None]:
# =========================================================
# CELL 17: Sensitivity scoring
#   Policy A: strict NaN (current default — score = NaN if any item missing)
#   Policy B: lenient fill-0 (old approach — missing treated as "not selected")
#   Policy C: prorated-with-threshold (keeps mostly-complete participants)
# =========================================================

def compute_scores_from_matrix(X_bin, author_cols, foil_cols):
    """Score each row of a response matrix.

    Returns a DataFrame (same index as *X_bin*) with columns ``hits`` (row sum
    over *author_cols*), ``false_alarms`` (row sum over *foil_cols*),
    ``standard_score`` (hits minus false alarms) and ``name_score`` (hits).
    Sums use pandas' defaults, so NaN cells are skipped rather than propagated.
    """
    out = pd.DataFrame({"hits": X_bin[author_cols].sum(axis=1)})
    out["false_alarms"] = X_bin[foil_cols].sum(axis=1)
    out["standard_score"] = out["hits"] - out["false_alarms"]
    out["name_score"] = out["hits"]
    return out

# Policy A: strict NaN — a copy of the default `scores` table
scores_A = scores.copy()

# Policy C: prorated-with-threshold — a copy of `scores_prorated`
scores_C = scores_prorated.copy()

# Policy B: lenient — every missing response becomes 0 ("not selected")
# before scoring (old approach, kept for comparison)
X_filled = X_num_prefill.fillna(0).astype(int)
scores_B = compute_scores_from_matrix(X_filled, author_cols, foil_cols)

# Complete-case mask: True for rows with no missing item at all (useful for reporting)
complete_mask = X_num_prefill.notna().all(axis=1)


def summarize_scores(scores_df, label):
    """Return a one-row DataFrame of score means/SDs for one scoring policy.

    Rows whose ``standard_score`` is NaN are excluded from every statistic;
    ``N_total`` counts all rows and ``N_valid`` the rows that remain.
    SDs are sample SDs (ddof=1).
    """
    scored = scores_df.dropna(subset=["standard_score"])
    hits, false_alarms = scored["hits"], scored["false_alarms"]
    standard, name = scored["standard_score"], scored["name_score"]

    row = {
        "label": label,
        "N_total": scores_df.shape[0],
        "N_valid": scored.shape[0],
        "hits_mean": hits.mean(),
        "hits_sd": hits.std(ddof=1),
        "fa_mean": false_alarms.mean(),
        "fa_sd": false_alarms.std(ddof=1),
        "standard_mean": standard.mean(),
        "standard_sd": standard.std(ddof=1),
        "name_mean": name.mean(),
        "name_sd": name.std(ddof=1),
    }
    return pd.DataFrame([row])

# One summary row per missing-data policy, stacked into a single table.
# NOTE(review): the "0.975" in the Policy C label is hard-coded here; confirm it
# matches the threshold actually used to build scores_prorated.
_policy_tables = (
    (scores_A, "Policy_A_strict_nan"),
    (scores_B, "Policy_B_lenient_fill_0"),
    (scores_C, "Policy_C_prorated_minfrac_0.975"),
)
sens = pd.concat(
    [summarize_scores(table, policy_label) for table, policy_label in _policy_tables],
    ignore_index=True,
)

print("Missingness sensitivity summary:")
print(sens)

# Persist the summary plus the participant-level tables for policies B and C
_sens_csv = os.path.join(RESULTS_DIR, "missingness_sensitivity_score_summary.csv")
_lenient_csv = os.path.join(RESULTS_DIR, "scores_by_participant_lenient_fill0.csv")
_prorated_csv = os.path.join(RESULTS_DIR, "scores_by_participant_prorated.csv")
sens.to_csv(_sens_csv, index=False)
scores_B.to_csv(_lenient_csv, index=False)
scores_C.to_csv(_prorated_csv, index=False)
print("Saved sensitivity outputs.")

In [None]:
# =========================================================
# CELL 18: Save artifacts for downstream factor-analysis notebook
# =========================================================

# Participant-level scores under the default scoring policy
_scores_csv = os.path.join(RESULTS_DIR, "scores_by_participant.csv")
scores_out = scores.copy()
scores_out.to_csv(_scores_csv, index=False)

print("Saved participant-level scores:", _scores_csv)
# NOTE(review): the next two lines only print paths; this cell does not write
# these files — presumably an earlier cell does; verify.
print("Saved author selection rates:", os.path.join(RESULTS_DIR, "author_selection_rates_computed.csv"))
print("Saved foil selection rates:", os.path.join(RESULTS_DIR, "foil_selection_rates_computed.csv"))

# Sensitivity delta for the summary: revised mean minus original mean,
# each taken over its own non-missing values (NaN-safe)
_valid_std = scores["standard_score"].dropna()
_valid_excl_std = scores_excl["standard_score_excluding_high_rate_foils"].dropna()
_delta_std = float(_valid_excl_std.mean() - _valid_std.mean())

# Participants with / without a defined standard score
_n_scored = int(len(_valid_std))
_n_excluded_missing = int(scores["standard_score"].isna().sum())

# Values are cast to plain int/float so json can serialize them
run_summary = {
    "N": int(N),
    "n_participants_scored": _n_scored,
    "n_participants_excluded_missing": _n_excluded_missing,
    "n_authors": int(n_auth),
    "n_foils": int(n_foil),
    "mean_hits": float(scores["hits"].dropna().mean()),
    "mean_false_alarms": float(scores["false_alarms"].dropna().mean()),
    "mean_standard_score": float(_valid_std.mean()),
    "mean_name_score": float(scores["name_score"].dropna().mean()),
    "mean_author_item_rate_pct": float(mean_author_sel),
    "mean_foil_item_rate_pct": float(mean_foil_item_rate),
    "most_selected_foil": most_selected_foil_name,
    "most_selected_foil_rate_pct": float(most_selected_foil_rate),
    "never_selected_foils_count": int(never_selected.shape[0]),
    "high_rate_foils_count_ge_2p5x": int(high_rate_foils.shape[0]),
    "threshold_foil_2p5x_pct": float(threshold_25x),
    "missing_handling": "strict_nan_if_any_item_missing",
    # Fields from Cells 14-17
    "n_high_rate_foils_excluded": len(drop_foils),
    "revised_mean_standard_score": float(_valid_excl_std.mean()),
    "n_complete_case_participants": int(complete_mask.sum()),
    "sensitivity_delta_standard_score": _delta_std,
}

_summary_path = os.path.join(RESULTS_DIR, "run_summary.json")
with open(_summary_path, "w") as f:
    json.dump(run_summary, f, indent=2)

print("Wrote run_summary.json")
