# Experiment DB Inspector (Prompts & Outputs)

This notebook is for **manual auditing** of the exact prompts and model outputs stored in the run databases used by the paper.

Itâ€™s designed to help you:
- Discover all run DBs in `runs-hpc-full/runs` (temperature sweep)
- Inspect full `system_prompt`, `user_prompt`, and model `raw_text` verbatim
- Randomly sample trials (with filters) to sanity-check responses
- View a single item across **conditions** (Control / Asch / Authority)
- Compare a single item across **temperatures** (by switching run DBs)
- Browse per-run artifacts (CSVs, JSON metrics, figures)


## Quickstart

1. Run the cells top-to-bottom.
2. If your runs live somewhere else, change `RUNS_BASE_DIR`.
3. Use `sample_trials(...)`, `show_trial(...)`, `show_item_across_conditions(...)`, and `compare_across_temperatures(...)`.

Notes:
- Some variants emit `<think>...</think>` blocks in `raw_text`. Use `strip_think=True` (default) to hide those while auditing.
- This notebook only reads from the databases and artifacts on disk.


In [None]:
from __future__ import annotations

import html
import json
import os
import random
import sqlite3
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any, Iterable, Optional

import pandas as pd
from IPython.display import HTML, Image, Markdown, display

try:
    import ipywidgets as widgets  # optional
except Exception:
    widgets = None


def find_repo_root(start: Path) -> Path:
    for p in [start, *start.parents]:
        if (p / "pyproject.toml").exists():
            return p
    return start


REPO_ROOT = find_repo_root(Path.cwd())
RUNS_BASE_DIR = (REPO_ROOT / "runs-hpc-full" / "runs").resolve()
PAPER_TEX_PATH = (REPO_ROOT / "paper" / "paper.tex").resolve()

print("REPO_ROOT:", REPO_ROOT)
print("RUNS_BASE_DIR:", RUNS_BASE_DIR)
print("PAPER_TEX_PATH:", PAPER_TEX_PATH)
print("ipywidgets:", "available" if widgets is not None else "not installed")


In [None]:
@dataclass(frozen=True)
class RunInfo:
    run_dir: Path
    db_path: Path
    run_id: Optional[str]
    temperature: Optional[float]
    n_trials: int
    n_outputs: int
    variants: list[str]
    conditions: list[str]
    datasets: list[str]


