In [1]:
# iBKT Evaluation — single-notebook version
import math
from dataclasses import dataclass
from typing import Dict, List, Iterable, Tuple, Optional

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, brier_score_loss


In [2]:
# Base impact per game category. Coding counts double, which keeps it from
# dominating the mastery updates too much.
GAME_TYPE_WEIGHTS = dict(coding=2.0, non_coding=1.0)

# Fallback time budget per minigame, used when an event has no positive
# "time_limit" of its own (presumably seconds — confirm against the logger).
DEFAULT_TIME_LIMITS = {
    minigame: 300.0
    for minigame in ("debugging", "hangman", "crossword", "wordsearch")
}

def _safe_float(v, default: float = 0.0) -> float:
    try:
        return float(v) if v is not None else default
    except (TypeError, ValueError):
        return default

def _expected_time(entry: dict) -> float:
    """Return the time budget that applies to an event.

    A positive ``time_limit`` on the entry is used as-is. Otherwise the budget
    is looked up in ``DEFAULT_TIME_LIMITS`` by the stripped, lower-cased
    ``minigame_type`` (or ``game_type`` when that is missing or empty), with
    300.0 as the last resort.
    """
    explicit_limit = _safe_float(entry.get("time_limit"), 0.0)
    if explicit_limit > 0:
        return explicit_limit

    raw_name = entry.get("minigame_type") or entry.get("game_type") or ""
    lookup_key = raw_name.strip().lower()
    return DEFAULT_TIME_LIMITS.get(lookup_key, 300.0)

def _observed_time(entry: dict) -> float:
    """Return ``minigame_time_taken`` limited to the range [0, 2 x budget].

    The upper cap keeps very long pauses from being read as genuine solving
    time; a missing or unparseable value counts as 0.0.
    """
    budget = _expected_time(entry)
    elapsed = _safe_float(entry.get("minigame_time_taken"), 0.0)
    capped = min(elapsed, 2.0 * budget)
    return max(0.0, capped)

def _time_multiplier(entry: dict, is_correct: bool) -> float:
    """Return an impact multiplier in [0.90, 1.10] based on answer speed.

    The observed/budget time ratio is compared with a pivot of 0.8. Finishing
    faster than the pivot raises the multiplier for a correct answer and
    lowers it for an incorrect one; finishing slower does the opposite.
    Events with no positive observed time get a neutral 1.0.
    """
    elapsed = _observed_time(entry)
    if elapsed <= 0:
        return 1.0

    budget = max(1e-6, _expected_time(entry))  # guard against division by zero
    ratio = min(2.0, max(0.0, elapsed / budget))

    pivot, steepness, amplitude = 0.8, 2.0, 0.20
    direction = -1.0 if not is_correct else 1.0
    raw_multiplier = 1.0 + direction * amplitude * math.tanh(steepness * (pivot - ratio))

    # The tanh term can reach roughly +/-0.18, so this clamp is what actually
    # bounds the effect to +/-10%.
    return max(0.90, min(1.10, raw_multiplier))

def _game_weight(entry: dict) -> float:
    """Return the base impact weight for the entry's ``game_type``.

    The type is stripped and lower-cased before the lookup; unknown or missing
    types get the ``"non_coding"`` weight.
    """
    fallback = GAME_TYPE_WEIGHTS["non_coding"]
    raw_type = entry.get("game_type") or ""
    category = raw_type.strip().lower()
    return GAME_TYPE_WEIGHTS.get(category, fallback)

def _norm_diff(d: Optional[str]) -> str:
    if not d:
        return "intermediate"
    d = str(d).strip().lower()
    if d.startswith("beg"): return "beginner"
    if d.startswith("inter"): return "intermediate"
    if d.startswith("adv"): return "advanced"
    if d.startswith("mast"): return "master"
    return d if d in {"beginner","intermediate","advanced","master"} else "intermediate"

# Ordinal rank of each canonical difficulty name.
DIFF_LEVELS = dict(beginner=0, intermediate=1, advanced=2, master=3)


def _diff_level(d: Optional[str]) -> int:
    """Return the ordinal level (0-3) for a difficulty label; unknown maps to 1."""
    canonical = _norm_diff(d)
    return DIFF_LEVELS[canonical] if canonical in DIFF_LEVELS else 1

def _diff_centered(level: int) -> float:
    # beginner:-1, intermediate:-0.333..., advanced:+0.333..., master:+1
    return (max(0, min(3, level)) - 1.5) / 1.5

