# DKT Model

In [9]:
import logging

import numpy as np
import torch
import tqdm
from torch import nn
from torch.autograd import Variable
from sklearn.metrics import roc_auc_score

from EduKTM import KTM


class Net(nn.Module):
    """RNN followed by a per-time-step linear layer and sigmoid.

    Args:
        num_questions: Number of questions. The RNN input width is
            ``num_questions * 2`` and the output width is ``num_questions``.
        hidden_size: Width of the RNN hidden state.
        num_layers: Number of stacked RNN layers.
    """

    def __init__(self, num_questions, hidden_size, num_layers):
        super(Net, self).__init__()
        self.hidden_dim = hidden_size
        self.layer_dim = num_layers
        self.rnn = nn.RNN(num_questions * 2, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_dim, num_questions)

    def forward(self, x):
        """Run the network over a batch of sequences.

        Args:
            x: Tensor laid out batch-first (the RNN is built with
                ``batch_first=True``), with last dimension
                ``num_questions * 2``.

        Returns:
            Tensor with the same leading dimensions as ``x`` and last
            dimension ``num_questions``; every value lies in (0, 1) because
            of the final sigmoid.
        """
        # Build the initial hidden state from `x` so it shares x's device and
        # dtype; a plain torch.zeros(...) would always be a CPU float32 tensor.
        h0 = x.new_zeros(self.layer_dim, x.size(0), self.hidden_dim)
        out, _ = self.rnn(x, h0)
        return torch.sigmoid(self.fc(out))


def process_raw_pred(raw_question_matrix, raw_pred, num_questions: int) -> tuple:
    """Pair each prediction with the outcome it is meant to predict.

    The column index ``c`` of every non-zero entry in ``raw_question_matrix``
    is decoded as ``question = c % num_questions`` and
    ``truth = c // num_questions``. The first non-zero entry is skipped, so
    row ``t`` of ``raw_pred`` is matched with the ``(t + 1)``-th non-zero
    entry.

    NOTE(review): this alignment presumes one non-zero entry per row of
    ``raw_question_matrix`` (one-hot rows, zero padding only at the end) —
    confirm against the data format.

    Args:
        raw_question_matrix: 2-D tensor of encoded interactions for one
            student.
        raw_pred: 2-D tensor of model outputs for the same student; column
            ``q`` is the prediction for question ``q``.
        num_questions: Number of questions used by the column encoding.

    Returns:
        ``(pred, truth)``: two 1-D tensors of equal length. Both are empty
        when the matrix holds fewer than two non-zero entries.
    """
    # Locate the non-zero entries once and reuse the column indices for both
    # the question id and the truth label.
    columns = torch.nonzero(raw_question_matrix)[1:, 1]
    questions = columns % num_questions
    truth = columns // num_questions

    # Keep one prediction row per remaining interaction, then pick the
    # probability of the question that was actually asked.
    pred = raw_pred[: questions.shape[0]]
    pred = pred.gather(1, questions.view(-1, 1)).flatten()
    return pred, truth


