In [42]:
# bkt_and_dkt.py
# Requires: numpy, torch
# pip install numpy torch

import math
import random
import numpy as np
from typing import List, Tuple
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

In [43]:
# ---------------------
# BKT implementation
# ---------------------
class BKTModel:
    """
    Classic single-skill BKT:
      params: p_init, p_trans, slip, guess
      - absorbing learned state (if L_t=1 then L_{t+1}=1)
    """

    def __init__(self, p_init=0.1, p_trans=0.1, slip=0.1, guess=0.2):
        #set parameters
        self.p_init = float(p_init)
        self.p_trans = float(p_trans)
        self.slip = float(slip)
        self.guess = float(guess)

    def simulate_student(self, length: int, seed=None) -> List[int]:
        """Simulate a single student sequence of correctness (0/1)."""
        #initate initial skills
        if seed is not None:
            random.seed(seed)
        L = 1 if random.random() < self.p_init else 0
        obs = []
        for t in range(length):
            # observation for that the student answers correctly on current question
            if L == 1:
                prob_correct = 1.0 - self.slip
            else:
                prob_correct = self.guess
            c = 1 if random.random() < prob_correct else 0
            obs.append(c)
            # transition: if not learned, might learn 
            if L == 0 and random.random() < self.p_trans:
                L = 1
            # if L==1, stay learned (absorbing)
        return obs

    def simulate_dataset(self, n_students: int, seq_len: int, rng_seed=None) -> List[List[int]]:
        if rng_seed is not None:
            random.seed(rng_seed)
        return [self.simulate_student(seq_len) for _ in range(n_students)]

    # --------------------------
    # Forward-backward for HMM
    # --------------------------
    def _obs_prob(self, c, L):
        """P(C=c | L)"""
        if L == 1:
            return (1.0 - self.slip) if c == 1 else self.slip
        else:
            return self.guess if c == 1 else (1.0 - self.guess)

    def forward_backward(self, seq: List[int]):
        """Compute forward and backward messages and posteriors for one sequence.
           Returns alpha, beta, gamma (posterior of L_t=1), xi (expected transitions 0->1 at t).
        """
        T = len(seq)
        # states 0 and 1
        # forward alpha_t(s) = P(C_1..C_t, L_t = s)
        alpha = np.zeros((T, 2))
        # init
        alpha[0, 1] = self.p_init * self._obs_prob(seq[0], 1)
        alpha[0, 0] = (1 - self.p_init) * self._obs_prob(seq[0], 0)
        # scale to avoid underflow
        for t in range(1, T):
            c = seq[t]
            # from previous states:
            # if previous was 1, next is 1 (absorbing)
            # P(L_t=1) = alpha[t-1,1]*1 + alpha[t-1,0]*p_trans
            alpha[t, 1] = (alpha[t-1,1] * 1.0 + alpha[t-1,0] * self.p_trans) * self._obs_prob(c, 1)
            # P(L_t=0) = only possible from previous 0 and no learn
            alpha[t, 0] = (alpha[t-1,0] * (1.0 - self.p_trans)) * self._obs_prob(c, 0)
            # normalization maybe small; keep absolute values for backward correctness
        # backward
        beta = np.zeros((T, 2))
        beta[T-1, :] = 1.0
        for t in range(T-2, -1, -1):
            c_next = seq[t+1]
            # compute contributions to beta[t,s] = sum_{s'} P(L_{t+1}=s'|L_t=s) * P(C_{t+1}|s') * beta[t+1,s']
            # for s=1:
            #   from s=1 -> s'=1 with prob 1.0
            beta[t, 1] = 1.0 * self._obs_prob(c_next, 1) * beta[t+1, 1]
            # for s=0:
            #   s'=1 with p_trans
            #   s'=0 with (1-p_trans)
            beta[t, 0] = (self.p_trans * self._obs_prob(c_next, 1) * beta[t+1, 1] +
                          (1.0 - self.p_trans) * self._obs_prob(c_next, 0) * beta[t+1, 0])
        # gamma (posterior P(L_t = 1 | seq))
        gamma = np.zeros(T)
        for t in range(T):
            numer1 = alpha[t,1] * beta[t,1]
            numer0 = alpha[t,0] * beta[t,0]
            denom = numer0 + numer1
            if denom == 0:
                gamma[t] = 0.0
            else:
                gamma[t] = numer1 / denom
        # xi: expected # transitions from 0->1 at time t (i.e., from L_t=0 to L_{t+1}=1)
        xi = np.zeros(T-1)
        for t in range(T-1):
            c_next = seq[t+1]
            # joint prob proportional to alpha[t,0] * P(L_{t+1}=1|L_t=0)=p_trans * P(C_{t+1}|1) * beta[t+1,1]
            numer = alpha[t,0] * self.p_trans * self._obs_prob(c_next, 1) * beta[t+1,1]
            denom = (alpha[t,0] * self._obs_prob(seq[t],0) * beta[t,0] if False else
                     (alpha[t,0]*beta[t,0] + alpha[t,1]*beta[t,1]))  # not used; compute normalization below
            # instead compute sum over s,s' of alpha[t,s]*P(s->s')*P(obs_{t+1}|s')*beta[t+1,s']
            denom_sum = (
                alpha[t,0] * ( (1.0-self.p_trans) * self._obs_prob(c_next, 0) * beta[t+1,0]
                              + self.p_trans * self._obs_prob(c_next, 1) * beta[t+1,1] )
                + alpha[t,1] * (1.0 * self._obs_prob(c_next, 1) * beta[t+1,1])
            )
            xi[t] = (numer / denom_sum) if denom_sum != 0 else 0.0
        return alpha, beta, gamma, xi

    def fit(self, sequences: List[List[int]], n_iters=15, verbose=False):
        """
        Fit parameters with EM (Baum-Welch) using multiple sequences (list of lists of 0/1).
        """
        for it in range(n_iters):
            # expected counts
            sum_gamma0 = 0.0   # expected times in state 0
            sum_gamma1 = 0.0   # expected times in state 1
            sum_init1 = 0.0
            sum_trans_0_to_1 = 0.0  # expected transitions 0->1
            sum_correct_in_state1 = 0.0
            sum_state1 = 0.0
            sum_correct_in_state0 = 0.0
            sum_state0 = 0.0

            for seq in sequences:
                if len(seq) == 0:
                    continue
                alpha, beta, gamma, xi = self.forward_backward(seq)
                T = len(seq)
                sum_init1 += gamma[0]
                sum_gamma1 += gamma.sum()
                sum_gamma0 += (T - gamma.sum())
                # expected transitions
                sum_trans_0_to_1 += xi.sum()
                # expected observation counts
                # for state1: expected number of times in state1 and response correct
                for t, c in enumerate(seq):
                    if gamma[t] > 0:
                        if c == 1:
                            sum_correct_in_state1 += gamma[t]
                        sum_state1 += gamma[t]
                        sum_state0 += (1.0 - gamma[t])
                        if c == 1:
                            sum_correct_in_state0 += (1.0 - gamma[t])
                    else:
                        # gamma[t]==0 -> contribution to state0 only
                        sum_state0 += 1.0
                        if c == 1:
                            sum_correct_in_state0 += 1.0
            # M-step updates
            new_p_init = (sum_init1 / len(sequences)) if len(sequences) > 0 else self.p_init
            new_p_trans = (sum_trans_0_to_1 / max(1e-8, sum_gamma0))  # expected transitions / expected times in 0
            new_slip = 1.0 - (sum_correct_in_state1 / max(1e-8, sum_state1))
            new_guess = (sum_correct_in_state0 / max(1e-8, sum_state0))
            # clip to (tiny, 1-tiny)
            eps = 1e-6
            self.p_init = float(min(max(new_p_init, eps), 1-eps))
            self.p_trans = float(min(max(new_p_trans, eps), 1-eps))
            self.slip = float(min(max(new_slip, eps), 1-eps))
            self.guess = float(min(max(new_guess, eps), 1-eps))
            if verbose:
                print(f"EM iter {it+1}: p_init={self.p_init:.4f}, p_trans={self.p_trans:.4f}, slip={self.slip:.4f}, guess={self.guess:.4f}")

    def predict_mastery_after_sequence(self, seq: List[int]) -> float:
        """Return P(L_T = 1 | observations seq) â€” posterior mastery after observed seq."""
        if len(seq) == 0:
            return self.p_init
        _, _, gamma, _ = self.forward_backward(seq)
        return float(gamma[-1])

    def predict_next_correct_prob(self, seq: List[int]) -> float:
        """Predict P(next response correct | observed seq)."""
        p_mastery = self.predict_mastery_after_sequence(seq)
        # next-step mastery: if not learned, can learn with p_trans before next observation?
        # Classic BKT assumes learning happens after an opportunity; here we predict on next opportunity:
        # If the learning occurs *between* steps, you might update. We'll assume prediction before next learning:
        p_correct = p_mastery * (1.0 - self.slip) + (1.0 - p_mastery) * self.guess
        return p_correct

