In [1]:
import sys
from pathlib import Path
sys.path.append(str(Path.cwd() / "notebooks"))  # so we can import _utils from notebooks/

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from _utils import get_data_dir

DATA_DIR = get_data_dir()
DATA_DIR

WindowsPath('C:/Users/AdamR/OneDrive/UCSB/VIU/HonorsThesis/data')

In [None]:
# === Load Gemini-2.5-Pro angle estimation CSVs for 50_50, 80_20, and 100_0 ===
from pathlib import Path
import pandas as pd

ROOT = DATA_DIR  # <-- ensure DATA_DIR points to your project root
FOLDERS = ["50_50", "80_20", "100_0"]
SUBPATH = "angle_estimations/gemini-2.5-pro.csv"

def safe_read_csv(path: Path, **kwargs) -> pd.DataFrame:
    """Robust CSV reader with fallback parsing."""
    defaults = dict(low_memory=False, encoding_errors="ignore")
    defaults.update(kwargs)
    try:
        return pd.read_csv(path, **defaults)
    except Exception:
        return pd.read_csv(path, engine="python", sep=None, **defaults)

def load_gemini_dataset(folder: Path) -> pd.DataFrame:
    """Load only the Gemini-2.5-Pro CSV from angle_estimations/ subfolder."""
    gemini_path = folder / SUBPATH
    if not gemini_path.exists():
        raise FileNotFoundError(f"Missing {SUBPATH} in {folder}")
    df = safe_read_csv(gemini_path)
    df["condition"] = folder.name  # tag condition (50_50, 80_20, 100_0)
    return df, gemini_path

# === Main loading loop ===
datasets = {}
records = []

for name in FOLDERS:
    folder = ROOT / name
    if not folder.exists():
        print(f"⚠️ Warning: Folder '{name}' not found under {ROOT}")
        continue

    try:
        df, path = load_gemini_dataset(folder)
        datasets[name] = df
        records.append({
            "condition": name,
            "n_rows": len(df),
            "n_cols": df.shape[1],
            "path": str(path.resolve())
        })
        print(f"✅ Loaded Gemini data for {name}: {path}")
    except FileNotFoundError as e:
        print("⚠️", e)

# === Summary table ===
gemini_index = pd.DataFrame.from_records(records).sort_values("condition").reset_index(drop=True)



✅ Loaded Gemini data for 50_50: C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\data\50_50\angle_estimations\gemini-2.5-pro.csv
✅ Loaded Gemini data for 80_20: C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\data\80_20\angle_estimations\gemini-2.5-pro.csv
✅ Loaded Gemini data for 100_0: C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\data\100_0\angle_estimations\gemini-2.5-pro.csv

Summary of loaded Gemini datasets:


Unnamed: 0,condition,n_rows,n_cols,path
0,100_0,1000,9,C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\...
1,50_50,1000,9,C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\...
2,80_20,1000,9,C:\Users\AdamR\OneDrive\UCSB\VIU\HonorsThesis\...


In [19]:
def get_csv_columns(csv_path):
    """
    Return a list of column names from a CSV file.
    Accepts either a string/Path to a file or a pandas DataFrame.
    """
    if isinstance(csv_path, pd.DataFrame):
        return list(csv_path.columns)
    
    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"File not found: {csv_path}")
    
    # Read only the header row
    df = pd.read_csv(csv_path, nrows=0, encoding_errors="ignore")
    return list(df.columns)


In [16]:
# === Parse Gemini GPT_response into separate columns for each condition ===
import pandas as pd
import numpy as np

def parse_gpt_response(resp):
    """
    Parse GPT_response string of the form 'angle1, angle2, decision'
    → (float, float, str). Returns NaN if parsing fails.
    """
    if not isinstance(resp, str):
        return (np.nan, np.nan, np.nan)
    parts = [p.strip() for p in resp.split(",")]
    if len(parts) < 3:
        return (np.nan, np.nan, np.nan)
    try:
        a1 = float(parts[0])
        a2 = float(parts[1])
        decision = parts[2].lower()
        return (a1, a2, decision)
    except Exception:
        return (np.nan, np.nan, np.nan)

# --- Parse function to add columns
def add_parsed_columns(df):
    df = df.copy()
    parsed = df["GPT_response"].apply(parse_gpt_response)
    df[["est_angle_1", "est_angle_2", "decision"]] = pd.DataFrame(parsed.tolist(), index=df.index)
    return df


gemini_50_50 = add_parsed_columns(datasets.get("50_50", pd.DataFrame()))
gemini_80_20 = add_parsed_columns(datasets.get("80_20", pd.DataFrame()))
gemini_100_0 = add_parsed_columns(datasets.get("100_0", pd.DataFrame()))


