In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import numpy as np

seq_length = 28
input_size = 28
hidden_size = 128
num_layers = 1
num_classes = 10
batch_size = 1
num_epochs = 2
learning_rate = 0.01

data_path = "../../data/mnist"
train_dataset = torchvision.datasets.MNIST(root=data_path, train=True, transform=transforms.ToTensor(), download=True)
test_dataset = torchvision.datasets.MNIST(root=data_path, train=False, transform=transforms.ToTensor())

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [2]:
def xavier_init(c1, c2, w=1, h=1, fc=False):
    fan_1 = c2 * w * h
    fan_2 = c1 * w * h
    ratio = np.sqrt(6.0 / (fan_1 + fan_2))
    params = ratio * (2 * np.random.random((c1, c2, w, h)) - 1)
    if fc:
        params = params.reshape(c1, c2)
    return params

class My_RNN(object):
    def __init__(self, input_size, hidden_size, num_classes):
        self.lr = learning_rate
        self.seq_length = seq_length
        self.hidden_size = hidden_size
        self.U = xavier_init(hidden_size, input_size, fc=True) # rnn input parameters
        self.W = xavier_init(hidden_size, hidden_size, fc=True) # rnn hidden parameters
        self.V = xavier_init(hidden_size, hidden_size, fc=True) # rnn output parameters

        self.b = np.zeros((hidden_size, 1)) # rnn input parameters
        self.c = np.zeros((hidden_size, 1)) # rnn output parameters
        
        self.FC_W = xavier_init(num_classes, hidden_size, fc=True) # fc parameters
        self.fc_b = np.zeros((num_classes, 1)) # fc parameters
        
        self.mU = np.zeros_like(self.U)
        self.mW = np.zeros_like(self.W)
        self.mV = np.zeros_like(self.V)
        self.mb = np.zeros_like(self.b)
        self.mc = np.zeros_like(self.c)
        
        self.mFC_W = np.zeros_like(self.FC_W)
        self.mfc_b = np.zeros_like(self.fc_b)
        
        self.X = {}
        self.A = {}
        self.S = {}
        self.O = {}
        self.FC_O = {}
        
    def forward(self, x, hprev):
        self.S[-1] = np.copy(hprev)
        
        for t in range(self.seq_length):
            self.X[t] = x[t].T
            self.A[t] = self.U @ self.X[t] + self.W @ self.S[t - 1] + self.b
            self.S[t] = np.tanh(self.A[t])
            self.O[t] = self.V @ self.S[t] + self.c # (hidden, hidden) @ (hidden, 1) + (hidden, 1)
        
        self.FC_O = self.FC_W @ self.O[self.seq_length - 1] + self.fc_b # (classes, hidden) @ (hidden, 1) + (classes, 1)
        
        return self.FC_O # (classes, 1)
    
    def backward(self, dY): # (classes, 1)
        # zero grad
        dFC_W = np.zeros_like(self.FC_W)
        dfc_b = np.zeros_like(self.fc_b)
        
        dU, dW, dV = np.zeros_like(self.U), np.zeros_like(self.W), np.zeros_like(self.V)
        db, dc = np.zeros_like(self.b), np.zeros_like(self.c)
        dS_next = np.zeros_like(self.S[0])
        
        dFC_W = dY @ self.O[self.seq_length - 1].T # (classes, 1) @ (1, hidden)
        dfc_b = dY # (classes, 1)
        dO = self.FC_W.T @ dY
        
        dV = dO @ self.S[self.seq_length - 1].T
        dc = dO
        
        for t in reversed(range(self.seq_length)):
            dS = self.V.T @ dO + dS_next
            dA = (1 - self.S[t] ** 2) * dS
            dU += dA @ self.X[t].T
            dW += dA @ self.S[t - 1].T
            db += dA
            dS_next = self.W.T @ dA
            
        return [dU, dW, dV, db, dc, dFC_W, dfc_b]
        
    def optimizer_step(self, gradients):
        for dparam in gradients:
            np.clip(dparam, -5, 5, out=dparam)
            
        for param, dparam, mem in zip([self.U, self.W, self.V, self.b, self.c, self.FC_W, self.fc_b], 
                                      gradients,
                                      [self.mU, self.mW, self.mV, self.mb, self.mc, self.mFC_W, self.mfc_b]):
            mem += dparam * dparam
            param += -self.lr * dparam / np.sqrt(mem + 1e-8)
        
    def cross_entropy_loss(self, outputs, labels):
        Y = self.softmax(outputs)
        loss = -np.log(Y) * self.one_hot_vector(Y, labels)
        return Y, loss
    
    def softmax(self, x):
        e = np.exp(x)
        return e / np.sum(e)
    
    def deriv_softmax(self, Y, labels):
        dY = np.copy(Y)
        for i in range(len(labels)):
            dY[labels[i]][i] -= 1
        return dY
    
    def one_hot_vector(self, Y, labels):
        out = np.zeros_like(Y)
        for i in range(len(labels)):
            out[labels[i]][i] = 1
        return out
    
    def predict(self, outputs):
        return np.argmax(self.softmax(outputs), 0)

