Imports

In [None]:
from scipy.sparse import load_npz
import csv
import os
import torch
import numpy as np

from utils import *
from torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import matplotlib.pyplot as plt

Autoencoder Initialization

In [None]:
class AutoEncoder(nn.Module):
    torch.manual_seed(42)
    def __init__(self, num_question, k = 50, j = 75):
        """ Initialize a class AutoEncoder.

        :param num_question: int
        :param k: int
        """
        super(AutoEncoder, self).__init__()

        # Define linear functions.
        self.encoder = nn.Linear(num_question, j)
        self.layer1 = nn.Linear(j, k)
        self.layer2 = nn.Linear(k, j)
        self.dropout = nn.Dropout(p=0.25)
        self.decoder = nn.Linear(j, num_question)

    def get_weight_norm(self):
        """ Return ||W^1||^2 + ||W^2||^2.

        :return: float
        """
        g_w_norm = torch.norm(self.encoder.weight, 2) ** 2
        h_w_norm = torch.norm(self.decoder.weight, 2) ** 2
        return g_w_norm + h_w_norm

    def forward(self, inputs):
        """ Return a forward pass given inputs.

        :param inputs: user vector.
        :return: user vector.
        """
        a = torch.sigmoid(self.encoder(inputs))
        b = F.relu(self.layer1(a))
        c = F.relu(self.layer2(b))
        d = F.relu(self.dropout(c))
        out = torch.sigmoid(self.decoder(d))

        return out

Training Function

In [None]:
def train(model, lr, lamb, train_matrix, zero_train_data, train_data, valid_data, num_epoch):
    """ Train the neural network, where the objective also includes
    a regularizer.

    :param model: Module
    :param lr: float
    :param lamb: float
    :param train_data: 2D FloatTensor
    :param zero_train_data: 2D FloatTensor
    :param valid_data: Dict
    :param num_epoch: int
    :return: None
    """
    norm = model.get_weight_norm()
    norm = norm.detach().numpy()

    # Tell PyTorch you are training the model.
    model.train()

    # Define optimizers and loss function.
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.1)
    num_student = train_matrix.shape[0]

    previous_acc = 0
    patience = 3
    count = 0

    eps = []
    train_accs = []
    val_accs = []
    train_losses = []

    for epoch in range(0, num_epoch):
        train_loss = 0.

        for user_id in range(num_student):
            inputs = Variable(zero_train_data[user_id]).unsqueeze(0)
            target = inputs.clone()

            optimizer.zero_grad()
            output = model(inputs)

            # Mask the target to only compute the gradient of valid entries.
            nan_mask = np.isnan(train_matrix[user_id].unsqueeze(0).numpy())
            target[0][nan_mask] = output[0][nan_mask]

            loss = torch.sum((output - target) ** 2.) + lamb*norm/2
            loss.backward()

            train_loss += loss.item()
            optimizer.step()
        train_acc = evaluate(model,zero_train_data, train_data)
        valid_acc = evaluate(model, zero_train_data, valid_data)
        print("Epoch: {} \tTraining Cost: {:.6f}\t "
              "Valid Acc: {}".format(epoch, train_loss, valid_acc))

        eps.append(epoch)
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        # Early stopping
        if valid_acc <= previous_acc:
            count += 1
            if count >= patience:
                break

        elif valid_acc > previous_acc:  #to modify patience to 3 consecutive declining iterations
            count = 0

        previous_acc = valid_acc

          #plotting
    plt.title("Training Loss vs. Epochs")
    plt.plot(eps, train_losses, label="Training Curve")
    plt.xlabel("Epochs")
    plt.ylabel("Training Loss")
    plt.show()

    plt.title("Accuracy vs. Epochs")
    plt.plot(eps, train_accs, label="Training Curve")
    plt.plot(eps, val_accs, label="Validation Curve")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend(loc='best')
    plt.show()

Accuracy Evaluation Function

In [None]:
def evaluate(model, train_data, valid_data):
    """ Evaluate the valid_data on the current model.

    :param model: Module
    :param train_data: 2D FloatTensor
    :param valid_data: A dictionary {user_id: list,
    question_id: list, is_correct: list}
    :return: float
    """
    # Tell PyTorch you are evaluating the model.
    model.eval()

    total = 0
    correct = 0

    for i, u in enumerate(valid_data["user_id"]):
        inputs = Variable(train_data[u]).unsqueeze(0)
        output = model(inputs)

        guess = output[0][valid_data["question_id"][i]].item() >= 0.5
        if guess == valid_data["is_correct"][i]:
            correct += 1
        total += 1
    return correct / float(total)

Main Function

In [None]:
def main():
    zero_train_matrix, train_matrix, train_data, valid_data, test_data = load_data()

    # Set model hyperparameters.
    k = 50
    j = 75
    num_questions = zero_train_matrix.shape[1]
    model = AutoEncoder(num_question=num_questions, k=k, j=j)

    # Set optimization hyperparameters.
    lr = 0.01
    num_epoch = 40
    lamb = 0.001

    train(model, lr, lamb, train_matrix, zero_train_matrix, train_data,
          valid_data, num_epoch)
    # train(model, lr, lamb, train_matrix, zero_train_matrix,
    #       test_data, num_epoch)


if __name__ == "__main__":
    main()