def _impact_with_difficulty(base_impact: float, correct: bool, level: int) -> float:
    """Scale an impact by difficulty and clamp the result to [0.5, 5.0].

    The adjustment is at most +/-20%. A correct answer counts for more on
    harder items and less on easier ones; an incorrect answer is the reverse.
    """
    offset = 0.20 * _diff_centered(level)  # centred difficulty is in [-1, +1]
    if correct:
        factor = 1.0 + offset
    else:
        factor = 1.0 - offset
    scaled = base_impact * factor
    return max(0.5, min(5.0, scaled))

def _mistakes_from_entry(entry: dict) -> int:
    for key in ("mistakes","mistake_count","num_mistakes","attempts_before_correct","attempts"):
        if key in entry and entry[key] is not None:
            try:
                return min(3, max(0, int(entry[key])))  # cap at 3
            except (TypeError, ValueError):
                pass
    return 0


In [3]:
@dataclass
class BKTParams:
    """Parameter bundle consumed by the BKT update helpers."""

    # Initial mastery probability.
    p_L0: float = 0.20
    # Chance of learning the skill after a correct observation.
    p_T: float = 0.10
    # Smaller chance of learning after an incorrect observation.
    p_T_wrong: float = 0.02
    # Slip: answering incorrectly although the skill is known.
    p_S: float = 0.10
    # Guess: answering correctly although the skill is not known.
    p_G: float = 0.20
    # Factor applied to mastery after an incorrect answer (light forgetting).
    decay_wrong: float = 0.90
    # Hard bounds that keep mastery away from exactly 0 and 1.
    min_floor: float = 0.001
    max_ceiling: float = 0.999

def _observe_params_with_difficulty(base: BKTParams, level: int):
    """Return ``(slip, guess, decay)`` adjusted for the item's difficulty.

    Harder items get a higher slip and a lower guess probability, and a decay
    factor nearer 0.98 (less forgetting on a miss); easier items the opposite.
    The adjustments are small and each result is clamped to a fixed band:
    slip to [0.06, 0.14], guess to [0.12, 0.28], decay to [0.80, 0.98].
    """
    centred = _diff_centered(level)  # in [-1, +1]
    slip_gain, guess_gain, decay_gain = 0.04, 0.04, 0.07

    slip = base.p_S + slip_gain * centred
    slip = max(0.06, min(0.14, slip))

    guess = base.p_G - guess_gain * centred
    guess = max(0.12, min(0.28, guess))

    # Pull the decay part-way toward 0.98 for hard items and 0.80 for easy ones.
    if centred > 0:
        decay_target = 0.98
    else:
        decay_target = 0.80
    decay = base.decay_wrong + decay_gain * (decay_target - base.decay_wrong) + 0.05 * centred
    decay = max(0.80, min(0.98, decay))

    return slip, guess, decay

def bkt_update_once(p_know: float, correct: bool, p: BKTParams) -> float:
    """Apply one BKT step to a mastery estimate and return the new value.

    The step runs in this order:
    1. Bayesian posterior of mastery given the observation, using slip/guess.
    2. Learning transition with ``p_T`` (correct) or ``p_T_wrong`` (incorrect).
    3. Multiplication by ``decay_wrong``, for incorrect answers only.
    4. Clamp to ``[min_floor, max_ceiling]``.
    """
    mastered, unmastered = p_know, 1.0 - p_know
    if correct:
        evidence_known = mastered * (1.0 - p.p_S)
        evidence_total = evidence_known + unmastered * p.p_G
        learn_rate = p.p_T
    else:
        evidence_known = mastered * p.p_S
        evidence_total = evidence_known + unmastered * (1.0 - p.p_G)
        learn_rate = p.p_T_wrong

    # A zero denominator means the observation was impossible under the
    # parameters; fall back to a posterior of 0.
    posterior = evidence_known / evidence_total if evidence_total != 0 else 0.0

    updated = posterior + (1.0 - posterior) * learn_rate
    if not correct:
        updated *= p.decay_wrong
    return max(p.min_floor, min(p.max_ceiling, updated))

