In [124]:
import numpy as np
import pickle
from tqdm import tqdm

In [119]:
class CDM:
    """Abstract interface shared by cognitive diagnosis models.

    Subclasses are expected to override ``train``, ``eval``, ``save`` and
    ``load``; every one of them raises ``NotImplementedError`` here.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Accept and ignore any arguments; the base class keeps no state."""

    def train(self, *args, **kwargs) -> None:
        """Fit the model. Must be overridden by a subclass."""
        raise NotImplementedError()

    def eval(self, *args, **kwargs) -> tuple:
        """Evaluate the model and return a tuple of metrics. Must be overridden."""
        raise NotImplementedError()

    def save(self, *args, **kwargs) -> None:
        """Persist the model parameters. Must be overridden by a subclass."""
        raise NotImplementedError()

    def load(self, *args, **kwargs) -> None:
        """Restore the model parameters. Must be overridden by a subclass."""
        raise NotImplementedError()

In [120]:
def init_parameters(stu_num, prob_num):
    """Create the starting values for the DINA parameters.

    :param int stu_num: number of students
    :param int prob_num: number of problems
    :return: tuple ``(theta, slip, guess)`` where ``theta`` is an integer array
        of length ``stu_num`` filled with 0, and ``slip`` / ``guess`` are
        independent float arrays of length ``prob_num`` filled with 0.2
    """
    # theta is used as a row index into the per-state prediction table, so it
    # has to be an integer array; a float array cannot be used for indexing.
    theta = np.zeros(stu_num, dtype=int)
    slip = np.full(prob_num, 0.2)
    guess = np.full(prob_num, 0.2)
    return theta, slip, guess


def init_all_knowledge_state(know_num):
    """Enumerate every binary mastery pattern over ``know_num`` knowledge concepts.

    Row ``i`` of the result is the binary expansion of ``i`` written with the
    most significant bit in column 0, so row 0 is all zeros and the last row is
    all ones.

    :param int know_num: number of knowledge concepts (must be >= 0)
    :return: float array of shape ``(2**know_num, know_num)`` containing 0.0 / 1.0
    :raises ValueError: if ``know_num`` is negative
    """
    if know_num < 0:
        raise ValueError("know_num must be non-negative")
    state_num = 2**know_num
    state_ids = np.arange(state_num, dtype=np.int64)[:, None]
    # Column c holds bit (know_num - 1 - c) of the state index. With
    # know_num == 0 the shift vector is empty and the result has shape (1, 0).
    shifts = np.arange(know_num - 1, -1, -1, dtype=np.int64)
    return ((state_ids >> shifts) & 1).astype(float)