class DKT(KTM):
    """Knowledge-tracing model that trains and evaluates a :class:`Net`.

    Args:
        num_questions: Number of questions; forwarded to ``Net`` and used to
            decode batches in ``process_raw_pred``.
        hidden_size: Hidden width forwarded to ``Net``.
        num_layers: Number of recurrent layers forwarded to ``Net``.
    """

    def __init__(self, num_questions, hidden_size, num_layers):
        super(DKT, self).__init__()
        self.num_questions = num_questions
        self.dkt_model = Net(num_questions, hidden_size, num_layers)

    def train(self, train_data, test_data=None, *, epoch: int, lr=0.002, optimizer_type='adam') -> ...:
        """Train the network, yielding once per epoch.

        This method is a generator: nothing is trained until the caller
        iterates over it, and each epoch runs only when the next value is
        requested.

        Args:
            train_data: Iterable of batches fed directly to the network.
            test_data: Optional iterable of batches; when given, the model is
                evaluated on it after every epoch.
            epoch: Number of epochs to run.
            lr: Learning rate passed to the optimizer.
            optimizer_type: ``'adam'``, ``'sgd'`` or ``'rmsprop'``
                (case-insensitive). Any other value falls back to Adam and
                logs a warning.

        Yields:
            The AUC on ``test_data`` after each epoch, or ``None`` when
            ``test_data`` is not provided.
        """
        loss_function = nn.BCELoss()

        optimizer_classes = {
            'adam': torch.optim.Adam,
            'sgd': torch.optim.SGD,
            'rmsprop': torch.optim.RMSprop,
        }
        optimizer_name = optimizer_type.lower()
        if optimizer_name not in optimizer_classes:
            logging.warning("unknown optimizer_type %r, falling back to adam", optimizer_type)
        optimizer_cls = optimizer_classes.get(optimizer_name, torch.optim.Adam)
        optimizer = optimizer_cls(self.dkt_model.parameters(), lr)

        for e in range(epoch):
            # self.eval() leaves the network in evaluation mode, so switch
            # back to training mode at the start of every epoch.
            self.dkt_model.train()
            losses = []
            for batch in train_data:
                integrated_pred = self.dkt_model(batch)
                student_losses = []
                for student in range(batch.shape[0]):
                    pred, truth = process_raw_pred(batch[student], integrated_pred[student], self.num_questions)
                    if pred.shape[0] != 0:
                        student_losses.append(loss_function(pred, truth.float()))

                # A batch without any usable target has nothing to
                # back-propagate; skip it instead of calling backward() on a
                # tensor that carries no graph.
                if not student_losses:
                    continue

                # Sum of the per-student losses, built from the predictions so
                # it lives on their device.
                loss = torch.stack(student_losses).sum()

                # back propagation
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                losses.append(loss.item())

            mean_loss = float(np.mean(losses)) if losses else float('nan')
            print("[Epoch %d] LogisticLoss: %.6f" % (e, mean_loss))

            # Stays None when there is no test set, so the yield below is
            # always well defined.
            auc = None
            if test_data is not None:
                auc = self.eval(test_data)
                print("[Epoch %d] auc: %.6f" % (e, auc))

            yield auc

    def eval(self, test_data) -> float:
        """Compute the ROC AUC of the model over ``test_data``.

        Puts the network in evaluation mode; it is not switched back here.

        Args:
            test_data: Iterable of batches fed directly to the network.

        Returns:
            The value returned by ``roc_auc_score`` over all decoded
            (truth, prediction) pairs.

        Raises:
            ValueError: If ``test_data`` yields no samples.
        """
        self.dkt_model.eval()
        preds = []
        truths = []
        # Gradients are not needed for scoring.
        with torch.no_grad():
            for batch in test_data:
                integrated_pred = self.dkt_model(batch)
                for student in range(batch.shape[0]):
                    pred, truth = process_raw_pred(batch[student], integrated_pred[student], self.num_questions)
                    preds.append(pred)
                    truths.append(truth)

        if not preds:
            raise ValueError("test_data yielded no samples to evaluate")

        # Concatenate once at the end rather than growing a tensor per student.
        y_pred = torch.cat(preds)
        y_truth = torch.cat(truths)
        return roc_auc_score(y_truth.detach().cpu().numpy(), y_pred.detach().cpu().numpy())

    def save(self, filepath):
        """Write the network's ``state_dict`` to ``filepath``."""
        torch.save(self.dkt_model.state_dict(), filepath)
        logging.info("save parameters to %s", filepath)

    def load(self, filepath):
        """Load a ``state_dict`` from ``filepath`` into the network."""
        self.dkt_model.load_state_dict(torch.load(filepath))
        logging.info("load parameters from %s", filepath)

# Model Training

In [10]:
import torch
import torch.utils.data as Data
import numpy as np
import optuna

In [None]:
# Number of questions; passed to DKT as `num_questions` when a model is built.
# NOTE(review): presumably has to match the encoding used in the .npy data — confirm.
NUM_QUESTIONS = 9