In [3]:
model = My_RNN(input_size, hidden_size, num_classes)

total_step = len(train_loader)
iter_loss = 0
interval = 10000
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(seq_length, batch_size, input_size).detach().numpy()
        labels = labels.detach().numpy()
        hprev = np.zeros((hidden_size, 1))
        outputs = model.forward(images, hprev)
        Y, loss = model.cross_entropy_loss(outputs, labels)
        gradients = model.backward(model.deriv_softmax(Y, labels))
        model.optimizer_step(gradients)
        iter_loss += np.sum(loss)
        
        if (i + 1) % interval == 0:
            print("epoch {}/{} iter {}/{} loss {:.4f}".format(epoch + 1, num_epochs, i + 1, total_step, iter_loss / interval))
            iter_loss = 0

epoch 1/2 iter 10000/60000 loss 1.2413
epoch 1/2 iter 20000/60000 loss 0.7124
epoch 1/2 iter 30000/60000 loss 0.5973
epoch 1/2 iter 40000/60000 loss 0.5421
epoch 1/2 iter 50000/60000 loss 0.5027
epoch 1/2 iter 60000/60000 loss 0.4773
epoch 2/2 iter 10000/60000 loss 0.4527
epoch 2/2 iter 20000/60000 loss 0.4353
epoch 2/2 iter 30000/60000 loss 0.4282
epoch 2/2 iter 40000/60000 loss 0.4108
epoch 2/2 iter 50000/60000 loss 0.3977
epoch 2/2 iter 60000/60000 loss 0.4112


In [4]:
correct = 0
total = 0
for images, labels in test_loader:
    images = images.reshape(seq_length, batch_size, input_size).detach().numpy()
    labels = labels.detach().numpy()
    
    hprev = np.zeros((hidden_size, 1))
    outputs = model.forward(images, hprev)
    pred = model.predict(outputs)
    total += labels.shape[0]
    correct += (pred == labels).sum().item()

print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

Test Accuracy of the model on the 10000 test images: 88.53 %