In [121]:
class DINA(CDM):
    """DINA cognitive diagnosis model fitted by EM over all knowledge states.

    Each student is assigned one of ``2**know_num`` binary mastery patterns and
    each problem has a slip and a guess probability. ``eta[s, j]`` is 1 when
    state ``s`` covers every concept that ``Q`` marks for problem ``j``.
    """

    def __init__(self, R, Q, skip_value=-1) -> None:
        """DINA model

        :param array R: response matrix, shape (stu_num, prob_num)
        :param array Q: relation matrix, shape (prob_num, know_num)
        :param int skip_value: skip value in response matrix, defaults to -1
        """
        super().__init__()
        self.R = R
        self.Q = Q
        self.stu_num, self.prob_num = R.shape
        _, self.know_num = Q.shape
        self.state_num = 2**self.know_num
        self.skip_value = skip_value
        self.theta, self.slip, self.guess = init_parameters(self.stu_num, self.prob_num)
        self.all_states = init_all_knowledge_state(self.know_num)
        # all_states [state_num, know_num]
        # Number of concepts required by each problem minus the number of those
        # concepts the state has mastered, i.e. how many are still missing.
        missing_skills = np.transpose(
            np.sum(Q, axis=1, keepdims=True) - np.dot(Q, np.transpose(self.all_states))
        )
        # missing_skills [state_num, prob_num]
        self.eta = 1 - (missing_skills > 0)
        # self.eta [state_num, prob_num], 1 when nothing required is missing

    def train(self, epoch=10, epsilon=None) -> None:
        """Estimate slip, guess and each student's most likely state with EM.

        :param int epoch: maximum number of EM iterations
        :param float epsilon: optional convergence threshold on the largest
            absolute change of posterior, slip and guess; it is only consulted
            once the iteration index exceeds 20. ``None`` disables early stopping.
        """
        theta, slip, guess = (
            np.copy(self.theta),
            np.copy(self.slip),
            np.copy(self.guess),
        )
        # Skipped entries must contribute to neither the likelihood nor the
        # M-step counts, so all three indicator matrices are zero there.
        observed = self.R != self.skip_value
        right = np.where(observed, self.R, 0.0)
        wrong = np.where(observed, 1.0 - self.R, 0.0)
        answered = observed.astype(float)
        # right / wrong / answered [stu_num, prob_num]
        eta = self.eta
        post = 0
        for iteration in range(epoch):
            # post, slip and guess are rebound (never mutated in place) below,
            # so plain references are enough to remember the previous values.
            post_prev, slip_prev, guess_prev = post, slip, guess

            # ---- E-step -------------------------------------------------
            answer_right = (1 - slip) * eta + guess * (1 - eta)
            # answer_right [state_num, prob_num]
            # Summing over problems is a matrix product, which avoids building
            # a [state_num, stu_num, prob_num] intermediate.
            log_like = (
                np.log(answer_right + 1e-9) @ right.T
                + np.log(1 - answer_right + 1e-9) @ wrong.T
            )
            # log_like [state_num, stu_num]
            # Shift by the per-student maximum before exponentiating so at
            # least one state has likelihood 1 and the normaliser is never 0.
            log_like -= log_like.max(axis=0, keepdims=True)
            like = np.exp(log_like)
            post = like / like.sum(axis=0)
            # post [state_num, stu_num], probability of each student being in each state

            # ---- M-step -------------------------------------------------
            expected_attempts = post @ answered
            # expected_attempts [state_num, prob_num], expected number of
            # students in each state who actually answered each problem
            expected_right = post @ right
            # expected_right [state_num, prob_num], expected number of correct answers
            guess_hits = (expected_right * (1 - eta)).sum(0)
            master_hits = (expected_right * eta).sum(0)
            non_master_attempts = (expected_attempts * (1 - eta)).sum(0)
            master_attempts = (expected_attempts * eta).sum(0)
            # all four [prob_num]
            # Where a denominator is 0 there is no evidence for that problem,
            # so the previous estimate is kept instead of producing NaN.
            guess = np.divide(
                guess_hits,
                non_master_attempts,
                out=np.copy(guess),
                where=non_master_attempts > 0,
            )
            slip = np.divide(
                master_attempts - master_hits,
                master_attempts,
                out=np.copy(slip),
                where=master_attempts > 0,
            )
            theta = post.argmax(axis=0)
            # theta [stu_num], index of the most probable state per student

            if epsilon is not None and iteration > 20:
                change = max(
                    np.max(np.abs(post - post_prev)),
                    np.max(np.abs(slip - slip_prev)),
                    np.max(np.abs(guess - guess_prev)),
                )
                if change < epsilon:
                    break
        self.theta = theta
        self.slip = slip
        self.guess = guess

    def eval(self, test_data) -> tuple:
        """Score held-out responses with the fitted parameters.

        :param test_data: iterable of dicts with keys ``user_id``, ``item_id``
            and ``score``
        :return: tuple ``(rmse, mae)``
        """
        pred_score = (1 - self.slip) * self.eta + self.guess * (1 - self.eta)
        # pred_score [state_num, prob_num]
        squared_errors, abs_errors = [], []
        for record in tqdm(test_data, "evaluating"):
            stu, test_id, true_score = (
                record["user_id"],
                record["item_id"],
                record["score"],
            )
            # int() keeps the lookup valid even if theta is stored as floats.
            error = pred_score[int(self.theta[stu]), test_id] - true_score
            squared_errors.append(error**2)
            abs_errors.append(abs(error))
        return np.sqrt(np.average(squared_errors)), np.average(abs_errors)

    def save(self, filepath) -> None:
        """Pickle ``theta``, ``slip`` and ``guess`` to ``filepath``."""
        with open(filepath, "wb") as file:
            pickle.dump(
                {"theta": self.theta, "slip": self.slip, "guess": self.guess}, file
            )
            print("save")

    def load(self, filepath) -> None:
        """Restore ``theta``, ``slip`` and ``guess`` from a file written by ``save``.

        NOTE(review): ``pickle.load`` can execute arbitrary code; only load
        files from a trusted source.
        """
        with open(filepath, "rb") as file:
            params = pickle.load(file)
            # Look parameters up by key rather than relying on dict ordering.
            self.theta = params["theta"]
            self.slip = params["slip"]
            self.guess = params["guess"]
            print("load")

