# Badis: **Enriched** DKT Training Notebook
This notebook trains a **Deep Knowledge Tracing (DKT)** model enriched with contextual signals (difficulty, ZPDES adjustments, response time, motivation, anxiety, phase).

## Key points
- Single interaction schema: `(skill_id, correct, difficulty, adjustment, time_spent, motivation, anxiety_level, phase_type)`
- Extended DKT encoding: concatenation of one-hot vectors (`skill_id`, `phase_type`) and continuous variables
- Model: GRU → sigmoid projection (per-skill success probabilities)
- Loss: **BCE** on the predicted success probability at the **next step**, for the skill actually attempted
- Metrics: AUC, accuracy
- Visualizations: training curves (loss, AUC)
- Data: synthetic generator **or** an external CSV file
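
To make the loss in the list above concrete, here is a minimal dependency-free sketch of a masked next-step BCE for a single sequence. The function name `masked_next_step_bce` and the toy numbers are illustrative assumptions; the notebook itself computes the same quantity on batched PyTorch tensors.

```python
import math

def masked_next_step_bce(probs, next_skill, next_correct, mask, eps=1e-8):
    """Average BCE over the scored steps of one sequence (illustrative helper).

    probs[t][k]     : predicted success probability for skill k after step t
    next_skill[t]   : skill attempted at step t+1
    next_correct[t] : outcome (0 or 1) at step t+1
    mask[t]         : 1 if step t has a next interaction to score, else 0
    """
    total, count = 0.0, 0.0
    for p_row, k, y, m in zip(probs, next_skill, next_correct, mask):
        # Pick the output for the skill actually attempted next, clamped so log() stays finite
        p = min(max(p_row[k], eps), 1.0 - eps)
        bce = -(y * math.log(p) + (1 - y) * math.log(1.0 - p))
        total += m * bce
        count += m
    return total / (count + eps)

# Toy sequence: 3 steps, 2 skills; the last step has no successor and is masked out.
probs = [[0.8, 0.3], [0.6, 0.4], [0.5, 0.5]]
loss = masked_next_step_bce(probs, next_skill=[0, 1, 0], next_correct=[1, 0, 0], mask=[1, 1, 0])
print(round(loss, 4))
```

Only the two unmasked steps contribute, so the result is the mean of `-log(0.8)` and `-log(1 - 0.4)`.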



## 1) Installation & imports

In [None]:
# If needed, uncomment to install the dependencies
# !pip install scikit-learn torch pandas numpy matplotlib

import math
import random
from dataclasses import dataclass
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score, accuracy_score
import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device


## 2) Data schema & phases
We use a **single** record per interaction:  
`(skill_id, correct, difficulty, adjustment, time_spent, motivation, anxiety_level, phase_type)`

- `skill_id`: integer in `[0..K-1]`
- `correct`: {0, 1}
- `difficulty`: float in [0.2, 0.9]
- `adjustment`: {-1, 0, +1} (ZPDES adjustment)
- `time_spent`: response time (seconds)
- `motivation`: [0, 1]
- `anxiety_level`: [0, 1]
- `phase_type`: one of `{'hub','exploration','lecon','synthese','defi'}`
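
As a sanity check on the schema above, the following sketch validates one interaction record against the listed ranges. The helper `validate_interaction` is hypothetical and not part of the notebook, and the rule that `time_spent` must be strictly positive is an assumption (the schema only says it is a response time in seconds).

```python
PHASES = {'hub', 'exploration', 'lecon', 'synthese', 'defi'}

def validate_interaction(rec, num_skills):
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    if not (isinstance(rec['skill_id'], int) and 0 <= rec['skill_id'] < num_skills):
        problems.append('skill_id out of range')
    if rec['correct'] not in (0, 1):
        problems.append('correct must be 0 or 1')
    if not 0.2 <= rec['difficulty'] <= 0.9:
        problems.append('difficulty outside [0.2, 0.9]')
    if rec['adjustment'] not in (-1, 0, 1):
        problems.append('adjustment must be -1, 0 or +1')
    if rec['time_spent'] <= 0:  # assumption: a response time is strictly positive
        problems.append('time_spent must be positive')
    for key in ('motivation', 'anxiety_level'):
        if not 0.0 <= rec[key] <= 1.0:
            problems.append(key + ' outside [0, 1]')
    if rec['phase_type'] not in PHASES:
        problems.append('unknown phase_type')
    return problems

good = {'skill_id': 0, 'correct': 1, 'difficulty': 0.5, 'adjustment': 0,
        'time_spent': 3.2, 'motivation': 0.7, 'anxiety_level': 0.2,
        'phase_type': 'exploration'}
bad = dict(good, difficulty=1.2, phase_type='quiz')

print(validate_interaction(good, num_skills=5))
print(validate_interaction(bad, num_skills=5))
```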


## 3) Synthetic data generator (+ ZPDES rules)
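
Before the generator cell, it may help to trace the difficulty rule by hand. The sketch below restates `zpdes_adjust` so that it runs on its own, and adds a hypothetical `trace` helper that replays a list of outcomes the same way the generator updates its streak counters.

```python
def zpdes_adjust(difficulty, consec_fail, consec_succ):
    """More than 2 consecutive failures lowers difficulty, more than 3 successes raises it."""
    adjustment = 0
    if consec_fail > 2:
        adjustment = -1
    elif consec_succ > 3:
        adjustment = 1
    new_diff = min(0.9, max(0.2, difficulty + 0.1 * adjustment))
    return new_diff, adjustment

def trace(outcomes, difficulty=0.5):
    """Replay outcomes (1 = success, 0 = failure) and record (adjustment, difficulty) per step."""
    c_fail = c_succ = 0
    history = []
    for correct in outcomes:
        if correct:
            c_succ, c_fail = c_succ + 1, 0
        else:
            c_fail, c_succ = c_fail + 1, 0
        difficulty, adj = zpdes_adjust(difficulty, c_fail, c_succ)
        history.append((adj, round(difficulty, 2)))
    return history

# Four successes in a row trigger +1; three failures in a row then trigger -1.
steps = trace([1, 1, 1, 1, 0, 0, 0])
print(steps)
```

Because the streak counters are never reset after an adjustment, a long streak keeps moving the difficulty at every step until it hits the [0.2, 0.9] bounds.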

In [None]:
PHASES = ['hub','exploration','lecon','synthese','defi']

def zpdes_adjust(difficulty: float, consec_fail: int, consec_succ: int):
    """Simple ZPDES-inspired rule that adjusts difficulty in real time.
    - More than 2 consecutive failures → lower the difficulty
    - More than 3 consecutive successes → raise the difficulty
    - Bounds: [0.2, 0.9]
    Returns (new_difficulty, adjustment)
    """
    adjustment = 0
    if consec_fail > 2:
        adjustment = -1
    elif consec_succ > 3:
        adjustment = +1
    new_diff = min(0.9, max(0.2, difficulty + 0.1 * adjustment))
    return new_diff, adjustment

def generate_synthetic_classroom(
    num_students=120, num_skills=12, max_seq_len=80, seed=42
):
    """Generate a DataFrame of interactions following the enriched schema.
    Simplifying assumptions:
    - Each student has a latent mastery per skill (beta)
    - The success probability depends on mastery, difficulty and anxiety
    - A minimal ZPDES controller adjusts difficulty based on recent successes/failures
    """
    rng = random.Random(seed)
    np.random.seed(seed)

    rows = []
    for s in range(num_students):
        # Student profile
        mastery = np.clip(np.random.normal(0.0, 0.6, size=num_skills), -1.5, 1.5)
        motivation = np.clip(np.random.beta(2, 2), 0, 1)
        anxiety_base = np.clip(np.random.beta(2, 5), 0, 1)

        # individual sequence
        seq_len = rng.randint(max_seq_len//2, max_seq_len)
        diff = rng.uniform(0.3, 0.7)
        c_fail = 0
        c_succ = 0
        for t in range(seq_len):
            skill = rng.randrange(num_skills)
            phase = rng.choice(PHASES)

            # contextual anxiety (rises with consecutive failures, falls with consecutive successes)
            anxiety = np.clip(anxiety_base + 0.05*c_fail - 0.03*c_succ, 0, 1)
            # response time: longer when difficulty/anxiety are high
            time_spent = max(0.5, np.random.lognormal(mean=math.log(2 + 6*diff + 3*anxiety), sigma=0.4))
            # success probability (simple logit)
            logit = mastery[skill] - (diff - 0.5)*2.0 - (anxiety - 0.3)*1.2
            p_correct = 1 / (1 + math.exp(-logit))
            correct = 1 if rng.random() < p_correct else 0

            # update streak counters
            if correct:
                c_succ += 1
                c_fail = 0
                # slight mastery gain
                mastery[skill] = np.clip(mastery[skill] + 0.05*(0.8 - diff), -2, 2)
            else:
                c_fail += 1
                c_succ = 0
                # slight drop in perceived mastery
                mastery[skill] = np.clip(mastery[skill] - 0.03*(diff + anxiety), -2, 2)

            # ZPDES adjust
            new_diff, adjust = zpdes_adjust(diff, c_fail, c_succ)
            diff = new_diff

            rows.append({
                "student_id": s,
                "t": t,
                "skill_id": skill,
                "correct": correct,
                "difficulty": round(diff, 3),
                "adjustment": adjust,
                "time_spent": float(time_spent),
                "motivation": float(motivation),
                "anxiety_level": float(anxiety),
                "phase_type": phase
            })

    df = pd.DataFrame(rows)
    return df

df = generate_synthetic_classroom(num_students=2500, num_skills=5, max_seq_len=100, seed=7)
df.head()


In [None]:

print(df.shape)
print(df.describe(include='all').T.iloc[:10])
df['phase_type'].value_counts().head()

### (Optional) Load your own data
The CSV must contain **at least** these columns:
- `student_id, t, skill_id, correct, difficulty, adjustment, time_spent, motivation, anxiety_level, phase_type`

Uncomment and adjust the path if you have a file:
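
A quick way to check a file against the required columns before loading it is sketched below, using only the standard library. `missing_columns` is a hypothetical helper and the inline CSV text is made up for the example.

```python
import csv
import io

REQUIRED_COLUMNS = ['student_id', 't', 'skill_id', 'correct', 'difficulty',
                    'adjustment', 'time_spent', 'motivation', 'anxiety_level',
                    'phase_type']

def missing_columns(csv_text):
    """Return the required columns that are absent from the CSV header."""
    reader = csv.DictReader(io.StringIO(csv_text))
    header = reader.fieldnames or []
    return [c for c in REQUIRED_COLUMNS if c not in header]

# Made-up file content with only the first five columns present
sample = "student_id,t,skill_id,correct,difficulty\n0,0,0,1,0.5\n"
missing = missing_columns(sample)
print(missing)
```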

In [None]:
# df = pd.read_csv('models/mes_traces.csv')  
# df.head()

## 4) Feature encoding (one-hot + normalized continuous variables)
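
The layout of one encoded interaction can be sketched without scikit-learn as follows. The helpers `one_hot`, `standardize` and `encode`, and the per-feature `(mean, std)` values, are illustrative assumptions. In the notebook the statistics come from a fitted `StandardScaler` and the one-hot vectors from `OneHotEncoder`.

```python
CONT_KEYS = ['difficulty', 'adjustment', 'time_spent', 'motivation', 'anxiety_level']

def one_hot(value, categories):
    """All-zero vector if the value is unknown, like handle_unknown='ignore'."""
    return [1.0 if value == c else 0.0 for c in categories]

def standardize(x, mean, std):
    return (x - mean) / std

def encode(rec, skills, phases, stats):
    """Concatenate skill one-hot, phase one-hot and standardized continuous features."""
    cont = [standardize(rec[k], *stats[k]) for k in CONT_KEYS]
    return one_hot(rec['skill_id'], skills) + one_hot(rec['phase_type'], phases) + cont

skills, phases = [0, 1, 2], ['hub', 'lecon']
stats = {'difficulty': (0.5, 0.25), 'adjustment': (0.0, 1.0), 'time_spent': (4.0, 2.0),
         'motivation': (0.5, 0.25), 'anxiety_level': (0.25, 0.125)}  # made-up (mean, std) pairs
rec = {'skill_id': 1, 'phase_type': 'lecon', 'difficulty': 0.75, 'adjustment': 0,
       'time_spent': 6.0, 'motivation': 0.75, 'anxiety_level': 0.25}

x = encode(rec, skills, phases, stats)
print(len(x), x)  # length = len(skills) + len(phases) + 5
```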

In [None]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler

def build_encoders(df: pd.DataFrame):
    ohe_skill = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    ohe_phase = OneHotEncoder(sparse_output=False, handle_unknown='ignore')

    ohe_skill.fit(df[['skill_id']])
    ohe_phase.fit(df[['phase_type']])

    scaler = StandardScaler()
    cont = df[['difficulty','adjustment','time_spent','motivation','anxiety_level']].astype(float)
    scaler.fit(cont)

    return ohe_skill, ohe_phase, scaler

ohe_skill, ohe_phase, scaler = build_encoders(df)

num_skills = len(ohe_skill.categories_[0])
num_phases = len(ohe_phase.categories_[0])
feature_dim = num_skills + num_phases + 5  # 5 continuous features
num_skills, num_phases, feature_dim


## 5) Sequence dataset (one sequence per student)
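
The dataset pairs each step with the skill and outcome of the following step, and masks the last step because it has no successor. A minimal sketch of that shift on plain lists (the helper name `next_step_targets` is an assumption) could look like this:

```python
def next_step_targets(skills, corrects):
    """Shift skills/outcomes by one step; the last position repeats itself and is masked out."""
    n = len(skills)
    next_skill = skills[1:] + skills[-1:]
    next_correct = corrects[1:] + corrects[-1:]
    mask = [1.0] * (n - 1) + [0.0]
    return next_skill, next_correct, mask

next_skill, next_correct, mask = next_step_targets([0, 1, 0], [1, 0, 1])
print(next_skill, next_correct, mask)
```

The value stored at the last position is only a placeholder: its mask entry is 0, so it never enters the loss or the metrics.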

In [None]:
def encode_row(row, ohe_skill, ohe_phase, scaler):
    skill_oh = ohe_skill.transform([[row['skill_id']]])[0]
    phase_oh  = ohe_phase.transform([[row['phase_type']]])[0]
    cont = np.array([[row['difficulty'], row['adjustment'], row['time_spent'], row['motivation'], row['anxiety_level']]], dtype=float)
    cont_scaled = scaler.transform(cont)[0]
    x = np.concatenate([skill_oh, phase_oh, cont_scaled], axis=0).astype(np.float32)
    return x

class DKTSequenceDataset(Dataset):
    def __init__(self, df: pd.DataFrame, ohe_skill, ohe_phase, scaler, num_skills: int, seq_len_cap: int = 150):
        self.num_skills = num_skills
        self.ohe_skill = ohe_skill
        self.ohe_phase = ohe_phase
        self.scaler = scaler
        self.seq_len_cap = seq_len_cap

        # group by student and sort by t
        grouped = []
        for sid, g in df.sort_values(['student_id','t']).groupby('student_id'):
            g = g.reset_index(drop=True)
            # cap sequence length (take last seq_len_cap interactions to keep "recent" signal)
            if len(g) > seq_len_cap:
                g = g.iloc[-seq_len_cap:].reset_index(drop=True)
            # build tensors
            X = np.stack([encode_row(g.iloc[i], ohe_skill, ohe_phase, scaler) for i in range(len(g))])
            # next-step targets: for t, predict correctness at t+1
            # also need "next skill" indices to select correct head from multi-skill output
            next_correct = np.zeros(len(g), dtype=np.float32)
            next_skill = np.zeros(len(g), dtype=np.int64)
            for i in range(len(g)-1):
                next_correct[i] = g.loc[i+1, 'correct']
                next_skill[i]   = int(g.loc[i+1, 'skill_id'])
            next_correct[-1] = g.loc[len(g)-1, 'correct']  # tail (won't be used if masked)
            next_skill[-1]   = int(g.loc[len(g)-1, 'skill_id'])
            mask = np.ones(len(g), dtype=np.float32)
            mask[-1] = 0.0  # typically don't score last step since no next action

            grouped.append((X, next_skill, next_correct, mask))

        self.samples = grouped

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        X, next_skill, next_correct, mask = self.samples[idx]
        return torch.from_numpy(X), torch.from_numpy(next_skill), torch.from_numpy(next_correct), torch.from_numpy(mask)

dataset = DKTSequenceDataset(df, ohe_skill, ohe_phase, scaler, num_skills=num_skills, seq_len_cap=150)
len(dataset), dataset[0][0].shape


## 6) Split train/val/test
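
Since students have sequences of different lengths, each batch is padded to its longest sequence. The idea can be sketched on plain lists as below; `pad_batch` is a hypothetical helper, whereas the notebook's `collate` does the same on tensors for features, targets and masks.

```python
def pad_batch(sequences, pad_value=0.0):
    """Right-pad every sequence to the batch maximum and return the original lengths."""
    lengths = [len(s) for s in sequences]
    max_len = max(lengths)
    padded = [list(s) + [pad_value] * (max_len - len(s)) for s in sequences]
    return padded, lengths

# Two mask sequences of lengths 3 and 2
padded, lengths = pad_batch([[1.0, 1.0, 0.0], [1.0, 0.0]])
print(padded, lengths)
```

Padding the mask with zeros means padded positions are ignored by the masked loss, and the returned lengths are what a packed-sequence RNN call needs.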

In [None]:
indices = np.arange(len(dataset))
np.random.shuffle(indices)
n = len(indices)
train_idx = indices[:int(0.7*n)]
val_idx   = indices[int(0.7*n):int(0.85*n)]
test_idx  = indices[int(0.85*n):]

def subset(dataset, idxs):
    class _Subset(Dataset):
        def __init__(self, base, idxs):
            self.base = base
            self.idxs = idxs
        def __len__(self): return len(self.idxs)
        def __getitem__(self, i): return self.base[self.idxs[i]]
    return _Subset(dataset, idxs)

train_ds = subset(dataset, train_idx)
val_ds   = subset(dataset, val_idx)
test_ds  = subset(dataset, test_idx)

BATCH_SIZE = 16

def collate(batch):
    # variable-length handling via padding to max_len within batch
    Xs, ks, ys, ms = zip(*batch)
    lengths = [x.shape[0] for x in Xs]
    max_len = max(lengths)

    X_pad = torch.zeros(len(batch), max_len, Xs[0].shape[1], dtype=torch.float32)
    k_pad = torch.zeros(len(batch), max_len, dtype=torch.long)
    y_pad = torch.zeros(len(batch), max_len, dtype=torch.float32)
    m_pad = torch.zeros(len(batch), max_len, dtype=torch.float32)

    for i,(x,k,y,m) in enumerate(batch):
        L = x.shape[0]
        X_pad[i,:L] = x
        k_pad[i,:L] = k
        y_pad[i,:L] = y
        m_pad[i,:L] = m

    return X_pad.to(device), k_pad.to(device), y_pad.to(device), m_pad.to(device), torch.tensor(lengths, dtype=torch.long).to(device)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate)
val_loader   = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate)
test_loader  = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate)

len(train_ds), len(val_ds), len(test_ds)


## 7) Enriched DKT model (GRU)
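
As a cross-check for the parameter count printed by the next cell, the size of a unidirectional GRU with biases followed by a linear layer can be worked out by hand. The sketch assumes that all 5 skills and all 5 phases appear in the data, so that `feature_dim = 5 + 5 + 5 = 15`. The function names are illustrative.

```python
def gru_param_count(input_dim, hidden_dim, num_layers=1):
    """Parameters of a unidirectional GRU with biases (3 gates per layer)."""
    total = 0
    for layer in range(num_layers):
        in_dim = input_dim if layer == 0 else hidden_dim
        total += 3 * hidden_dim * in_dim      # input-to-hidden weights
        total += 3 * hidden_dim * hidden_dim  # hidden-to-hidden weights
        total += 2 * 3 * hidden_dim           # two bias vectors (input and hidden)
    return total

def linear_param_count(in_features, out_features):
    return in_features * out_features + out_features

feature_dim = 5 + 5 + 5  # assumed: 5 skills + 5 phases + 5 continuous features
total = gru_param_count(feature_dim, 128) + linear_param_count(128, 5)
print(total)
```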

In [None]:
class DKTEnriched(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_skills, num_layers=1, dropout=0.1):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers>1 else 0.0)
        self.out = nn.Linear(hidden_dim, num_skills)
        self.sigmoid = nn.Sigmoid()

    def forward(self, X, lengths):
        # pack for efficiency
        packed = nn.utils.rnn.pack_padded_sequence(X, lengths.cpu(), batch_first=True, enforce_sorted=False)
        out_packed, h = self.gru(packed)
        out, _ = nn.utils.rnn.pad_packed_sequence(out_packed, batch_first=True)
        logits = self.out(out)  # [B, T, K]
        probs = self.sigmoid(logits)
        return probs  # prob of correctness for each skill at each step

model = DKTEnriched(input_dim=feature_dim, hidden_dim=128, num_skills=num_skills, num_layers=1, dropout=0.1).to(device)

total = sum(p.numel() for p in model.parameters())
trainables = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total: {total/1e6:.3f} M params  ({total:,})")
print(f"Trainable: {trainables/1e6:.3f} M params")



## 8) Training
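
The training loop stops early when the validation loss has not improved by more than a small margin for `patience` consecutive epochs. That logic can be replayed on a list of losses with the sketch below. `early_stop_epoch` is a hypothetical helper and the loss values are made up.

```python
def early_stop_epoch(val_losses, patience=4, min_delta=1e-4):
    """Return (last_epoch_run, best_epoch), both 1-indexed."""
    best, best_epoch, bad = float('inf'), None, 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best - min_delta:
            best, best_epoch, bad = loss, epoch, 0  # improvement: reset the counter
        else:
            bad += 1
            if bad >= patience:
                return epoch, best_epoch  # patience exhausted
    return len(val_losses), best_epoch

# Made-up validation losses: the best value is at epoch 3, then four epochs without improvement
stop, best = early_stop_epoch([0.60, 0.55, 0.54, 0.545, 0.541, 0.542, 0.55, 0.53])
print(stop, best)
```

In this example the final value 0.53 is never reached because training halts at epoch 7, which is the usual trade-off of a patience-based stop.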

In [None]:
def step_epoch(model, loader, optimizer=None):
    is_train = optimizer is not None
    if is_train: model.train()
    else: model.eval()

    losses = []
    all_preds = []
    all_tgts = []

    bce = nn.BCELoss(reduction='none')

    for X, k, y, m, lengths in loader:
        probs = model(X, lengths)  # [B,T,K]
        B,T,K = probs.shape

        # select the output head matching the "next skill" (k)
        # gather over last dimension K
        idx = k.unsqueeze(-1)  # [B,T,1]
        sel = torch.gather(probs, dim=-1, index=idx).squeeze(-1)  # [B,T]

        # BCE + mask
        loss_mat = bce(sel, y)
        loss = (loss_mat * m).sum() / (m.sum() + 1e-8)

        if is_train:
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        losses.append(loss.item())

        # Keep the masked predictions/targets for the metrics
        mask_flat = m.detach().cpu().numpy().reshape(-1) > 0.5
        preds_flat = sel.detach().cpu().numpy().reshape(-1)
        tgts_flat  = y.detach().cpu().numpy().reshape(-1)

        all_preds.append(preds_flat[mask_flat])
        all_tgts.append(tgts_flat[mask_flat])

    if len(all_preds) == 0:
        return np.mean(losses), float('nan'), float('nan')

    preds = np.concatenate(all_preds)
    tgts = np.concatenate(all_tgts)

    # AUC can fail if tgts contains a single class
    try:
        auc = roc_auc_score(tgts, preds)
    except Exception:
        auc = float('nan')

    acc = accuracy_score(tgts >= 0.5, preds >= 0.5)

    return np.mean(losses), auc, acc

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)

EPOCHS = 12
history = {"train_loss":[], "val_loss":[], "train_auc":[], "val_auc":[], "train_acc":[], "val_acc":[]}
best_val = float('inf')
patience, bad = 4, 0
best_state = None

for epoch in range(1, EPOCHS+1):
    tr_loss, tr_auc, tr_acc = step_epoch(model, train_loader, optimizer=optimizer)
    vl_loss, vl_auc, vl_acc = step_epoch(model, val_loader, optimizer=None)

    history["train_loss"].append(tr_loss)
    history["val_loss"].append(vl_loss)
    history["train_auc"].append(tr_auc)
    history["val_auc"].append(vl_auc)
    history["train_acc"].append(tr_acc)
    history["val_acc"].append(vl_acc)

    print(f"Epoch {epoch:02d} | train_loss={tr_loss:.4f} val_loss={vl_loss:.4f} | train_auc={tr_auc:.3f} val_auc={vl_auc:.3f} | train_acc={tr_acc:.3f} val_acc={vl_acc:.3f}")

    if vl_loss < best_val - 1e-4:
        best_val = vl_loss
        best_state = {k:v.cpu().clone() for k,v in model.state_dict().items()}
        bad = 0
    else:
        bad += 1
        if bad >= patience:
            print("Early stopping.")
            break

if best_state is not None:
    model.load_state_dict(best_state)


## 9) Training curves

In [None]:
plt.figure()
plt.plot(history['train_loss'], label='train_loss')
plt.plot(history['val_loss'], label='val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

plt.figure()
plt.plot(history['train_auc'], label='train_auc')
plt.plot(history['val_auc'], label='val_auc')
plt.xlabel('Epoch')
plt.ylabel('AUC')
plt.legend()
plt.show()


## 10) Final evaluation on the test set

In [None]:
test_loss, test_auc, test_acc = step_epoch(model, test_loader, optimizer=None)
print({"test_loss": test_loss, "test_auc": test_auc, "test_acc": test_acc})

## 11) Saving the model & encoders

In [None]:
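import os

MODEL_PATH = 'models/dkt_enriched.pt'
ENC_INFO_PATH = 'models/dkt_enriched_encoders.npz'

# Create the output directory if needed; torch.save does not create it
os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)

torch.save(model.state_dict(), MODEL_PATH)

# Save encoder/scaler parameters (needed at inference time)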
np.savez(ENC_INFO_PATH,
         skill_categories=ohe_skill.categories_[0],
         phase_categories=ohe_phase.categories_[0],
         scaler_mean=scaler.mean_,
         scaler_scale=scaler.scale_)

print('Saved:', MODEL_PATH, ENC_INFO_PATH)

## 12) Inference demo (short sequence)

In [None]:
# Build a short sequence (the student's first 5 interactions) and infer the per-skill success probability at the next step
sample = df[df['student_id']==0].sort_values('t').head(5).copy()
X_seq = np.stack([encode_row(sample.iloc[i], ohe_skill, ohe_phase, scaler) for i in range(len(sample))])
X_seq = torch.from_numpy(X_seq).unsqueeze(0).to(device)
lengths = torch.tensor([X_seq.shape[1]]).to(device)
with torch.no_grad():
    probs = model(X_seq, lengths)  # [1,T,K]
probs[:, -1, :].cpu().numpy()[0][:10]  # show at most the first 10 skills


## 13) Optional export of the synthetic data

In [None]:
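import os
DATA_PATH = 'models/dkt_synth_data.csv'
os.makedirs(os.path.dirname(DATA_PATH), exist_ok=True)  # make sure the target directory exists
df.to_csv(DATA_PATH, index=False)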
DATA_PATH

## 14) CSV template

In [None]:
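import os
TEMPLATE_PATH = 'models/dkt_template.csv'
os.makedirs(os.path.dirname(TEMPLATE_PATH), exist_ok=True)  # make sure the target directory exists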
template = pd.DataFrame({
    'student_id': [0,0,0],
    't': [0,1,2],
    'skill_id': [0,1,0],
    'correct': [1,0,1],
    'difficulty': [0.5,0.6,0.55],
    'adjustment': [0,-1,0],
    'time_spent': [3.2,5.1,2.7],
    'motivation': [0.7,0.65,0.68],
    'anxiety_level': [0.2,0.35,0.25],
    'phase_type': ['exploration','lecon','defi']
})
template.to_csv(TEMPLATE_PATH, index=False)

TEMPLATE_PATH