# Week 2 — Notebook 1: Dataset Preparation & DPO Pair Construction

This notebook covers:
1. Loading and inspecting raw data (~90k rows)
2. De-identification with standard libraries (no LLMs)
3. Data quality assessment
4. Constructing chosen/rejected pairs for DPO via two strategies:
   - **Weighted sum** scoring
   - **Single-perspective filtering** (isolating one metric delta)
5. Sample efficiency analysis — how much survives aggressive filtering

---
> **GPU requirement:** None — this is CPU/data-engineering work.  
> **Estimated runtime:** ~10–20 min depending on dataset size.

## 0. Environment & Imports

In [None]:
# Install dependencies if needed (comment out after first run)
# !pip install datasets pandas pyarrow presidio-analyzer presidio-anonymizer spacy tqdm
# !python -m spacy download en_core_web_lg

In [None]:
import os
import json
import random
import warnings
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from tqdm.auto import tqdm

# Blanket filter: silences every Python warning for the rest of the session,
# including deprecation warnings from pandas / datasets.
warnings.filterwarnings("ignore")
# Seed the stdlib and NumPy global RNGs so random draws repeat across runs.
random.seed(42)
np.random.seed(42)

# Paths — relative to the notebook's working directory. The directory is created
# if it is missing (parents are not created, so `..` must already exist).
DATA_DIR = Path("../data")
DATA_DIR.mkdir(exist_ok=True)

print("Imports OK")

## 1. Load Raw Dataset

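We use the public **UltraFeedback Binarized** preference dataset (`HuggingFaceH4/ultrafeedback_binarized`, `train_prefs` split) as a realistic proxy.  
To use your own Spark-processed parquet/JSONL instead, comment out Option A in the cell below and uncomment one of the Option B loaders.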

In [None]:
# --- Option A: Load from HuggingFace Hub (public proxy dataset) ---
# `dataset` is kept around as well as `df`: it is also referenced by the
# sample-efficiency summary in section 7.
dataset = load_dataset("HuggingFaceH4/ultrafeedback_binarized", split="train_prefs")
df = dataset.to_pandas()

# --- Option B: Load your own data ---
# Uncomment exactly one of the readers below (and comment out Option A).
# df = pd.read_parquet(DATA_DIR / "rollouts_raw.parquet")
# df = pd.read_json(DATA_DIR / "rollouts_raw.jsonl", lines=True)

print(f"Dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
# Last expression of the cell: rendered as a rich table preview.
df.head(3)

In [None]:
# Column dtypes, followed by the number of missing values per column
column_dtypes = df.dtypes
null_counts = df.isna().sum()

print(column_dtypes)
print("\nNull counts:")
print(null_counts)

## 2. De-identification (Standard Libraries)

Using **Microsoft Presidio** — no LLM calls, fully deterministic.

In [None]:
# Presidio is treated as an optional dependency: the imports and engine
# construction are attempted once, and PRESIDIO_AVAILABLE records the outcome so
# later cells can skip de-identification instead of failing.
# Only ImportError is handled here; any other exception raised while building
# the engines propagates and leaves PRESIDIO_AVAILABLE undefined.
try:
    from presidio_analyzer import AnalyzerEngine
    from presidio_anonymizer import AnonymizerEngine

    # Module-level engines, reused by `deidentify` for every row.
    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()
    PRESIDIO_AVAILABLE = True
    print("Presidio loaded.")
except ImportError:
    PRESIDIO_AVAILABLE = False
    print("Presidio not installed — skipping de-id (install presidio-analyzer presidio-anonymizer spacy + en_core_web_lg)")


def deidentify(text: str) -> str:
    """Anonymise PII in ``text`` using the module-level Presidio engines.

    Detected entities are substituted with type placeholders such as
    ``<PERSON>``. The input is handed back untouched when Presidio could not
    be loaded or when ``text`` is not a string (e.g. a missing value).
    """
    can_process = PRESIDIO_AVAILABLE and isinstance(text, str)
    if not can_process:
        return text

    findings = analyzer.analyze(text=text, language="en")
    return anonymizer.anonymize(text=text, analyzer_results=findings).text


# Smoke test on a synthetic sentence containing a name, an e-mail address and a
# phone-like number. When Presidio is unavailable both lines print the same text.
sample = "Hi, I'm John Smith and my email is john@example.com, phone 555-1234."
print("Before:", sample)
print("After: ", deidentify(sample))

In [None]:
# Name of the column that holds the raw prompt text (adjust to your schema)
PROMPT_COL = "prompt"

has_prompt_col = PROMPT_COL in df.columns
if not (has_prompt_col and PRESIDIO_AVAILABLE):
    # No de-identification possible: carry the prompt over unchanged, or fall
    # back to an empty string when the column does not exist at all.
    df["prompt_clean"] = df.get(PROMPT_COL, "")
else:
    tqdm.pandas(desc="De-identifying prompts")
    df["prompt_clean"] = df[PROMPT_COL].progress_apply(deidentify)

    # Report how many prompts were actually altered by the anonymiser
    n_changed = (df["prompt_clean"] != df[PROMPT_COL]).sum()
    pct_changed = n_changed / len(df) * 100
    print(f"Records modified by de-id: {n_changed}/{len(df)} ({pct_changed:.1f}%)")

## 3. Data Quality Assessment

In [None]:
def quality_metrics(df: pd.DataFrame, text_col: str) -> pd.DataFrame:
    """Compute basic quality signals on a text column.

    Args:
        df: Input frame; it is copied, never modified in place.
        text_col: Name of the string column to analyse.

    Returns:
        A copy of ``df`` with these extra columns:
        ``len_chars``    – character count (NaN for missing text),
        ``len_words``    – whitespace-separated token count (NaN for missing text),
        ``is_empty``     – True for missing, empty or whitespace-only text,
        ``has_url``      – True when the text contains ``http://`` or ``https://``,
        ``is_duplicate`` – True for every row whose text occurs more than once.
    """
    d = df.copy()
    text = d[text_col]

    d["len_chars"] = text.str.len()
    d["len_words"] = text.str.split().str.len()
    # A missing value yields NaN from the .str accessor and `NaN == 0` is False,
    # so nulls must be flagged explicitly or they slip through as "non-empty".
    d["is_empty"] = text.isna() | (text.str.strip().str.len() == 0)
    d["has_url"] = text.str.contains(r"https?://", na=False)
    # keep=False marks all members of a duplicate group, not just the repeats.
    d["is_duplicate"] = d.duplicated(subset=[text_col], keep=False)
    return d


df = quality_metrics(df, "prompt_clean")

# (label, count) pairs for the report; labels are left-aligned to 18 characters
summary_rows = [
    ("Total rows", len(df)),
    ("Empty prompts", df["is_empty"].sum()),
    ("Duplicate prompts", df["is_duplicate"].sum()),
    ("Contains URL", df["has_url"].sum()),
]

print("=== Quality Summary ===")
for label, count in summary_rows:
    print(f"{label:<18}: {count:,}")

word_stats = df["len_words"].describe().round(1)
print("\nPrompt length (words):")
print(word_stats.to_string())

In [None]:
# Drop unusable rows: blank prompts, repeated prompts (first occurrence is kept)
# and prompts outside the 5–2000 word range.
before = len(df)

is_non_empty = ~df["is_empty"]
is_first_occurrence = ~df.duplicated(subset=["prompt_clean"], keep="first")
has_sane_length = df["len_words"].between(5, 2000)

keep_mask = is_non_empty & is_first_occurrence & has_sane_length
df = df[keep_mask].reset_index(drop=True)

removed = before - len(df)
print(f"Rows after quality filter: {len(df):,} (removed {removed:,})")

## 4. Simulating VJ (Virtual Judge) Scores

In production these come from your scoring pipeline.  
Here we attach synthetic scores to demonstrate the pair-construction logic — swap in real columns when available.

In [None]:
# --- If your dataframe already has score columns, skip this cell ---
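# Expected schema after this cell (each metric gets a `_chosen` and a `_rejected` column):
#   score_correctness_{chosen,rejected}      : float [0,1]
#   score_groundedness_{chosen,rejected}     : float [0,1]
#   score_problem_solution_{chosen,rejected} : float [0,1]
#   score_style_{chosen,rejected}            : float [0,1]
#   response_chosen                          : str
#   response_rejected                        : str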

def _extract_text(cell):
    """Handle UltraFeedback schema where responses are lists of dicts."""
    if isinstance(cell, list) and len(cell) > 0:
        item = cell[0]
        if isinstance(item, dict):
            return item.get("content", str(item))
        return str(item)
    return str(cell)


# Response text: taken from the raw preference columns when they exist,
# otherwise filled with fixed placeholder strings.
has_preference_cols = "chosen" in df.columns
if not has_preference_cols:
    df["response_chosen"]   = "placeholder chosen response"
    df["response_rejected"] = "placeholder rejected response"
else:
    df["response_chosen"]   = df["chosen"].apply(_extract_text)
    df["response_rejected"] = df["rejected"].apply(_extract_text)

# Synthetic per-response VJ scores (replace with real scores in production).
# Each metric maps to (mean for chosen, mean for rejected); draws are normal
# and clipped into [0, 1]. The chosen draw always precedes the rejected draw so
# the seeded generator yields the same numbers on every run.
rng = np.random.default_rng(42)
n = len(df)
simulated_means = {
    "score_correctness":      (0.75, 0.50),
    "score_groundedness":     (0.72, 0.55),
    "score_problem_solution": (0.70, 0.52),
    "score_style":            (0.65, 0.48),
}
for col, (mu_chosen, mu_rejected) in simulated_means.items():
    chosen_draw = rng.normal(mu_chosen, 0.12, n)
    rejected_draw = rng.normal(mu_rejected, 0.15, n)
    df[f"{col}_chosen"]   = chosen_draw.clip(0, 1)
    df[f"{col}_rejected"] = rejected_draw.clip(0, 1)

score_columns = [c for c in df.columns if c.startswith("score_")]
print(df[score_columns].describe().round(3))

## 5. Strategy A — Weighted Sum Pair Selection

In [None]:
# Relative importance of each VJ metric in the combined reward. Keys are the
# score-column prefixes (the `_chosen` / `_rejected` suffix is appended by
# `weighted_reward`); the four weights sum to 1.0.
WEIGHTS = {
    "score_correctness":      0.40,
    "score_groundedness":     0.25,
    "score_problem_solution": 0.25,
    "score_style":            0.10,
}


def weighted_reward(df: pd.DataFrame, suffix: str, weights: dict) -> pd.Series:
    """Combine per-metric score columns into a single weighted reward.

    For every ``metric -> weight`` entry in ``weights`` the column
    ``f"{metric}_{suffix}"`` is scaled by ``weight``; the scaled columns are
    added up row by row, in the iteration order of ``weights``.
    """
    total = 0
    for metric, weight in weights.items():
        total = total + weight * df[f"{metric}_{suffix}"]
    return total


# Score both sides of every pair, then measure the margin between them
for side in ("chosen", "rejected"):
    df[f"reward_{side}"] = weighted_reward(df, side, WEIGHTS)
df["reward_delta"] = df["reward_chosen"] - df["reward_rejected"]

# Keep only pairs where chosen beats rejected by a clear margin (filters noise)
DELTA_THRESHOLD = 0.05
passes_margin = df["reward_delta"] >= DELTA_THRESHOLD
df_weighted = df[passes_margin].copy()

n_kept, n_total = len(df_weighted), len(df)
print(f"Pairs surviving weighted-sum filter (delta >= {DELTA_THRESHOLD}):")
print(f"  {n_kept:,} / {n_total:,} ({n_kept/n_total*100:.1f}%)")
print(df_weighted["reward_delta"].describe().round(3))

## 6. Strategy B — Single-Perspective Filtering (Isolating Style Delta)

In [None]:
# Thresholds for the single-perspective (style) selection
STYLE_DELTA_MIN   = 0.10   # minimum style advantage of chosen over rejected
CORRECTNESS_FLOOR = 0.60   # minimum correctness the chosen response must keep

df["style_delta"] = df["score_style_chosen"] - df["score_style_rejected"]

# A pair qualifies when chosen is clearly better on style AND still meets the
# correctness floor.
wins_on_style = df["style_delta"] >= STYLE_DELTA_MIN
meets_correctness_floor = df["score_correctness_chosen"] >= CORRECTNESS_FLOOR
df_style = df[wins_on_style & meets_correctness_floor].copy()

n_style, n_all = len(df_style), len(df)
print("Pairs surviving single-perspective style filter:")
print(f"  {n_style:,} / {n_all:,} ({n_style/n_all*100:.1f}%)")

# Sanity check: did selecting on style leave the rejected side with the better
# problem_solution score on average?
ps_chosen   = df_style["score_problem_solution_chosen"].mean()
ps_rejected = df_style["score_problem_solution_rejected"].mean()
print(f"\n[Sanity] problem_solution — chosen avg: {ps_chosen:.3f}, rejected avg: {ps_rejected:.3f}")

rejected_is_better = ps_rejected > ps_chosen
if not rejected_is_better:
    print("  ✓ No negative correlation detected.")
else:
    print("  ⚠ WARNING: rejected group has higher problem_solution than chosen — negative correlation present.")
    print("  → See Notebook 04 for remedies (correctness floor, MOO, SFT warm-up).")

## 7. Sample Efficiency Summary

In [None]:
# Row count at load time. `dataset` only exists when the Hub loader (Option A)
# was used; when the data came from Option B, fall back to `before`, the row
# count captured just ahead of the quality filter (no rows are dropped before
# that point), instead of failing with a NameError.
n_raw_rows = len(dataset) if "dataset" in globals() else before

# One row per pipeline stage, with the share of the raw data that survives it
funnel = pd.DataFrame([
    {"stage": "Raw data",                  "rows": n_raw_rows},
    {"stage": "After quality filter",      "rows": len(df)},
    {"stage": "Weighted-sum pairs",        "rows": len(df_weighted)},
    {"stage": "Single-perspective (style)","rows": len(df_style)},
])
funnel["retention_%"] = (funnel["rows"] / funnel["rows"].iloc[0] * 100).round(1)
print(funnel.to_string(index=False))
print("\nNote: aggressive multi-VJ filtering can reduce 24k → 700 pairs (see Notebook 04).")

## 8. Export DPO-Ready Datasets

In [None]:
def to_dpo_format(df: pd.DataFrame) -> pd.DataFrame:
    """Convert to TRL DPO trainer expected schema.

    Args:
        df: Frame with ``prompt_clean``, ``response_chosen`` and
            ``response_rejected`` columns (typically a filtered subset).

    Returns:
        A new frame with exactly the columns ``prompt``, ``chosen`` and
        ``rejected`` and a fresh 0..n-1 index.
    """
    dpo = pd.DataFrame({
        "prompt":   df["prompt_clean"],
        "chosen":   df["response_chosen"],
        "rejected": df["response_rejected"],
    })
    # Filtered subsets carry a gappy index; left in place it would be exported
    # as an extra `__index_level_0__` column by `Dataset.from_pandas`.
    return dpo.reset_index(drop=True)


# Write one JSONL file per selection strategy (one JSON object per line)
for name, subset in [("weighted", df_weighted), ("style", df_style)]:
    out = to_dpo_format(subset)
    path = DATA_DIR / f"dpo_{name}.jsonl"
    out.to_json(path, orient="records", lines=True)
    print(f"Saved {len(out):,} pairs → {path}")

# Also save as HuggingFace Dataset.
# preserve_index=False keeps the filtered frame's pandas index from being
# stored as an extra `__index_level_0__` column next to prompt/chosen/rejected.
ds_weighted = Dataset.from_pandas(to_dpo_format(df_weighted), preserve_index=False)
ds_weighted.save_to_disk(str(DATA_DIR / "dpo_weighted_hf"))
print("HuggingFace dataset saved.")

---
## Summary

| Dataset | Pairs | Notes |
|---------|-------|-------|
| `dpo_weighted.jsonl` | ~N | Weighted-sum reward delta ≥ 0.05 (all four VJ scores combined) |
| `dpo_style.jsonl`    | ~N | Style delta ≥ 0.10 + correctness floor ≥ 0.60 |

**Next:** `02_finetuning_qlora.ipynb` — SFT warm-up → DPO on these pairs.