def get_train_val_loaders(batch_size, val_split=0.2, shuffle=True, data_percentage=1.0,
                          data_path='./train_data.npy'):
    """Load a ``.npy`` dataset and split it into training and validation loaders.

    Args:
        batch_size: Batch size used by both loaders.
        val_split: Fraction of the (possibly reduced) dataset held out for
            validation; must lie in ``[0, 1]``.
        shuffle: When true, samples are drawn at random for the subsample and
            the split, and the training loader reshuffles each epoch. When
            false, the leading samples are used in file order.
        data_percentage: Fraction of the dataset to keep before splitting.
            Values of ``1.0`` or more keep everything.
        data_path: Path of the ``.npy`` file to load.

    Returns:
        ``(train_loader, val_loader)``. The validation loader never shuffles.

    Raises:
        ValueError: If ``val_split`` is outside ``[0, 1]``.
    """
    if not 0.0 <= val_split <= 1.0:
        # Outside this range the split sizes go negative and the slices
        # below would silently select the wrong samples.
        raise ValueError("val_split must be between 0 and 1, got %r" % (val_split,))

    data = torch.FloatTensor(np.load(data_path))

    # Optionally shrink the dataset before splitting it.
    if data_percentage < 1.0:
        total_samples = len(data)
        samples_to_keep = int(total_samples * data_percentage)
        if shuffle:
            data = data[torch.randperm(total_samples)[:samples_to_keep]]
        else:
            data = data[:samples_to_keep]

    dataset_size = len(data)
    val_size = int(dataset_size * val_split)
    train_size = dataset_size - val_size

    # Reorder at random first when shuffling, then cut at the boundary.
    if shuffle:
        data = data[torch.randperm(dataset_size)]
    train_data = data[:train_size]
    val_data = data[train_size:]

    train_loader = Data.DataLoader(train_data, batch_size=batch_size, shuffle=shuffle)
    val_loader = Data.DataLoader(val_data, batch_size=batch_size, shuffle=False)

    print(f"Training samples: {train_size}, Validation samples: {val_size}")

    return train_loader, val_loader

def get_test_data_loader(batch_size, shuffle=False, data_percentage=1.0, data_path='./test_data.npy'):
    """Load a ``.npy`` dataset and wrap it in a single data loader.

    Args:
        batch_size: Batch size used by the loader.
        shuffle: When true, the optional subsample is drawn at random and the
            loader reshuffles each epoch. When false, the leading samples are
            used in file order.
        data_percentage: Fraction of the dataset to keep. Values of ``1.0``
            or more keep everything.
        data_path: Path of the ``.npy`` file to load.

    Returns:
        A ``DataLoader`` over the (possibly reduced) dataset.
    """
    data = torch.FloatTensor(np.load(data_path))

    # Optionally keep only part of the data.
    if data_percentage < 1.0:
        total_samples = len(data)
        samples_to_keep = int(total_samples * data_percentage)
        if shuffle:
            data = data[torch.randperm(total_samples)[:samples_to_keep]]
        else:
            data = data[:samples_to_keep]

    return Data.DataLoader(data, batch_size=batch_size, shuffle=shuffle)

