In [None]:
# adapted and modified from https://www.kaggle.com/code/fareselmenshawii/rnn-from-scratch

In [1]:
import os
import numpy as np
import scipy as sp
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt # for making figures
%matplotlib inline

In [2]:
class DataGenerator:

    def __init__(self, path):
        self.path = path
        
        with open(path) as f:
            data = f.read().lower()
        
        self.chars = list(set(data))
        
        self.char_to_idx = {ch: i for (i, ch) in enumerate(self.chars)}
        self.idx_to_char = {i: ch for (i, ch) in enumerate(self.chars)}
        
        self.vocab_size = len(self.chars)
        
        with open(path) as f:
            examples = f.readlines()
        self.examples = [x.lower().strip() for x in examples]
 
    def generate_example(self, idx):
        
        example_chars = self.examples[idx]
        
        example_char_idx = [self.char_to_idx[char] for char in example_chars]
        
        X = [self.char_to_idx['\n']] + example_char_idx
        Y = example_char_idx + [self.char_to_idx['\n']]
        
        return torch.tensor(X), torch.tensor(Y)

In [3]:
class RNN:

    def __init__(self, hidden_size, data_generator, sequence_length, learning_rate):

        # hyper parameters
        self.hidden_size = hidden_size
        self.data_generator = data_generator
        self.vocab_size = self.data_generator.vocab_size
        self.sequence_length = sequence_length
        self.learning_rate = learning_rate
        self.X = None

        # model parameters
        self.Wax = torch.empty(hidden_size, self.vocab_size)
        torch.nn.init.uniform_(self.Wax, -torch.sqrt(torch.tensor(1. / self.vocab_size)), torch.sqrt(torch.tensor(1. / self.vocab_size)))


        self.Waa = torch.empty(hidden_size, hidden_size)
        torch.nn.init.uniform_(self.Waa, -torch.sqrt(torch.tensor(1. / self.vocab_size)), torch.sqrt(torch.tensor(1. / self.vocab_size)))


        self.Wya = torch.empty(self.vocab_size, hidden_size)
        torch.nn.init.uniform_(self.Waa, -torch.sqrt(torch.tensor(1. / self.vocab_size)), torch.sqrt(torch.tensor(1. / self.vocab_size)))

        self.ba = torch.zeros((hidden_size, 1))  
        self.by = torch.zeros((self.vocab_size, 1))

        # Initialize gradients
        self.dWax, self.dWaa, self.dWya = torch.zeros_like(self.Wax), torch.zeros_like(self.Waa), torch.zeros_like(self.Wya)
        self.dba, self.dby = torch.zeros_like(self.ba), torch.zeros_like(self.by)

        # parameter update with AdamW
        self.mWax = torch.zeros_like(self.Wax)
        self.vWax = torch.zeros_like(self.Wax)
        self.mWaa = torch.zeros_like(self.Waa)
        self.vWaa = torch.zeros_like(self.Waa)
        self.mWya = torch.zeros_like(self.Wya)
        self.vWya = torch.zeros_like(self.Wya)
        self.mba = torch.zeros_like(self.ba)
        self.vba = torch.zeros_like(self.ba)
        self.mby = torch.zeros_like(self.by)
        self.vby = torch.zeros_like(self.by)

    def softmax(self, x):
        x = x - torch.max(x)
        p = torch.exp(x)
        return p / torch.sum(p)

    def forward(self, X, a_prev):
        x, a, y_pred = {}, {}, {}
        self.X = X
        a[-1] = a_prev.clone()
        for t in range(len(self.X)): 
            x[t] = torch.zeros((self.vocab_size,1)) 
            if (self.X[t] != None):
                x[t][self.X[t]] = 1
            a[t] = torch.tanh((self.Wax @ x[t]) + (self.Waa @ a[t - 1]) + self.ba)
            y_pred[t] = F.softmax(self.Wya @ a[t] + self.by, dim=0)
        return x, a, y_pred 

    def backward(self,x, a, y_preds, targets):
        da_next = torch.zeros_like(a[0])
        for t in reversed(range(len(self.X))):
            dy_preds = y_preds[t].clone()
            dy_preds[targets[t]] -= 1
            da = (self.Waa.T @ da_next) + (self.Wya.T @ dy_preds)
            dtanh = (1 - torch.pow(a[t], 2))
            da_unactivated = dtanh * da
            self.dba += da_unactivated
            self.dWax += (da_unactivated @ x[t].T)
            self.dWaa += (da_unactivated @ a[t - 1].T)
            da_next = da_unactivated
            self.dWya += (dy_preds @ a[t].T)
            for grad in [self.dWax, self.dWaa, self.dWya, self.dba, self.dby]:
                torch.clamp(grad, -1, 1, out=grad)
 
    def loss(self, y_preds, targets):
        return -torch.sum(torch.stack([torch.log(torch.clamp(y_preds[t][targets[t], 0], min=1e-8)) for t in range(len(self.X))]))


    
    def adamw(self, beta1=0.9, beta2=0.999, epsilon=1e-8, L2_reg=1e-4):

        # AdamW update for Wax
        self.mWax = beta1 * self.mWax + (1 - beta1) * self.dWax
        self.vWax = beta2 * self.vWax + (1 - beta2) * torch.square(self.dWax)
        m_hat = self.mWax / (1 - beta1)
        v_hat = self.vWax / (1 - beta2)
        self.Wax -= self.learning_rate * (m_hat / (torch.sqrt(v_hat) + epsilon) + L2_reg * self.Wax)

        # AdamW update for Waa
        self.mWaa = beta1 * self.mWaa + (1 - beta1) * self.dWaa
        self.vWaa = beta2 * self.vWaa + (1 - beta2) * torch.square(self.dWaa)
        m_hat = self.mWaa / (1 - beta1)
        v_hat = self.vWaa / (1 - beta2)
        self.Waa -= self.learning_rate * (m_hat /  (torch.sqrt(v_hat) + epsilon) + L2_reg * self.Waa)

        # AdamW update for Wya
        self.mWya = beta1 * self.mWya + (1 - beta1) * self.dWya
        self.vWya = beta2 * self.vWya + (1 - beta2) * torch.square(self.dWya)
        m_hat = self.mWya / (1 - beta1)
        v_hat = self.vWya / (1 - beta2)
        self.Wya -= self.learning_rate * (m_hat /  (torch.sqrt(v_hat) + epsilon) + L2_reg * self.Wya)

        # AdamW update for ba
        self.mba = beta1 * self.mba + (1 - beta1) * self.dba
        self.vba = beta2 * self.vba + (1 - beta2) * torch.square(self.dba)
        m_hat = self.mba / (1 - beta1)
        v_hat = self.vba / (1 - beta2)
        self.ba -= self.learning_rate * (m_hat /  (torch.sqrt(v_hat) + epsilon) + L2_reg * self.ba)

        # AdamW update for by
        self.mby = beta1 * self.mby + (1 - beta1) * self.dby
        self.vby = beta2 * self.vby + (1 - beta2) * torch.square(self.dby)
        # self.by -= self.learning_rate * (self.mby / (torch.sqrt(self.vby) + epsilon) + L2_reg * self.by)
    
    def sample(self):
        x = torch.zeros((self.vocab_size, 1))
        a_prev = torch.zeros((self.hidden_size, 1))
        indices = []
        idx = -1
        counter = 0
        max_chars = 50 # maximum number of characters to generate
        newline_character = self.data_generator.char_to_idx['\n'] # the newline character
        while (idx != newline_character and counter != max_chars):
            # compute the hidden state
            a = torch.tanh((self.Wax @ x) + (self.Waa @ a_prev) + self.ba)

            # compute the output probabilities
            y= F.softmax((self.Wya @ a) + self.by, dim=0)

            # Sample an index using multinomial distribution
            idx = torch.multinomial(y.squeeze(), num_samples=1).item()

            # set the input for the next time step
            x = torch.zeros((self.vocab_size, 1))
            x[idx] = 1

            # store the sampled character index in the list
            indices.append(idx)

            # update the previous hidden state
            a_prev = a

            # increment the counter
            counter += 1

        # return the list of sampled character indices
        return indices

        
    def train(self, generated_names=5):

        iter_num = 0
        threshold = 5 # stopping criterion for training
        smooth_loss =  -torch.log(torch.tensor(1.0) / self.data_generator.vocab_size) * self.sequence_length  # initialize loss

        # print('1. smooth: ',smooth_loss, ', threshold: ', threshold)
        while (smooth_loss > threshold):
            # print('2. smooth: ',smooth_loss, ', threshold: ', threshold)
            a_prev = torch.zeros((self.hidden_size, 1))
            idx = iter_num % self.vocab_size
            # get a batch of inputs and targets
            inputs, targets = self.data_generator.generate_example(idx)

            # forward pass
            x, a, y_pred  = self.forward(inputs, a_prev)

            # backward pass
            self.backward(x, a, y_pred, targets)

            # calculate and update loss
            loss = self.loss(y_pred, targets)
            self.adamw()
            smooth_loss = smooth_loss * 0.999 + loss * 0.001

            # update previous hidden state for the next batch
            a_prev = a[len(self.X) - 1]
            # print progress every 500 iterations
            if iter_num % 500 == 0:
                print("\n\niter :%d, loss:%f\n" % (iter_num, smooth_loss))
                for i in range(generated_names):
                    sample_idx = self.sample()
                    txt = ''.join(self.data_generator.idx_to_char[idx] for idx in sample_idx)
                    txt = txt.title()  # capitalize first character 
                    print ('%s' % (txt, ), end='')
            iter_num += 1
            # print('3. smooth: ',smooth_loss, ', threshold: ', threshold)
    
    def predict(self, start):
        # Initialize input vector and previous hidden state
        x = torch.zeros((self.vocab_size, 1))
        a_prev = torch.zeros((self.hidden_size, 1))

        # Convert start sequence to indices
        chars = [ch for ch in start]
        idxes = []
        for i in range(len(chars)):
            idx = self.data_generator.char_to_idx[chars[i]]
            x[idx] = 1
            idxes.append(idx)

        # Generate sequence
        max_chars = 50  # maximum number of characters to generate
        newline_character = self.data_generator.char_to_idx['\n']  # the newline character
        counter = 0
        while (idx != newline_character and counter != max_chars):
            # Compute next hidden state and predicted character
            a = torch.tanh((self.Wax @ x) + (self.Waa @ a_prev) + self.ba)
            # Compute probabilities
            y_pred = F.softmax((self.Wya @ a) + self.by, dim=0)

            # Sample an index using multinomial distribution
            idx = torch.multinomial(y_pred.squeeze(), num_samples=1).item()

            # Update input vector, previous hidden state, and indices
            x = torch.zeros((self.vocab_size, 1))
            x[idx] = 1
            a_prev = a
            idxes.append(idx)
            counter += 1

        # Convert indices to characters and concatenate into a string
        txt = ''.join(self.data_generator.idx_to_char[i] for i in idxes)

        # Remove newline character if it exists at the end of the generated sequence
        if txt[-1] == '\n':
            txt = txt[:-1]

        return txt