def bkt_update_fractional(p_know: float, correct: bool, p: BKTParams, impact: float) -> float:
    """Apply ``impact`` BKT steps, where ``impact`` may be fractional.

    ``floor(impact)`` full steps are applied with ``p``. Any remainder larger
    than 1e-6 adds one more step with softened parameters: ``p_T`` is scaled
    by the remainder (then clamped to [1e-6, 0.95]) and ``decay_wrong`` is
    moved toward 1.0 in proportion to it. A non-positive impact returns
    ``p_know`` unchanged.
    """
    full_steps = int(max(0, math.floor(impact)))
    mastery = p_know
    for _step in range(full_steps):
        mastery = bkt_update_once(mastery, correct, p)

    remainder = max(0.0, impact - full_steps)
    if remainder <= 1e-6:
        return mastery

    # NOTE(review): only p_T and decay_wrong are softened. p_T_wrong and the
    # slip/guess evidence update are applied at full strength even for a small
    # remainder — confirm this is intended.
    softened = BKTParams(
        p_L0=p.p_L0,
        p_T=max(1e-6, min(0.95, p.p_T * remainder)),
        p_T_wrong=p.p_T_wrong,
        p_S=p.p_S,
        p_G=p.p_G,
        decay_wrong=1.0 - (1.0 - p.decay_wrong) * remainder,
        min_floor=p.min_floor,
        max_ceiling=p.max_ceiling,
    )
    return bkt_update_once(mastery, correct, softened)

def _lr_mult_from_practice(wins: float, fails: float) -> float:
    n = max(1.0, wins + fails)
    perf = (wins - fails) / n  # [-1..+1]
    m = 1.0 + 0.5 * perf       # [0.5..1.5]
    return max(0.5, min(1.5, m))


In [4]:
def _predict_p_correct(p_know: float, p_S: float, p_G: float) -> float:
    return float(p_know * (1.0 - p_S) + (1.0 - p_know) * p_G)

def evaluate_events(events: Iterable[dict], *, base_prior: float = 0.20) -> dict:
    """Replay events through the BKT model, logging a prediction before each update.

    Events are processed in the order given (presumably chronological —
    confirm with the caller). Events without a truthy ``user_id`` or
    ``subtopic_ids`` are skipped. For every subtopic of a remaining event, the
    probability of a correct answer is predicted from the current mastery and
    recorded. Mastery is then updated with the event's impact, which is split
    evenly across its subtopics.

    Args:
        events: Event dicts. The keys read here are ``user_id``,
            ``subtopic_ids``, ``is_correct``, ``estimated_difficulty``,
            ``game_type`` and ``minigame_type``; the helper functions read the
            time and mistake keys.
        base_prior: Starting mastery for an unseen (user, subtopic) pair.

    Returns:
        A dict with ``"events_df"``, one row per (event, subtopic) holding the
        pre-update prediction, and ``"user_final_mastery"``, a mapping of
        ``user_id -> {subtopic_id: p_know}``. ``events_df`` always has the
        same columns, even when no event was usable.
    """
    # Fixed schema so an empty replay still produces the columns that the
    # metric helpers group and index by.
    columns = [
        "user_id", "subtopic_id", "is_correct", "p_correct",
        "game_type", "estimated_difficulty", "minigame_type",
    ]

    base = BKTParams(p_L0=base_prior)
    user_state: Dict[int, Dict[int, float]] = {}     # user_id -> {subtopic_id: p_know}
    extra_counters: Dict[Tuple[int, int], Tuple[float, float]] = {}  # (user, subtopic) -> (wins, fails)

    rows: List[dict] = []
    for e in events:
        user_id = int(e.get("user_id", 0) or 0)
        subtopic_ids = e.get("subtopic_ids") or []
        if not user_id or not subtopic_ids:
            continue

        is_correct = bool(e.get("is_correct", False))
        difficulty = _norm_diff(e.get("estimated_difficulty"))
        level = _diff_level(difficulty)

        p_S_eff, p_G_eff, decay_eff = _observe_params_with_difficulty(base, level)
        mistakes = _mistakes_from_entry(e)

        base_weight = float(_game_weight(e))
        time_mult = _time_multiplier(e, is_correct)
        impact_raw = base_weight * time_mult
        impact = _impact_with_difficulty(impact_raw, is_correct, level)

        # One event's impact is shared evenly by all of its subtopics.
        k = max(1, len(subtopic_ids))
        impact_each = impact / k

        # These values are the same for every subtopic of the event.
        game_type = (e.get("game_type") or "").strip().lower()
        minigame_type = (e.get("minigame_type") or "").strip().lower()
        sub_map = user_state.setdefault(user_id, {})

        for s_id in subtopic_ids:
            p_know = sub_map.get(s_id, base_prior)

            # Predict before update so the logged probability is out-of-sample.
            p_pred = _predict_p_correct(p_know, p_S_eff, p_G_eff)
            rows.append({
                "user_id": user_id,
                "subtopic_id": s_id,
                "is_correct": int(is_correct),
                "p_correct": float(np.clip(p_pred, 1e-9, 1 - 1e-9)),
                "game_type": game_type,
                "estimated_difficulty": difficulty,
                "minigame_type": minigame_type,
            })

            # Lightweight practice counters for learning-rate scaling. The
            # current event is counted before the multiplier is computed.
            key = (user_id, s_id)
            w, f = extra_counters.get(key, (0.0, 0.0))
            if is_correct:
                w += impact_each
                f += impact_each * mistakes
            else:
                f += impact_each * (1 + mistakes)
            extra_counters[key] = (w, f)
            lr_mult = _lr_mult_from_practice(w, f)

            # Update mastery with difficulty- and practice-adjusted parameters.
            step = BKTParams(
                p_L0=p_know,
                p_T=max(1e-4, min(0.95, base.p_T * lr_mult)),
                p_T_wrong=base.p_T_wrong,
                p_S=p_S_eff,
                p_G=p_G_eff,
                decay_wrong=decay_eff,
                min_floor=base.min_floor,
                max_ceiling=base.max_ceiling,
            )
            sub_map[s_id] = bkt_update_fractional(p_know, is_correct, step, impact_each)

    events_df = pd.DataFrame(rows, columns=columns)
    return {"events_df": events_df, "user_final_mastery": user_state}


