In [1]:
import os
import numpy as np
import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset

### Loading Dataset

In [2]:
data_tf = transforms.Compose([transforms.ToTensor(), transforms.Normalize(0.5, 0.5)])

def mnist_dataset():
    root_dir = "./data"
    os.makedirs(root_dir, exist_ok=True)
    train_dataset = datasets.MNIST(root=root_dir, train=True, transform=data_tf, download=True)
    test_dataset = datasets.MNIST(root=root_dir, train=False, transform=data_tf)
    return train_dataset, test_dataset

In [3]:
# each example contains (batch_size, 1, 28, 28)
train_dataset, test_dataset = mnist_dataset()

In [None]:
class MNISTDataset(Dataset):
    """ Convert class indices to onehot labels """
    def __init__(self, dataset):
        super().__init__()
        self.num_classes = 10
        self.dataset = dataset
    def __len__(self,):
        return len(self.dataset)
    def __getitem__(self, index):
        img, label = self.dataset[index]
        one_hot_label = torch.zeros(self.num_classes)
        one_hot_label[label] = 1.0
        return img[0], one_hot_label # index for channel dimension

### Numpy-based Auto-Differentiation

In [22]:
class Matmul:
    def __init__(self):
        self.mem = {}

    def forward(self, x, W):
        h = np.matmul(x, W)
        self.mem = {"x": x, "W": W}
        return h

    def backward(self, grad_y):
        """
        x: shape(N, d)
        w: shape(d, d')
        grad_y: shape(N, d')
        """
        x = self.mem["x"]
        W = self.mem["W"]

        ####################
        """计算矩阵乘法的对应的梯度"""
        ####################
        grad_x = np.matmul(grad_y, W.T) # shape (N, d)
        grad_W = np.matmul(x.T, grad_y)
        return grad_x, grad_W


class Relu:
    def __init__(self):
        self.mem = {}

    def forward(self, x):
        self.mem["x"] = x
        return np.where(x > 0, x, np.zeros_like(x))

    def backward(self, grad_y):
        """
        grad_y: same shape as x
        """
        ####################
        """计算relu 激活函数对应的梯度"""
        ####################
        return grad_y * np.where(self.mem["x"] > 0, 1.0, 0.0)


class Softmax:
    """
    softmax over last dimention
    """

    def __init__(self):
        self.epsilon = 1e-12
        self.mem = {}

    def forward(self, x):
        """
        x: shape(N, c)
        """
        x_exp = np.exp(x)
        partition = np.sum(x_exp, axis=1, keepdims=True)
        out = x_exp / (partition + self.epsilon)

        self.mem["out"] = out
        self.mem["x_exp"] = x_exp
        return out

    def backward(self, grad_y):
        """
        grad_y: same shape as x
        """
        s = self.mem["out"]
        return s - (grad_y != 0).astype(int)


class Log:
    """
    softmax over last dimention
    """

    def __init__(self):
        self.epsilon = 1e-12
        self.mem = {}

    def forward(self, x):
        """
        x: shape(N, c)
        """
        out = np.log(x + self.epsilon)

        self.mem["x"] = x
        return out

    def backward(self, grad_y):
        """
        grad_y: same shape as x
        """
        x = self.mem["x"]

        return 1.0 / (x + self.epsilon) * grad_y

### Gradient Check

In [23]:
x = np.random.normal(size=[5, 6])
W1 = np.random.normal(size=[6, 5])
W2 = np.random.normal(size=[5, 6])
label_ = np.zeros_like(x)
label_[0, 1]=1.
label_[1, 0]=1
label_[2, 3]=1
label_[3, 5]=1
label_[4, 0]=1


mul_h1 = Matmul()
mul_h2 = Matmul()
relu = Relu()
softmax = Softmax()
log = Log()

h1 = mul_h1.forward(x, W1) # shape(5, 4)
h1_relu = relu.forward(h1)
h2 = mul_h2.forward(h1_relu, W2)
h2_soft = softmax.forward(h2)
h2_log = log.forward(h2_soft)


h2_log_grad = log.backward(label_)
h2_soft_grad = softmax.backward(h2_log_grad)
h2_grad, W2_grad = mul_h2.backward(h2_soft_grad)
h1_relu_grad = relu.backward(h2_grad)
h1_grad, W1_grad = mul_h1.backward(h1_relu_grad)