In [5]:
class My_LSTM(object):
    def __init__(self, x_size, hidden_size, num_classes):
        self.lr = learning_rate
        self.seq_length = seq_length
        self.input_size = x_size + hidden_size
        
        self.W_f = xavier_init(hidden_size, self.input_size, fc=True)
        self.b_f = np.zeros((hidden_size, 1))
        
        self.W_i = xavier_init(hidden_size, self.input_size, fc=True)
        self.b_i = np.zeros((hidden_size, 1))
        
        self.W_g = xavier_init(hidden_size, self.input_size, fc=True)
        self.b_g = np.zeros((hidden_size, 1))
        
        self.W_o = xavier_init(hidden_size, self.input_size, fc=True)
        self.b_o = np.zeros((hidden_size, 1))
        
        self.W_fc = xavier_init(num_classes, hidden_size, fc=True)
        self.b_fc = np.zeros((num_classes, 1))
        
        self.mW_f = np.zeros_like(self.W_f)
        self.mb_f = np.zeros_like(self.b_f)
        
        self.mW_i = np.zeros_like(self.W_i)
        self.mb_i = np.zeros_like(self.b_i)
        
        self.mW_g = np.zeros_like(self.W_g)
        self.mb_g = np.zeros_like(self.b_g)
        
        self.mW_o = np.zeros_like(self.W_o)
        self.mb_o = np.zeros_like(self.b_o)
        
        self.mW_fc = np.zeros_like(self.W_fc)
        self.mb_fc = np.zeros_like(self.b_fc)
        
        self.X = {}
        self.F = {}
        self.F_A = {}
        
        self.I = {}
        self.I_A = {}
        
        self.G = {}
        self.G_A = {}
        
        self.O = {}
        self.O_A = {}
        
        self.C = {}
        self.C_A = {}
        self.H = {}
        
    def forward(self, x, hprev, cprev):
        self.X = {}
        self.F = {}
        self.F_A = {}
        
        self.I = {}
        self.I_A = {}
        
        self.G = {}
        self.G_A = {}
        
        self.O = {}
        self.O_A = {}
        
        self.C = {}
        self.C_A = {}
        self.H = {}
        
        self.H[-1] = np.copy(hprev)
        self.C[-1] = np.copy(cprev)
        
        for t in range(self.seq_length):
            self.X[t] = np.concatenate((self.H[t-1], x[t].T), axis = 0)
            
            self.F[t] = self.W_f @ self.X[t] + self.b_f
            self.F_A[t] = self.sigmoid(self.F[t])
            
            self.I[t] = self.W_i @ self.X[t] + self.b_i
            self.I_A[t] = self.sigmoid(self.I[t])
            
            self.G[t] = self.W_g @ self.X[t] + self.b_g
            self.G_A[t] = np.tanh(self.G[t])
            
            self.C[t] = self.F_A[t] * self.C[t - 1] + self.I_A[t] * self.G_A[t]
            self.C_A[t] = np.tanh(self.C[t])
            
            self.O[t] = self.W_o @ self.X[t] + self.b_o
            self.O_A[t] = self.sigmoid(self.O[t])
            
            self.H[t] = self.O_A[t] * self.C_A[t]
            
        output = self.W_fc @ self.H[self.seq_length - 1] + self.b_fc
        
        return output
    
    def backward(self, dY):
        dW_f, db_f = np.zeros_like(self.W_f), np.zeros_like(self.b_f)
        dW_i, db_i = np.zeros_like(self.W_i), np.zeros_like(self.b_i)
        dW_g, db_g = np.zeros_like(self.W_g), np.zeros_like(self.b_g)
        dW_o, db_o = np.zeros_like(self.W_o), np.zeros_like(self.b_o)
        dW_fc, db_fc = np.zeros_like(self.W_fc), np.zeros_like(self.b_fc)
        
        dH_next = np.zeros_like(self.H[0])
        dC_next = np.zeros_like(self.C[0])
        
        dW_fc = dY @ self.H[self.seq_length - 1].T
        db_fc = dY
        
        for t in reversed(range(self.seq_length)):
            dh = self.W_fc.T @ dY + dH_next
            
            dO_A = dh * self.C_A[t]
            dO = dO_A * (self.O_A[t] * (1 - self.O_A[t]))
            dW_o += dO @ self.X[t].T
            db_o += dO
            
            dC_A = self.O_A[t] * dh
            dC = dC_A * (1 - self.C_A[t] ** 2) + dC_next
            
            dF_A = dC * self.C[t - 1]
            dI_A = dC * self.G_A[t]
            dG_A = self.I_A[t] * dC
            dC_next = self.F_A[t] * dC
            
            dF = dF_A * (self.F_A[t] * (1 - self.F_A[t]))
            dW_f += dF @ self.X[t].T
            db_f += dF
            
            dI = dI_A * (self.I_A[t] * (1 - self.I_A[t]))
            dW_i += dI @ self.X[t].T
            db_i += dI
            
            dG = dG_A * (1 - self.G_A[t] ** 2)
            dW_g += dG @ self.X[t].T
            db_g += dG
            
            dX = self.W_f.T @ dF + self.W_i.T @ dI + self.W_g.T @ dG + self.W_o.T @ dO
            dH_next = dX[:hidden_size, :]
        
        gradients = [dW_f, db_f, dW_i, db_i, dW_g, db_g, dW_o, db_o, dW_fc, db_fc]
        
        return gradients
    
    def optimizer_step(self, gradients):
        for dparam in gradients:
            np.clip(dparam, -5, 5, out=dparam)
        
        for param, dparam, mem in zip(
            [self.W_f, self.b_f, self.W_i, self.b_i, self.W_g, self.b_g, self.W_o, self.b_o, self.W_fc, self.b_fc],
            gradients,
            [self.mW_f, self.mb_f, self.mW_i, self.mb_i, self.mW_g, self.mb_g, self.mW_o, self.mb_o, self.mW_fc, self.mb_fc]):
            mem += dparam * dparam
            param += -self.lr * dparam / np.sqrt(mem + 1e-8)
            
    def cross_entropy_loss(self, outputs, labels):
        Y = self.softmax(outputs)
        loss = -np.log(Y) * self.one_hot_vector(Y, labels)
        return Y, loss
    
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def softmax(self, x):
        e = np.exp(x)
        return e / np.sum(e)
    
    def deriv_softmax(self, Y, labels):
        dY = np.copy(Y)
        for i in range(len(labels)):
            dY[labels[i]][i] -= 1
        return dY
    
    def one_hot_vector(self, Y, labels):
        out = np.zeros_like(Y)
        for i in range(len(labels)):
            out[labels[i]][i] = 1
        return out
    
    def predict(self, outputs):
        return np.argmax(self.softmax(outputs), 0)