In [5]:
def log_likelihood(y_true, y_pred) -> float:
    """Return the mean Bernoulli log-likelihood of labels under predictions.

    Predictions are clipped to [1e-9, 1 - 1e-9] so the logarithms stay finite.
    Higher (closer to 0) is better.
    """
    eps = 1e-9
    labels = np.asarray(y_true, dtype=float)
    probs = np.clip(np.asarray(y_pred, dtype=float), eps, 1 - eps)
    per_event = labels * np.log(probs) + (1 - labels) * np.log(1 - probs)
    return float(np.mean(per_event))

def _macro_user_auc(df: pd.DataFrame) -> Optional[float]:
    aucs = []
    for _, g in df.groupby("user_id"):
        y = g["is_correct"].values
        if len(np.unique(y)) < 2:
            continue
        aucs.append(roc_auc_score(y, g["p_correct"].values))
    return None if not aucs else float(np.mean(aucs))

def metrics_overview(events_df: pd.DataFrame) -> pd.DataFrame:
    """Summarise prediction quality overall, per game type and per difficulty.

    Each output row has the group label, row count ``n``, mean log-likelihood,
    ROC AUC (NaN unless both outcome classes are present), Brier score and
    macro per-user AUC (NaN when undefined). An empty input gives a single
    ``OVERALL`` row with ``n == 0`` and NaN metrics. A breakdown column that
    is absent from ``events_df`` is skipped.

    Returns:
        A DataFrame sorted by ``group`` with a fresh integer index.
    """
    columns = ["group", "n", "avg_log_likelihood", "auc_roc", "brier", "macro_user_auc"]
    rows = []

    def _one(df: pd.DataFrame, name: str):
        """Append the metric row for one slice of the events."""
        if len(df) == 0:
            rows.append({
                "group": name, "n": 0, "avg_log_likelihood": np.nan,
                "auc_roc": np.nan, "brier": np.nan, "macro_user_auc": np.nan,
            })
            return
        y, p = df["is_correct"].values, df["p_correct"].values
        ll = log_likelihood(y, p)
        auc = roc_auc_score(y, p) if len(np.unique(y)) == 2 else np.nan
        brier = brier_score_loss(y, p)
        mau = _macro_user_auc(df)
        rows.append({
            "group": name, "n": int(len(df)), "avg_log_likelihood": ll,
            "auc_roc": auc, "brier": brier,
            # Use NaN for "undefined", matching the other metric columns.
            "macro_user_auc": np.nan if mau is None else mau,
        })

    _one(events_df, "OVERALL")
    breakdowns = (("game_type", "game_type"), ("estimated_difficulty", "difficulty"))
    for column, label in breakdowns:
        # An empty frame may carry no columns at all; there is nothing to group.
        if column not in events_df.columns:
            continue
        for value, group_rows in events_df.groupby(column):
            _one(group_rows, f"{label}={value}")

    return pd.DataFrame(rows, columns=columns).sort_values(["group"]).reset_index(drop=True)