In [12]:
def importance_objective(trial):
    """Optuna objective: train a DKT model on a data subset and return its validation AUC.

    Samples the hidden size, layer count, batch size, learning rate and
    optimizer from ``trial``, trains for 10 epochs on 30% of the training
    file, and reports the validation AUC after every epoch so the trial can
    be pruned.

    Args:
        trial: The Optuna trial supplying hyperparameters and pruning
            decisions.

    Returns:
        The validation AUC measured after the last epoch.

    Raises:
        optuna.TrialPruned: When the trial asks to be pruned after an epoch.
    """
    # Define hyperparameters to optimize
    hidden_size = trial.suggest_categorical('hidden_size', [5, 10, 20, 50, 100])
    num_layer = trial.suggest_int('num_layers', 1, 3)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
    lr = trial.suggest_float('learning_rate', 1e-4, 1e-1, log=True)
    optimizer = trial.suggest_categorical('optimizer', ['adam', 'sgd', 'rmsprop'])

    train_subset_loader, val_subset_loader = get_train_val_loaders(batch_size, data_percentage=0.3)

    # Initialize and train model. DKT.train is a generator that yields the
    # validation AUC once per epoch.
    dkt_model = DKT(NUM_QUESTIONS, hidden_size, num_layer)
    final_auc = None
    training = dkt_model.train(
        train_subset_loader, val_subset_loader, epoch=10, lr=lr, optimizer_type=optimizer
    )
    for epoch, auc in enumerate(training):
        trial.report(auc, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()
        final_auc = auc

    # The model does not change after the last epoch's evaluation, so reuse
    # that score; only evaluate here if no epoch produced one.
    if final_auc is None:
        final_auc = dkt_model.eval(val_subset_loader)
    return final_auc

In [None]:
# Create the "importances" study backed by the local SQLite file studies.db, or
# reuse it if it already exists (load_if_exists=True). The objective returns an
# AUC, so the study maximizes it.
study = optuna.create_study(study_name="importances", storage="sqlite:///studies.db", load_if_exists=True, direction='maximize')
# Run the objective with n_jobs=4 parallel workers.
# NOTE(review): n_trials=30 is presumably the total for this call, added on top
# of any trials already stored in the reused study — confirm against Optuna docs.
study.optimize(importance_objective, n_trials=30, n_jobs=4)

[I 2025-03-16 20:09:52,292] Using an existing study with name 'importances' instead of creating a new one.


Training samples: 3940, Validation samples: 985
Training samples: 3940, Validation samples: 985Training samples: 3940, Validation samples: 985

Training samples: 3940, Validation samples: 985
[Epoch 0] LogisticLoss: 9.505960
[Epoch 0] LogisticLoss: 9.105800
[Epoch 0] LogisticLoss: 9.088813
[Epoch 0] LogisticLoss: 9.117419


[I 2025-03-16 20:10:00,946] Trial 41 pruned. 
[I 2025-03-16 20:10:01,002] Trial 39 pruned. 
[I 2025-03-16 20:10:01,079] Trial 38 pruned. 
[I 2025-03-16 20:10:01,104] Trial 40 pruned. 


[Epoch 0] auc: 0.764472
[Epoch 0] auc: 0.763816
[Epoch 0] auc: 0.770567
[Epoch 0] auc: 0.772576
Training samples: 3940, Validation samples: 985
Training samples: 3940, Validation samples: 985
Training samples: 3940, Validation samples: 985
Training samples: 3940, Validation samples: 985
[Epoch 0] LogisticLoss: 80.239522


[I 2025-03-16 20:10:08,632] Trial 45 pruned. 


[Epoch 0] auc: 0.765649
[Epoch 0] LogisticLoss: 8.612119
[Epoch 0] LogisticLoss: 8.696094
[Epoch 0] LogisticLoss: 8.629496
[Epoch 0] auc: 0.803859
[Epoch 0] auc: 0.801858
[Epoch 0] auc: 0.798506
[Epoch 1] LogisticLoss: 8.081160
[Epoch 1] LogisticLoss: 8.042658
[Epoch 1] LogisticLoss: 8.030957
[Epoch 1] auc: 0.807604
[Epoch 1] auc: 0.808061
[Epoch 1] auc: 0.812101


In [None]:
# Summarize the best trial recorded in the study.
print(f"Best parameters: {study.best_params}")
print(f"Best AUC: {study.best_value}")

# A notebook cell only renders the value of its last expression, so a bare
# plot call on an earlier line is silently discarded. Show each figure
# explicitly so both are displayed.
optuna.visualization.plot_optimization_history(study).show()
optuna.visualization.plot_param_importances(study).show()

Best parameters: {'hidden_size': 50, 'num_layers': 3, 'batch_size': 64, 'learning_rate': 0.005026360165040225, 'optimizer': 'adam'}
Best AUC: 0.8248811879218874


In [None]:
# def detailed_objective(trial):
#     # Define hyperparameters to optimize
#     hidden_size = trial.suggest_categorical('hidden_size', [5, 10, 20, 50, 100])
#     num_layer = trial.suggest_int('num_layers', 1, 3)
#     batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
#     lr = trial.suggest_float('learning_rate', 1e-4, 1e-1, log=True)
    
#     train_loader, val_loader = get_train_val_loaders( './train_data.npy', batch_size, True, 1.0)
#     test_loader = get_test_data_loader( './test_data.npy', batch_size, False, 1.0)
    
#     # Initialize and train model
#     dkt_model = DKT(NUM_QUESTIONS, hidden_size, num_layer)
#     dkt_model.train(train_loader, test_loader, epoch=20, lr=lr)
    
#     # Return the AUC score to be maximized
#     return dkt_model.eval(test_loader)

In [None]:
# detailed_objective(study.best_trial)

In [None]:
# best_params = study.best_params

# train_loader = get_data_loader('./train_data.npy', best_params['batch_size'], True)
# test_loader = get_data_loader('./test_data.npy', best_params['batch_size'], False)

# dkt = DKT(NUM_QUESTIONS, best_params['hidden_size'], best_params['num_layers'])
# dkt.train(train_loader, test_loader, epoch=10, lr=best_params['learning_rate'])
# dkt.save("dkt.params")


In [None]:
# dkt.load("dkt.params")
# auc = dkt.eval(test_loader)
# print("auc: %.6f" % auc)