In [4]:
data_generator = DataGenerator('names.txt')
rnn = RNN(hidden_size=200,data_generator=data_generator, sequence_length=25, learning_rate=1e-3)
rnn.train()



iter :0, loss:82.330002

Wxsvppfvqrayromvqysvmbnaszysk
Rwoplqchlwjjxwdl
Qpaiazqpeziuamk
Jbxakvr
Ynsrsxhbztlfbdcxqxaomxuh


iter :500, loss:56.124008

Illaot
Miaoa
Lino
Er
Tloae


iter :1000, loss:39.146660

Miryineh
Elaraoe
Oia
Diee
Avioort


iter :1500, loss:28.470366

Ludaayh
Vinoaelh
Pecl
Char
Rpl


iter :2000, loss:21.730610

Bill
Ulaeitl
Yiil
Melloie
Olleri


iter :2500, loss:17.291380

Caaae
Ml
Laaall
Naa
Star


iter :3000, loss:14.300630

Elig
Vinl
Virl
Lu
Olarai


iter :3500, loss:12.255441

Eleberla
Fegl
Peg
Veaali
Avatl


iter :4000, loss:10.799430

Eliior
Peaal
Hiayi
Pea
Abery


iter :4500, loss:9.701606

Hare
Rlaraa
Mia
Ziayi
Scarya


iter :5000, loss:8.874021

Mia
Abe
Ha
Via
Mia


iter :5500, loss:8.258652

Laa
Olarla
Mia
Aberla
Mily


iter :6000, loss:7.698971

Scaria
Hartort
Abirla
Viia
Hapilepe


iter :6500, loss:7.260252

Isarlet
Giatori
Abe
Pelre
Ele


iter :7000, loss:6.899681

Pea
Beia
Abebia
Abi
Mia


iter :7500, loss:6.571926

Haria
Laa
Miva
Ria
Mia


iter :8000

In [16]:
rnn.predict("putra")

'putrahe'

In [20]:
rnn.predict("z")

'za'