In [13]:
import numpy as np
import random
import json

# Build the Math1 train/valid/test logs from the raw response matrix.
DATA_DIR = "../../data/math2015/Math1"
SPLIT_SEED = 0  # fixed so re-running the cell reproduces the same split
train_ratio = 0.8
valid_ratio = 0

# Q matrix: re-save q.txt as comma-separated integers.
np.savetxt(
    f"{DATA_DIR}/q.csv",
    np.loadtxt(f"{DATA_DIR}/q.txt", dtype=int),
    delimiter=",",
    fmt="%d",
)

# Binarise the responses: entries equal to 1 become 1.0, everything else 0.0.
R = (np.loadtxt(f"{DATA_DIR}/data.txt") == 1).astype(float)

stu_num, prob_num = R.shape[0], R.shape[1]
# The cut points depend only on prob_num, so compute them once.
train_end = int(train_ratio * prob_num)
valid_end = train_end + int(valid_ratio * prob_num)
# A dedicated generator keeps the split independent of other users of `random`.
split_rng = random.Random(SPLIT_SEED)

train_logs, valid_logs, test_logs = [], [], []
for stu in range(stu_num):
    stu_logs = [
        {"user_id": int(stu), "item_id": int(prob), "score": float(R[stu, prob])}
        for prob in range(prob_num)
    ]
    # Each student's problems are shuffled separately, so every student
    # contributes the same number of logs to each split.
    split_rng.shuffle(stu_logs)
    train_logs += stu_logs[:train_end]
    valid_logs += stu_logs[train_end:valid_end]
    test_logs += stu_logs[valid_end:]

for split_name, split_logs in (
    ("train", train_logs),
    ("valid", valid_logs),
    ("test", test_logs),
):
    with open(f"{DATA_DIR}/{split_name}_data.json", "w", encoding="utf8") as file:
        json.dump(split_logs, file, indent=4, ensure_ascii=False)

print(train_logs[0], test_logs[0])

{'user_id': 0, 'item_id': 2, 'score': 1.0} {'user_id': 0, 'item_id': 14, 'score': 0.0}


In [122]:
import json

# Relation matrix: one row per problem, one column per knowledge concept.
Q = np.loadtxt("../../data/math2015/Math1/q.csv", dtype=int, delimiter=",")
prob_num, know_num = Q.shape

with open("../../data/math2015/Math1/train_data.json", encoding="utf-8") as file:
    train_set = json.load(file)

# User ids are 0-based, so the student count is the largest id plus one.
stu_num = 1 + max(x["user_id"] for x in train_set)

# Start with every response marked as skipped (-1), then fill in the
# (student, problem) pairs that appear in the training logs.
R = np.full((stu_num, prob_num), -1.0)
for log in train_set:
    R[log["user_id"], log["item_id"]] = log["score"]

In [123]:
# Fit DINA on the training responses. Entries of R left at -1 are treated as
# skipped because -1 is DINA's default skip_value.
model = DINA(R=R, Q=Q)
# NOTE(review): with epoch=2 the epsilon threshold can never trigger, since
# train() only checks it once the iteration index exceeds 20.
model.train(epsilon=1e-3, epoch=2)

[2043 1979 1575 ...  282  283   41]