datasets["50_50_parsed"] = gemini_50_50
datasets["80_20_parsed"] = gemini_80_20
datasets["100_0_parsed"] = gemini_100_0

display(gemini_100_0.head())


Unnamed: 0,image_id,side_selected,cue_points,line1_angle,line2_angle,valid_cue,TP,GPT_response,condition,est_angle_1,est_angle_2,decision
0,100,1,1,14.314827,1.921956,True,True,"-14.8, -1.2, present",100_0,-14.8,-1.2,present
1,845,1,1,15.054317,4.22223,True,True,"-15.2, 1.0, present",100_0,-15.2,1.0,present
2,245,1,1,14.314827,6.508956,True,True,"16.5, 4.5, present",100_0,16.5,4.5,present
3,72,2,2,8.775056,15.054317,True,True,"6.1, 16.0, present",100_0,6.1,16.0,present
4,469,2,2,4.22223,19.885165,True,True,"4.1, 18.2, present",100_0,4.1,18.2,present


In [18]:
# === Accuracy / SDT metrics for the three parsed Gemini datasets ===
import pandas as pd
import numpy as np

# --- Helper: ensure the three parsed DataFrames exist and are tagged with condition
def ensure_condition(df: pd.DataFrame, cond: str) -> pd.DataFrame:
    d = df.copy()
    if "condition" not in d.columns:
        d["condition"] = cond
    else:
        d["condition"] = (
            d["condition"].astype(str)
            .str.strip().str.replace(r"\s+", "", regex=True)
            .str.replace("-", "_").str.replace("__", "_")
        )
        # backfill with provided cond if missing/blank
        d.loc[d["condition"].isna() | (d["condition"] == ""), "condition"] = cond
    return d

# --- Helper: if parsed cols missing, parse GPT_response
def parse_gpt_response(resp):
    if not isinstance(resp, str):
        return (np.nan, np.nan, np.nan)
    parts = [p.strip() for p in resp.split(",")]
    if len(parts) < 3:
        return (np.nan, np.nan, np.nan)
    try:
        a1 = float(parts[0]); a2 = float(parts[1]); decision = parts[2].strip().lower()
        return (a1, a2, decision)
    except Exception:
        return (np.nan, np.nan, np.nan)

def ensure_parsed_columns(d: pd.DataFrame) -> pd.DataFrame:
    d = d.copy()
    need_parse = not {"est_angle_1", "est_angle_2", "decision"}.issubset(d.columns)
    if need_parse and "GPT_response" in d.columns:
        parsed = d["GPT_response"].apply(parse_gpt_response).tolist()
        d[["est_angle_1", "est_angle_2", "decision"]] = pd.DataFrame(parsed, index=d.index)
    return d

# --- Bring together the three parsed datasets
parts = []
for cond, df_part in [("50_50", gemini_50_50), ("80_20", gemini_80_20), ("100_0", gemini_100_0)]:
    if isinstance(df_part, pd.DataFrame) and not df_part.empty:
        d = ensure_condition(df_part, cond)
        d = ensure_parsed_columns(d)
        parts.append(d)
    else:
        print(f"⚠️ Missing or empty dataset for {cond}")

if not parts:
    raise ValueError("No parsed Gemini datasets available. Make sure gemini_50_50/80_20/100_0 exist.")

df_all = pd.concat(parts, ignore_index=True)

# --- Compute metrics by condition
def metrics_by_condition(df: pd.DataFrame) -> pd.DataFrame:
    d = df.copy()

    # map decision -> prediction
    d["pred"] = d["decision"].map({"present": 1, "absent": 0})
    d = d.dropna(subset=["pred", "TP", "condition"]).copy()

    # normalize & order condition labels
    order = ["50_50", "80_20", "100_0"]
    d["condition"] = (
        d["condition"].astype(str)
        .str.strip().str.replace(r"\s+", "", regex=True)
        .str.replace("-", "_").str.replace("__", "_")
    )
    d["condition"] = pd.Categorical(d["condition"], categories=order, ordered=True)

    rows = []
    for cond, sub in d.groupby("condition", observed=True):
        if pd.isna(cond):
            continue
        total = len(sub)
        tp = ((sub["pred"] == 1) & (sub["TP"] == 1)).sum()
        fp = ((sub["pred"] == 1) & (sub["TP"] == 0)).sum()
        tn = ((sub["pred"] == 0) & (sub["TP"] == 0)).sum()
        fn = ((sub["pred"] == 0) & (sub["TP"] == 1)).sum()

        accuracy = (tp + tn) / total if total else np.nan
        hit_rate = tp / (tp + fn) if (tp + fn) else np.nan
        fa_rate = fp / (fp + tn) if (fp + tn) else np.nan

        rows.append({
            "condition": cond,
            "n_trials": int(total),
            "accuracy": accuracy,
            "hit_rate": hit_rate,
            "false_alarm_rate": fa_rate,
            "TP_hits": int(tp),
            "FP_false_alarms": int(fp),
            "TN_correct_rejects": int(tn),
            "FN_misses": int(fn),
        })
    return pd.DataFrame(rows).sort_values("condition").reset_index(drop=True)