In [44]:
from typing import List
import random
import numpy as np

class MultiSkillBKT:
    """
    Multi-skill BKT: each skill has its own BKT model.
    """
    def __init__(self, n_skills: int, p_init=0.1, p_trans=0.1, slip=0.1, guess=0.2):
        # Create one BKTModel per skill
        self.skills = [BKTModel(p_init, p_trans, slip, guess) for _ in range(n_skills)]
        self.n_skills = n_skills

    def simulate_student(self, task_skill_map: List[List[int]], seed=None) -> List[int]:
        """
        Simulate a student sequence of responses for tasks.
        task_skill_map: list of lists, each sublist contains skill indices required by that task.
        """
        if seed is not None:
            random.seed(seed)
        # Initialize mastery state for each skill
        L = [1 if random.random() < sk.p_init else 0 for sk in self.skills]
        obs = []

        for skills_required in task_skill_map:
            # compute probability correct
            prob_correct = 1.0
            for s in skills_required:
                if L[s] == 1:
                    prob_correct *= (1.0 - self.skills[s].slip)
                else:
                    prob_correct *= self.skills[s].guess
            c = 1 if random.random() < prob_correct else 0
            obs.append(c)

            # update skill mastery (absorbing)
            for s in range(self.n_skills):
                if L[s] == 0 and random.random() < self.skills[s].p_trans:
                    L[s] = 1

        return obs


In [49]:
# Task mapping: task -> required skills
# Task 1: skill 0
# Task 2: skill 1
# Task 3: skill 0 and 1
task_skill_map = [[0], [1], [0, 1], [0], [1], [0, 1], [0], [1], [0, 1]]

ms_bkt = MultiSkillBKT(n_skills=2, p_init=0.2, p_trans=0.1, slip=0.1, guess=0.2)
student_obs = ms_bkt.simulate_student(task_skill_map, seed=42)
print(student_obs)


[0, 1, 0, 1, 1, 1, 1, 1, 1]