def breakdown_by(events_df: pd.DataFrame, group_col: str) -> pd.DataFrame:
    """Return log-likelihood, ROC AUC and Brier score per value of ``group_col``.

    AUC is NaN for groups that contain only one outcome class. The result is
    sorted by ``group_col``. When there are no groups, the result is an empty
    frame that still has the expected columns.

    Raises:
        KeyError: if ``group_col`` is not a column of ``events_df``.
    """
    columns = [group_col, "n", "log_likelihood", "auc_roc", "brier"]
    out = []
    for name, g in events_df.groupby(group_col):
        if len(g) == 0:
            continue
        y, p = g["is_correct"].values, g["p_correct"].values
        ll = log_likelihood(y, p)
        auc = roc_auc_score(y, p) if len(np.unique(y)) == 2 else np.nan
        brier = brier_score_loss(y, p)
        out.append({group_col: name, "n": int(len(g)), "log_likelihood": ll, "auc_roc": auc, "brier": brier})
    # Explicit columns keep sort_values from failing when `out` is empty.
    return pd.DataFrame(out, columns=columns).sort_values([group_col]).reset_index(drop=True)


In [6]:
# Synthetic stand-in data — replace this with real events from your DB/logs.
rng = np.random.default_rng(7)  # fixed seed keeps the cell reproducible
difficulties = ["beginner", "intermediate", "advanced", "master"]
minigames = ["crossword", "wordsearch", "debugging", "hangman"]

events = []
for user_id in (1, 2, 3):
    for question_offset in range(120):
        # The order of the draws fixes the random stream, so keep it as is.
        minigame = minigames[rng.integers(0, len(minigames))]
        answered_correctly = int(rng.random() < 0.7)
        difficulty = difficulties[rng.integers(0, len(difficulties))]
        time_taken = float(rng.uniform(50, 250))
        mistake_count = int(rng.integers(0, 3))
        subtopic = int(rng.integers(1, 10))

        if minigame in ("debugging", "hangman"):
            game_type = "coding"
        else:
            game_type = "non_coding"

        events.append({
            "user_id": user_id,
            "question_id": 1000 + question_offset,
            "is_correct": answered_correctly,
            "estimated_difficulty": difficulty,
            "game_type": game_type,
            "minigame_type": minigame,
            "minigame_time_taken": time_taken,
            "time_limit": 300.0,
            "mistakes": mistake_count,
            "subtopic_ids": [subtopic],
        })


In [7]:
# Replay the events through the model. `events_df` holds one row per
# (event, subtopic) with the prediction made before the mastery update.
res = evaluate_events(events)
df = res["events_df"]
df.head()


Unnamed: 0,user_id,subtopic_id,is_correct,p_correct,game_type,estimated_difficulty,minigame_type
0,1,3,0,0.326667,coding,advanced,hangman
1,1,8,0,0.353333,non_coding,intermediate,crossword
2,1,3,1,0.17773,non_coding,master,crossword
3,1,5,1,0.353333,coding,intermediate,debugging
4,1,9,0,0.3,coding,master,debugging


In [8]:
# Overall metrics plus one row per game type and per difficulty.
metrics_overview(df)


Unnamed: 0,group,n,avg_log_likelihood,auc_roc,brier,macro_user_auc
0,OVERALL,360,-0.828389,0.482565,0.289764,0.480666
1,difficulty=advanced,85,-0.833101,0.459975,0.291863,0.500817
2,difficulty=beginner,89,-0.855633,0.426548,0.290165,0.400088
3,difficulty=intermediate,88,-0.755457,0.566628,0.259539,0.57932
4,difficulty=master,98,-0.865051,0.445153,0.314719,0.427713
5,game_type=coding,184,-0.755356,0.56185,0.261752,0.559093
6,game_type=non_coding,176,-0.904742,0.396784,0.319048,0.410105


In [9]:
# Log-likelihood, AUC and Brier score per game type.
breakdown_by(df, "game_type")


Unnamed: 0,game_type,n,log_likelihood,auc_roc,brier
0,coding,184,-0.755356,0.56185,0.261752
1,non_coding,176,-0.904742,0.396784,0.319048


In [10]:
# Log-likelihood, AUC and Brier score per difficulty label.
breakdown_by(df, "estimated_difficulty")


Unnamed: 0,estimated_difficulty,n,log_likelihood,auc_roc,brier
0,advanced,85,-0.833101,0.459975,0.291863
1,beginner,89,-0.855633,0.426548,0.290165
2,intermediate,88,-0.755457,0.566628,0.259539
3,master,98,-0.865051,0.445153,0.314719