def connect_sqlite(db_path: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    return conn


def _scalar(conn: sqlite3.Connection, query: str, params: tuple[Any, ...] = ()) -> Any:
    row = conn.execute(query, params).fetchone()
    if row is None:
        return None
    return row[0]


def discover_run_dirs(base_dir: Path) -> list[Path]:
    if not base_dir.exists():
        raise FileNotFoundError(f"Base dir not found: {base_dir}")
    dirs: list[Path] = []
    for p in sorted(base_dir.iterdir()):
        if p.is_dir() and (p / "simulation.db").exists():
            dirs.append(p)
    return dirs


def load_run_info(run_dir: Path) -> RunInfo:
    db_path = run_dir / "simulation.db"
    conn = connect_sqlite(db_path)
    try:
        run_id = _scalar(conn, "SELECT run_id FROM runs LIMIT 1")
        temp = _scalar(conn, "SELECT MIN(temperature) FROM conformity_trials")
        n_trials = int(_scalar(conn, "SELECT COUNT(*) FROM conformity_trials") or 0)
        n_outputs = int(_scalar(conn, "SELECT COUNT(*) FROM conformity_outputs") or 0)
        variants_raw = _scalar(conn, "SELECT GROUP_CONCAT(DISTINCT variant) FROM conformity_trials")
        variants = sorted([v for v in (variants_raw or "").split(",") if v])
        conds_raw = _scalar(
            conn,
            """
            SELECT GROUP_CONCAT(DISTINCT c.name)
            FROM conformity_trials t
            JOIN conformity_conditions c ON c.condition_id = t.condition_id
            """,
        )
        conditions = sorted([c for c in (conds_raw or "").split(",") if c])
        datasets_raw = _scalar(conn, "SELECT GROUP_CONCAT(DISTINCT name) FROM conformity_datasets")
        datasets = sorted([d for d in (datasets_raw or "").split(",") if d])
        return RunInfo(
            run_dir=run_dir,
            db_path=db_path,
            run_id=run_id,
            temperature=float(temp) if temp is not None else None,
            n_trials=n_trials,
            n_outputs=n_outputs,
            variants=variants,
            conditions=conditions,
            datasets=datasets,
        )
    finally:
        conn.close()


run_dirs = discover_run_dirs(RUNS_BASE_DIR)
run_infos = [load_run_info(d) for d in run_dirs]

runs_index = pd.DataFrame(
    [
        {
            "temperature": ri.temperature,
            "run_dir": str(ri.run_dir),
            "run_id": ri.run_id,
            "n_trials": ri.n_trials,
            "n_outputs": ri.n_outputs,
            "variants": ",".join(ri.variants),
            "conditions": ",".join(ri.conditions),
            "datasets": ",".join(ri.datasets),
        }
        for ri in run_infos
    ]
)

display(runs_index.sort_values(["temperature", "run_dir"], na_position="last").reset_index(drop=True))


## Select A Run (One Temperature)

The temperature sweep is stored as *multiple run directories* (one DB per temperature). Pick a temperature and load its DB below.


In [None]:
def pick_run_by_temperature(target_temp: float) -> RunInfo:
    candidates = [ri for ri in run_infos if ri.temperature is not None and abs(ri.temperature - target_temp) < 1e-9]
    if not candidates:
        raise ValueError(f"No run found for temperature={target_temp}. Available: {sorted(set(r.temperature for r in run_infos))}")
    if len(candidates) > 1:
        print(f"Warning: multiple runs found for temperature={target_temp}; using the first.")
    return candidates[0]


SELECTED_TEMPERATURE = 0.0  # change me
selected_run = pick_run_by_temperature(SELECTED_TEMPERATURE)

conn = connect_sqlite(selected_run.db_path)
RUN_ID = _scalar(conn, "SELECT run_id FROM runs LIMIT 1")

print("Selected run_dir:", selected_run.run_dir)
print("DB:", selected_run.db_path)
print("RUN_ID:", RUN_ID)
print("Temperature:", selected_run.temperature)
print("Trials:", selected_run.n_trials)


In [None]:
def show_run_config(conn: sqlite3.Connection) -> None:
    config_json = _scalar(conn, "SELECT config_json FROM runs WHERE run_id = ?", (RUN_ID,))
    if not config_json:
        print("No config_json found in runs table.")
        return
    try:
        config = json.loads(config_json)
    except Exception:
        print("Config exists but could not be parsed as JSON.")
        return
    pretty = json.dumps(config, indent=2, sort_keys=True)
    display(HTML(f"<h3>runs.config_json</h3><pre style='white-space: pre-wrap'>{html.escape(pretty)}</pre>"))


show_run_config(conn)


In [None]:
def list_tables(conn: sqlite3.Connection) -> list[str]:
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;").fetchall()
    return [r[0] for r in rows]


def table_schema(conn: sqlite3.Connection, table: str) -> pd.DataFrame:
    return pd.read_sql_query(f"PRAGMA table_info({table});", conn)


def preview_table(conn: sqlite3.Connection, table: str, limit: int = 5) -> pd.DataFrame:
    return pd.read_sql_query(f"SELECT * FROM {table} LIMIT {int(limit)};", conn)


tables = list_tables(conn)
print(f"Tables ({len(tables)}):")
for t in tables:
    print(" -", t)


In [None]:
def show_trial_summary(conn: sqlite3.Connection) -> pd.DataFrame:
    q = """
    SELECT
        d.name AS dataset,
        c.name AS condition,
        t.variant,
        t.temperature,
        COUNT(*) AS n_trials,
        AVG(o.is_correct) AS accuracy,
        AVG(o.refusal_flag) AS refusal_rate
    FROM conformity_trials t
    JOIN conformity_conditions c ON c.condition_id = t.condition_id
    JOIN conformity_items i ON i.item_id = t.item_id
    JOIN conformity_datasets d ON d.dataset_id = i.dataset_id
    LEFT JOIN conformity_outputs o ON o.trial_id = t.trial_id
    WHERE t.run_id = ?
    GROUP BY 1,2,3,4
    ORDER BY 1,2,3,4;
    """
    return pd.read_sql_query(q, conn, params=(RUN_ID,))


summary_df = show_trial_summary(conn)
display(summary_df)


In [None]:
def _safe_json_loads(s: Optional[str]) -> dict[str, Any]:
    if not s:
        return {}
    try:
        return json.loads(s)
    except Exception:
        return {}


def strip_think(raw_text: str) -> str:
    """Remove a leading <think>...</think> block if present."""
    if raw_text is None:
        return ""
    marker = "</think>"
    if marker in raw_text:
        return raw_text.split(marker, 1)[1].lstrip()
    return raw_text


def trial_query(where_sql: str = "", limit_sql: str = "") -> str:
    return f"""
    SELECT
        t.trial_id,
        t.variant,
        t.temperature,
        c.name AS condition,
        d.name AS dataset,
        i.domain,
        t.item_id,
        i.question,
        i.ground_truth_text,
        i.source_json,
        p.system_prompt,
        p.user_prompt,
        o.raw_text,
        o.parsed_answer_text,
        o.is_correct,
        o.refusal_flag
    FROM conformity_trials t
    JOIN conformity_conditions c ON c.condition_id = t.condition_id
    JOIN conformity_items i ON i.item_id = t.item_id
    JOIN conformity_datasets d ON d.dataset_id = i.dataset_id
    LEFT JOIN conformity_prompts p ON p.trial_id = t.trial_id
    LEFT JOIN conformity_outputs o ON o.trial_id = t.trial_id
    WHERE t.run_id = ?
    {where_sql}
    {limit_sql}
    """


def sample_trials(
    conn: sqlite3.Connection,
    n: int = 5,
    dataset: Optional[str] = None,
    condition: Optional[str] = None,
    variant: Optional[str] = None,
    is_correct: Optional[int] = None,
    refusal_flag: Optional[int] = None,
    random_seed: Optional[int] = None,
    strip_think_output: bool = True,
) -> pd.DataFrame:
    where = []
    params: list[Any] = [RUN_ID]

    if dataset is not None:
        where.append("AND d.name = ?")
        params.append(dataset)
    if condition is not None:
        where.append("AND c.name = ?")
        params.append(condition)
    if variant is not None:
        where.append("AND t.variant = ?")
        params.append(variant)
    if is_correct is not None:
        where.append("AND o.is_correct = ?")
        params.append(int(is_correct))
    if refusal_flag is not None:
        where.append("AND o.refusal_flag = ?")
        params.append(int(refusal_flag))

    df = pd.read_sql_query(trial_query(where_sql="\n".join(where), limit_sql=""), conn, params=tuple(params))
    if df.empty:
        return df

    if random_seed is not None:
        rnd = random.Random(random_seed)
        sample_ids = rnd.sample(list(df["trial_id"]), k=min(int(n), len(df)))
        df = df[df["trial_id"].isin(sample_ids)].copy()
    else:
        df = df.sample(n=min(int(n), len(df)), random_state=None).copy()

    if strip_think_output:
        df["raw_text"] = df["raw_text"].fillna("").map(strip_think)

    # Helpful parsed fields
    df["wrong_answer"] = df["source_json"].apply(lambda s: _safe_json_loads(s).get("wrong_answer"))
    df["source_notes"] = df["source_json"].apply(lambda s: _safe_json_loads(s).get("notes"))

    # Reorder columns for inspection
    cols = [
        "trial_id",
        "dataset",
        "domain",
        "variant",
        "temperature",
        "condition",
        "item_id",
        "question",
        "ground_truth_text",
        "wrong_answer",
        "is_correct",
        "refusal_flag",
        "user_prompt",
        "raw_text",
        "parsed_answer_text",
        "source_notes",
        "system_prompt",
    ]
    cols = [c for c in cols if c in df.columns]
    return df[cols].reset_index(drop=True)


def show_trial(conn: sqlite3.Connection, trial_id: str, strip_think_output: bool = True) -> None:
    q = trial_query(where_sql="AND t.trial_id = ?", limit_sql="LIMIT 1")
    df = pd.read_sql_query(q, conn, params=(RUN_ID, trial_id))
    if df.empty:
        print(f"Trial not found: {trial_id}")
        return
    row = df.iloc[0].to_dict()

    source = _safe_json_loads(row.get("source_json"))
    wrong_answer = source.get("wrong_answer")
    notes = source.get("notes")

    raw_text = row.get("raw_text") or ""
    if strip_think_output:
        raw_text = strip_think(raw_text)

    parts = []
    parts.append(f"<h3>Trial: {html.escape(str(trial_id))}</h3>")
    meta = {
        "dataset": row.get("dataset"),
        "domain": row.get("domain"),
        "variant": row.get("variant"),
        "temperature": row.get("temperature"),
        "condition": row.get("condition"),
        "item_id": row.get("item_id"),
        "is_correct": row.get("is_correct"),
        "refusal_flag": row.get("refusal_flag"),
        "wrong_answer": wrong_answer,
    }
    parts.append(f"<pre style='white-space: pre-wrap'>{html.escape(json.dumps(meta, indent=2))}</pre>")
    if notes:
        parts.append(f"<b>Source notes</b><pre style='white-space: pre-wrap'>{html.escape(str(notes))}</pre>")

    parts.append(f"<b>Question</b><pre style='white-space: pre-wrap'>{html.escape(str(row.get('question') or ''))}</pre>")
    parts.append(f"<b>Ground truth</b><pre style='white-space: pre-wrap'>{html.escape(str(row.get('ground_truth_text') or ''))}</pre>")

    parts.append(f"<b>System prompt</b><pre style='white-space: pre-wrap'>{html.escape(str(row.get('system_prompt') or ''))}</pre>")
    parts.append(f"<b>User prompt</b><pre style='white-space: pre-wrap'>{html.escape(str(row.get('user_prompt') or ''))}</pre>")
    parts.append(f"<b>Model raw_text</b><pre style='white-space: pre-wrap'>{html.escape(str(raw_text))}</pre>")
    if row.get("parsed_answer_text"):
        parts.append(f"<b>parsed_answer_text</b><pre style='white-space: pre-wrap'>{html.escape(str(row.get('parsed_answer_text') or ''))}</pre>")

    display(HTML("\n".join(parts)))


## Random Sampling (Manual Audit)

Start here: sample a few trials and then call `show_trial(...)` for any row that looks suspicious.


In [None]:
# Example: sample across everything in the selected DB
df_sample = sample_trials(conn, n=5)
display(df_sample)

# To view a full transcript:
# show_trial(conn, df_sample.loc[0, 'trial_id'])


## Inspect One Item Across Conditions (Control / Asch / Authority)

Given an `item_id` and a `variant`, this will show the prompts and outputs for all conditions *within the selected temperature DB*.


In [None]:
def show_item_across_conditions(
    conn: sqlite3.Connection,
    item_id: str,
    variant: str,
    strip_think_output: bool = True,
    condition_order: Optional[list[str]] = None,
) -> pd.DataFrame:
    q = trial_query(where_sql="AND t.item_id = ? AND t.variant = ?", limit_sql="")
    df = pd.read_sql_query(q, conn, params=(RUN_ID, item_id, variant))
    if df.empty:
        print(f"No trials found for item_id={item_id}, variant={variant}")
        return df

    df = df.copy()
    df["wrong_answer"] = df["source_json"].apply(lambda s: _safe_json_loads(s).get("wrong_answer"))
    if strip_think_output:
        df["raw_text"] = df["raw_text"].fillna("").map(strip_think)

    # Display each condition as its own block
    conds = list(df["condition"].unique())
    if condition_order is None:
        condition_order = ["control", "asch_history_5", "authoritative_bias"]
    conds_sorted = [c for c in condition_order if c in conds] + [c for c in conds if c not in condition_order]

    for cond in conds_sorted:
        r = df[df["condition"] == cond].iloc[0].to_dict()
        title = f"{cond} | {r.get('dataset')} | {r.get('domain')} | {r.get('variant')} | T={r.get('temperature')}"
        display(HTML(f"<h3>{html.escape(title)}</h3>"))
        display(HTML(f"<b>System</b><pre style='white-space: pre-wrap'>{html.escape(str(r.get('system_prompt') or ''))}</pre>"))
        display(HTML(f"<b>User</b><pre style='white-space: pre-wrap'>{html.escape(str(r.get('user_prompt') or ''))}</pre>"))
        display(HTML(f"<b>Output</b><pre style='white-space: pre-wrap'>{html.escape(str(r.get('raw_text') or ''))}</pre>"))

    cols = ["trial_id", "condition", "question", "ground_truth_text", "wrong_answer", "is_correct", "refusal_flag"]
    cols = [c for c in cols if c in df.columns]
    return df[cols].sort_values("condition").reset_index(drop=True)


# Example:
# show_item_across_conditions(conn, item_id="mmlu_high_school_chemistry_0004", variant="think_sft")


## Search Items By Question Text

This is the quickest way to locate a paper appendix example: search for a distinctive substring from the question.


In [None]:
def search_items(conn: sqlite3.Connection, question_substr: str, limit: int = 50) -> pd.DataFrame:
    q = """
    SELECT
        i.item_id,
        d.name AS dataset,
        i.domain,
        i.question,
        i.ground_truth_text,
        i.source_json
    FROM conformity_items i
    JOIN conformity_datasets d ON d.dataset_id = i.dataset_id
    WHERE i.question LIKE ?
    ORDER BY i.item_id
    LIMIT ?;
    """
    return pd.read_sql_query(q, conn, params=(f"%{question_substr}%", int(limit)))


# Example:
# display(search_items(conn, "antimony"))


## Compare One Item Across Temperatures (Across Multiple Run DBs)

This loads the matching trial (same `item_id` + `variant` + `condition`) from each temperature DB and shows the outputs.


In [None]:
@lru_cache(maxsize=16)
def _conn_for_db(db_path_str: str) -> sqlite3.Connection:
    # Cached connections for convenience during interactive exploration.
    return connect_sqlite(Path(db_path_str))


def _run_id_for_db(conn: sqlite3.Connection) -> Optional[str]:
    return _scalar(conn, "SELECT run_id FROM runs LIMIT 1")


def fetch_trial_by_item_variant_condition(
    db_path: Path,
    item_id: str,
    variant: str,
    condition_name: str,
) -> Optional[dict[str, Any]]:
    conn2 = _conn_for_db(str(db_path))
    run_id2 = _run_id_for_db(conn2)
    if not run_id2:
        return None
    q = trial_query(where_sql="AND t.item_id = ? AND t.variant = ? AND c.name = ?", limit_sql="LIMIT 1")
    df = pd.read_sql_query(q, conn2, params=(run_id2, item_id, variant, condition_name))
    if df.empty:
        return None
    return df.iloc[0].to_dict()


def compare_across_temperatures(
    item_id: str,
    variant: str,
    condition_name: str,
    strip_think_output: bool = True,
) -> pd.DataFrame:
    rows: list[dict[str, Any]] = []
    for ri in sorted([r for r in run_infos if r.temperature is not None], key=lambda r: r.temperature):
        rec = fetch_trial_by_item_variant_condition(ri.db_path, item_id=item_id, variant=variant, condition_name=condition_name)
        if rec is None:
            continue
        raw = rec.get("raw_text") or ""
        if strip_think_output:
            raw = strip_think(raw)
        source = _safe_json_loads(rec.get("source_json"))
        rows.append(
            {
                "temperature": ri.temperature,
                "run_dir": str(ri.run_dir),
                "trial_id": rec.get("trial_id"),
                "dataset": rec.get("dataset"),
                "domain": rec.get("domain"),
                "question": rec.get("question"),
                "ground_truth_text": rec.get("ground_truth_text"),
                "wrong_answer": source.get("wrong_answer"),
                "user_prompt": rec.get("user_prompt"),
                "raw_text": raw,
                "is_correct": rec.get("is_correct"),
                "refusal_flag": rec.get("refusal_flag"),
            }
        )

    df = pd.DataFrame(rows).sort_values("temperature").reset_index(drop=True)
    if df.empty:
        print("No matching trials found across temperatures.")
        return df

    # Show a compact table first
    display(df[["temperature", "trial_id", "is_correct", "refusal_flag", "wrong_answer"]])

    # Then show full outputs
    for r in df.to_dict(orient="records"):
        title = f"T={r['temperature']} | trial_id={r['trial_id']} | correct={r['is_correct']} | refusal={r['refusal_flag']}"
        display(HTML(f"<h3>{html.escape(title)}</h3>"))
        display(HTML(f"<b>User</b><pre style='white-space: pre-wrap'>{html.escape(str(r.get('user_prompt') or ''))}</pre>"))
        display(HTML(f"<b>Output</b><pre style='white-space: pre-wrap'>{html.escape(str(r.get('raw_text') or ''))}</pre>"))

    return df


# Paper-related example (Authority prompt that can look "incomplete" due to the underlying question):
# compare_across_temperatures(item_id="mmlu_high_school_chemistry_0004", variant="think_sft", condition_name="authoritative_bias")


## Browse Per-Run Artifacts (CSVs / JSON / Figures)

Each run directory often includes `artifacts/tables`, `artifacts/logs/tables`, and `artifacts/figures`.


In [None]:
def list_run_files(run_dir: Path, max_files: int = 200) -> pd.DataFrame:
    files: list[dict[str, Any]] = []
    for p in sorted(run_dir.rglob("*")):
        if not p.is_file():
            continue
        files.append(
            {
                "path": str(p),
                "relpath": str(p.relative_to(run_dir)),
                "ext": p.suffix.lower(),
                "bytes": p.stat().st_size,
            }
        )
        if len(files) >= max_files:
            break
    return pd.DataFrame(files)


def preview_csv(path: Path, n: int = 20) -> pd.DataFrame:
    df = pd.read_csv(path)
    print(f"CSV: {path} | shape={df.shape}")
    return df.head(int(n))


def preview_json(path: Path) -> Any:
    obj = json.loads(path.read_text())
    pretty = json.dumps(obj, indent=2, sort_keys=True)
    display(HTML(f"<h3>JSON: {html.escape(str(path))}</h3><pre style='white-space: pre-wrap'>{html.escape(pretty)}</pre>"))
    return obj


def show_png(path: Path, width: int = 900) -> None:
    display(Image(filename=str(path), width=width))


files_df = list_run_files(selected_run.run_dir)
display(files_df)

# Examples:
# display(preview_csv(selected_run.run_dir / "artifacts" / "tables" / "conformity_rate_by_variant.csv"))
# preview_json(selected_run.run_dir / "artifacts" / "logs" / "metrics_behavioral.json")
# show_png(selected_run.run_dir / "artifacts" / "figures" / "accuracy_by_condition.png")


## (Optional) Small Widget UI

If `ipywidgets` is installed, this provides a quick dropdown to sample and inspect trials without editing code.


In [None]:
if widgets is None:
    print("ipywidgets is not installed. You can still use the functions above.")
else:
    temps = sorted([r.temperature for r in run_infos if r.temperature is not None])
    w_temp = widgets.Dropdown(options=temps, value=SELECTED_TEMPERATURE, description="Temp")
    w_n = widgets.IntSlider(value=3, min=1, max=20, step=1, description="n")
    w_strip = widgets.Checkbox(value=True, description="strip_think")
    w_go = widgets.Button(description="Sample")
    out = widgets.Output()

    def on_click(_):
        global conn, RUN_ID, selected_run
        with out:
            out.clear_output()
            selected_run = pick_run_by_temperature(float(w_temp.value))
            conn = connect_sqlite(selected_run.db_path)
            RUN_ID = _scalar(conn, "SELECT run_id FROM runs LIMIT 1")
            df = sample_trials(conn, n=int(w_n.value), strip_think_output=bool(w_strip.value))
            display(df)

    w_go.on_click(on_click)
    display(widgets.HBox([w_temp, w_n, w_strip, w_go]))
    display(out)