x, W1, W2, label_ = [torch.tensor(each, requires_grad=True) for each in [x, W1, W2, label_]]
h1 = torch.matmul(x, W1) # (batch, output_size), first linear layer
h1_relu = F.relu(h1)
h2 = torch.matmul(h1_relu, W2)
prob = F.softmax(h2, dim=-1)
prob.retain_grad()
log_prob = torch.log(prob)
loss = torch.sum(log_prob * label_) # label is in form of onehot label (containing class probabilities)
loss.backward()
# check the gradient w.r.t prob
np.all(np.isclose(prob.grad.numpy(), h2_log_grad))

True

### Construct models

In [24]:
class myModel:
    def __init__(self):
        self.W1 = np.random.normal(size=[28 * 28 + 1, 100])
        self.W2 = np.random.normal(size=[100, 10])

        self.mul_h1 = Matmul()
        self.mul_h2 = Matmul()
        self.relu = Relu()
        self.softmax = Softmax()
        self.log = Log()

    def forward(self, x):
        x = x.reshape(-1, 28 * 28)
        bias = np.ones(shape=[x.shape[0], 1])
        x = np.concatenate([x, bias], axis=1)

        self.h1 = self.mul_h1.forward(x, self.W1)
        self.h1_relu = self.relu.forward(self.h1)
        self.h2 = self.mul_h2.forward(self.h1_relu, self.W2)
        self.h2_soft = self.softmax.forward(self.h2)
        self.h2_log = self.log.forward(self.h2_soft)

    def backward(self, label):
        self.h2_log_grad = self.log.backward(-label)
        self.h2_soft_grad = self.softmax.backward(self.h2_log_grad)
        self.h2_grad, self.W2_grad = self.mul_h2.backward(self.h2_soft_grad)
        self.h1_relu_grad = self.relu.backward(self.h2_grad)
        self.h1_grad, self.W1_grad = self.mul_h1.backward(self.h1_relu_grad)


model = myModel()

### Loss functions

In [None]:
def compute_loss(log_prob, labels):
    return np.mean(np.sum(-log_prob * labels, axis=1))


def compute_accuracy(log_prob, labels):
    predictions = np.argmax(log_prob, axis=1)
    truth = np.argmax(labels, axis=1)
    return np.sum(predictions == truth)


def train_one_step(model, x, y):
    model.forward(x)
    model.backward(y)
    model.W1 -= 1e-5 * model.W1_grad
    model.W2 -= 1e-5 * model.W2_grad
    loss = compute_loss(model.h2_log, y)
    accuracy = compute_accuracy(model.h2_log, y)
    return loss, accuracy


def test(model, x, y):
    model.forward(x)
    loss = compute_loss(model.h2_log, y)
    accuracy = compute_accuracy(model.h2_log, y)
    return loss, accuracy

### Actual Training

In [26]:
batch_size = 64
train_loader = DataLoader(MNISTDataset(train_dataset), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(MNISTDataset(test_dataset), batch_size=batch_size, shuffle=True)

In [27]:
def optimize_epoch(train_loader):
    """ Perform training for one epoch """
    losses, accuracies = 0., 0.
    for (imgs, labels) in train_loader:
        loss, accuracy = train_one_step(model, imgs.numpy(), labels.numpy())
        losses += loss
        num_correct = accuracy / imgs.size(0)
        accuracies += num_correct # number of correct predictions
    print(f"mean loss: {losses / len(train_loader):.2f}, mean accuracy: {accuracies / len(train_loader):.2f}")
def validate(test_loader):
    """ Accuracy on test set """
    losses, accuracies = 0., 0.
    for (imgs, labels) in test_loader:
        loss, accuracy = test(model, imgs.numpy(), labels.numpy())
        losses += loss
        num_correct = accuracy / imgs.size(0)
        accuracies += num_correct
    print(f"mean loss: {losses / len(test_loader):.2f}, mean accuracy: {accuracies / len(test_loader):.2f}")

In [33]:
for epoch in range(10):
    print(f"In epoch: {epoch}")
    optimize_epoch(train_loader)
print(f"------ Test ------")
validate(test_loader)

In epoch: 0
mean loss: 3.63, mean accuracy: 0.80
In epoch: 1
mean loss: 3.48, mean accuracy: 0.80
In epoch: 2
mean loss: 3.34, mean accuracy: 0.81
In epoch: 3
mean loss: 3.23, mean accuracy: 0.81
In epoch: 4
mean loss: 3.11, mean accuracy: 0.81
In epoch: 5
mean loss: 3.02, mean accuracy: 0.82
In epoch: 6
mean loss: 2.92, mean accuracy: 0.82
In epoch: 7
mean loss: 2.84, mean accuracy: 0.82
In epoch: 8
mean loss: 2.76, mean accuracy: 0.82
In epoch: 9
mean loss: 2.69, mean accuracy: 0.82
------ Test ------
mean loss: 2.58, mean accuracy: 0.84