# --- Run & display

gemini_metrics = metrics_by_condition(df_all)
display(gemini_metrics)

# Also break out per-dataset (if you want individual variables)
metrics_50_50 = gemini_metrics.loc[gemini_metrics["condition"] == "50_50"].reset_index(drop=True)
metrics_80_20 = gemini_metrics.loc[gemini_metrics["condition"] == "80_20"].reset_index(drop=True)
metrics_100_0 = gemini_metrics.loc[gemini_metrics["condition"] == "100_0"].reset_index(drop=True)


Unnamed: 0,condition,n_trials,accuracy,hit_rate,false_alarm_rate,TP_hits,FP_false_alarms,TN_correct_rejects,FN_misses
0,100_0,1000,0.871,0.97,0.228,485,114,386,15
1,50_50,1000,0.865,0.916,0.186,458,93,407,42
2,80_20,1000,0.845,0.944,0.254,472,127,373,28


In [21]:
# === Angle-estimation metrics using MAGNITUDES ONLY ===
import pandas as pd
import numpy as np

def compute_angle_metrics_magnitude(df: pd.DataFrame) -> pd.Series:
    d = df.copy()

    # Ensure needed columns exist
    needed = {"est_angle_1", "est_angle_2", "line1_angle", "line2_angle"}
    missing = needed - set(d.columns)
    if missing:
        raise KeyError(f"Missing columns for magnitude metrics: {missing}")

    # Drop rows with NaNs in required fields
    d = d.dropna(subset=list(needed)).copy()

    # Compare absolute magnitudes (ignore sign)
    d["true1_mag"] = d["line1_angle"].abs()
    d["true2_mag"] = d["line2_angle"].abs()
    d["est1_mag"]  = d["est_angle_1"].abs()
    d["est2_mag"]  = d["est_angle_2"].abs()

    # Signed error in magnitude space (can be useful for bias)
    d["err1_mag"] = d["est1_mag"] - d["true1_mag"]
    d["err2_mag"] = d["est2_mag"] - d["true2_mag"]

    # Absolute error in magnitude space
    d["abs_err1_mag"] = d["err1_mag"].abs()
    d["abs_err2_mag"] = d["err2_mag"].abs()

    # Metrics
    metrics = {
        "MAE_mag_line1": d["abs_err1_mag"].mean(),
        "MAE_mag_line2": d["abs_err2_mag"].mean(),
        "RMSE_mag_line1": np.sqrt((d["err1_mag"]**2).mean()),
        "RMSE_mag_line2": np.sqrt((d["err2_mag"]**2).mean()),
        "Bias_mag_line1": d["err1_mag"].mean(),  # + means overestimating magnitude
        "Bias_mag_line2": d["err2_mag"].mean(),
        "Corr_mag_line1": d[["est1_mag", "true1_mag"]].corr().iloc[0,1],
        "Corr_mag_line2": d[["est2_mag", "true2_mag"]].corr().iloc[0,1],
        "n_trials": len(d),
    }
    return pd.Series(metrics)

# Compute per-condition (expects parsed DataFrames already exist)
angle_metrics_mag = pd.DataFrame({
    "50_50": compute_angle_metrics_magnitude(gemini_50_50),
    "80_20": compute_angle_metrics_magnitude(gemini_80_20),
    "100_0": compute_angle_metrics_magnitude(gemini_100_0),
}).T

display(angle_metrics_mag)


Unnamed: 0,MAE_mag_line1,MAE_mag_line2,RMSE_mag_line1,RMSE_mag_line2,Bias_mag_line1,Bias_mag_line2,Corr_mag_line1,Corr_mag_line2,n_trials
50_50,1.617646,1.663294,2.176531,2.175877,-1.221681,-0.49598,0.940647,0.933267,1000.0
80_20,1.45196,1.766039,1.970698,2.309326,-0.749881,-0.41375,0.942629,0.927198,1000.0
100_0,1.689481,1.787509,2.345441,2.374318,-0.418281,-0.32408,0.911256,0.914494,1000.0