In [6]:
model = My_LSTM(input_size, hidden_size, num_classes)

total_step = len(train_loader)
iter_loss = 0
interval = 10000
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(seq_length, batch_size, input_size).detach().numpy()
        labels = labels.detach().numpy()
        
        hprev = np.zeros((hidden_size, 1))
        cprev = np.zeros((hidden_size, 1))
        outputs = model.forward(images, hprev, cprev)
        Y, loss = model.cross_entropy_loss(outputs, labels)
        gradients = model.backward(model.deriv_softmax(Y, labels))
        model.optimizer_step(gradients)
        iter_loss += np.sum(loss)
        if(i + 1) % interval == 0:
            print("epoch {}/{} iter {}/{} loss {:.4f}".format(epoch + 1, num_epochs, i + 1, total_step, iter_loss / interval))
            iter_loss = 0

epoch 1/2 iter 10000/60000 loss 0.8096
epoch 1/2 iter 20000/60000 loss 0.3663
epoch 1/2 iter 30000/60000 loss 0.3320
epoch 1/2 iter 40000/60000 loss 0.2763
epoch 1/2 iter 50000/60000 loss 0.2308
epoch 1/2 iter 60000/60000 loss 0.2385
epoch 2/2 iter 10000/60000 loss 0.2096
epoch 2/2 iter 20000/60000 loss 0.2044
epoch 2/2 iter 30000/60000 loss 0.1896
epoch 2/2 iter 40000/60000 loss 0.1874
epoch 2/2 iter 50000/60000 loss 0.1692
epoch 2/2 iter 60000/60000 loss 0.1658


In [7]:
correct = 0
total = 0
for images, labels in test_loader:
    images = images.reshape(seq_length, batch_size, input_size).detach().numpy()
    labels = labels.detach().numpy()
    
    hprev = np.zeros((hidden_size, 1))
    cprev = np.zeros((hidden_size, 1))
    outputs = model.forward(images, hprev, cprev)
    pred = model.predict(outputs)
    total += labels.shape[0]
    correct += (pred == labels).sum().item()

print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

Test Accuracy of the model on the 10000 test images: 94.93 %
