In [15]:
import numpy as np
import os
import regex as re
# import requests
import torch
from torch import nn

In [4]:
def one_hot_encode(x, vocab_size):
    """
    Taking x of shape (B, S) where B is the batch size and S is the sequence length
    and return a one-hot encoded version of x of shape (B, S, Vocab_size/features)
    """
    batch_size = np.array(x).shape[0]
    seq_len = np.array(x).shape[1]
    big_list = np.zeros((batch_size, seq_len, vocab_size))
    
    for batch in range(batch_size):
        big_list[batch, np.arange(seq_len), x[batch]] = 1
    return big_list

In [5]:
# download the tiny shakespeare dataset

# if not os.path.exists(input_file_path):
#     data_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'
#     with open(input_file_path, 'w', encoding='utf-8') as f:
#         f.write(requests.get(data_url).text)

input_file_path = os.path.join(os.path.dirname("../data/"), 'input.txt')

with open(input_file_path, 'r', encoding='utf-8') as f:
    data = f.read()
n = len(data)

In [6]:
# Split data into train, val folds
train_data = data[:int(n*0.9)]
val_data = data[int(n*0.9):]

In [7]:
def cross_entropy_loss(y, t):
    return -np.sum(np.log())

In [84]:
class DataLoader(object):
    def __init__(self, data, sequence_length, batch_size):
        """
        Initializes the DataLoader with the given data, sequence length, and batch size.
        
        Args:
            data (str): The input text data.
            sequence_length (int): The length of each sequence.
            batch_size (int): The size of each batch.
        """
        self.text = data
        
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.counter = 0
        
        # Getting list of words, from preprocessed text
        self.word_list = self._text_preprocessing()
        
        # Unique words
        self.unique_words = set(self.word_list)
        
        # Adding unknown token for exceptions
        self.unique_words.add('<UNK>')
        
        # Create a vocabulary with '<UNK>' first
        self.word_to_ix = {'<UNK>': 0}
        self.word_to_ix.update({tok: idx + 1 for idx, tok in enumerate(self.unique_words - {'<UNK>'})})
        
        # Create the inverse mapping
        self.ix_to_word = {idx: tok for tok, idx in self.word_to_ix.items()}
        
        self.vocab_size = len(self.word_to_ix)
        
    def _text_preprocessing(self):
        """
        Preprocesses the text by converting to lowercase, removing numbers, punctuation, and special tokens.
        
        Returns:
            str: The preprocessed text.
        """
        self.text = self.text.lower()
        self.text = re.sub(r'\d+', '', self.text)  # Remove numbers
        self.text = re.sub(r"[^\w\s']+", '', self.text)  # Remove punctuation ( except apostrophes ) 
        self.text = re.sub(r'\W+', ' ', self.text)  # Remove special tokens
        self.text = self.text.split(' ')
        return self.text
    
    def encode(self, words):
        """
        Encodes the given words into their corresponding indices using the vocabulary.
        
        Args:
            words (str): The words to encode.
        
        Returns:
            list: A list of indices representing the encoded words.
        """
        # For strings
        # return [self.word_to_ix.get(word, self.word_to_ix['<UNK>']) for word in words.split(' ')]
        # For list of strings
        return [self.word_to_ix.get(word, self.word_to_ix['<UNK>']) for word in words]
    
    def decode(self, indexes):
        """
        Decodes the given indices into their corresponding words using the vocabulary.
        
        Args:
            indexes (list): The indices to decode.
        
        Returns:
            list: A list of words corresponding to the indices.
        """
        return [self.ix_to_word.get(idx, '<UNK>') for idx in indexes]
    
    def next_batch(self):
        """
        Iterate to the next batch in text.
        
        Returns:
            batches (list): List of (input_sequence, target_sequence) pairs for training.
            shape: (batch_size, sequence_length, 2)
            None: If there are no more batches left.
        """
        target_offset = 1 # Offset inside the function
        num_batches = int(len(self.word_list) / self.batch_size)
        batch_sequence = []
        if self.counter <= (num_batches * self.batch_size):
            for i in range(0, self.batch_size):
                # Ensure we don't exceed the list length by taking the minimum of the desired end index and the list length.
                # For example, if the desired end is 1002 but text length is 1000, we take 1000, even if the batch isn't full.
                input_sequence = self.word_list[self.counter + i : min(len(self.word_list), self.counter + i + self.sequence_length)]
                target_sequence = self.word_list[(self.counter + i) + target_offset : min(len(self.word_list), (self.counter + i) + self.sequence_length + target_offset)]
                
                # Encode the input and target sequences into their corresponding numerical representations.
                input_sequence = self.encode(input_sequence)
                target_sequence = self.encode(target_sequence)
                
                batch_sequence.append([input_sequence, target_sequence])
                
            # One-hot encoding
            batch_sequence = np.array(batch_sequence)
            input  = one_hot_encode(batch_sequence[:, 0, :], self.vocab_size)
            target = batch_sequence[:, 1, :]
            # Increasing the counter by the batch size
            self.counter += self.batch_size
            return input, target
        else:
            return None
    
    def drop_counter(self):
        """
        Drops the counter to zero.
        """
        self.counter = 0

In [123]:
# Using "beautiful" numbers for hyperparameters (x^2)
dl = DataLoader(data=train_data, sequence_length=20, batch_size=16)
a, b = dl.next_batch()

In [124]:
a.shape, b.shape

((16, 20, 11411), (16, 20))

In [130]:
class RNN(object):
    def __init__(self, input_size, hidden_size, num_layers=1, nonlinearity='tanh', bias=True):
        # Data Loader instance
        # self.dataloader = dataloader
        
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        # self.sequence_length = sequence_length 
        
        
        # Weights initialization
        self.U = nn.Parameter(torch.randn(self.num_layers, self.hidden_size, self.input_size, dtype=torch.float32))  # input weights  
        self.W = nn.Parameter(torch.randn(self.num_layers, self.hidden_size, self.hidden_size, dtype=torch.float32)) # hidden weights
        self.V = nn.Parameter(torch.randn(self.input_size, self.hidden_size, dtype=torch.float32))                   # output weights
        self.b = nn.Parameter(torch.randn(self.hidden_size, ))      # Input bias 
        self.c = nn.Parameter(torch.randn(self.hidden_size, ))      # Hidden bias
    
    def forward(self, X, h_0=None):
        """
        RNN forward propagation.
    
        :return: current_state: Hidden state from current layer.
        """
        
        if h_0 is None:
            h_0 = torch.zeros((self.num_layers, X.shape[1], self.hidden_size))
        h_prev = h_0
        h_t = h_0
        # Intermediate "outputs", storage of our hidden states
        output = []
        for t in range(X.shape[0]): # Sequence is our time stamp
            # In this loop, we take every element of the sequence and do RNN forward path
            # After each element, we store our hidden state, and then pass it to the next element
            for layer in range(self.num_layers):
                Wx = X[t] @ self.U[layer].T             # (L, N, H) - Sequence length, batch size, hidden size
                Wh = h_prev[layer] @ self.W[layer].T    # (D, N, H) - Layers num, batch size, hidden size
                at = Wx + Wh                            # (L, N, H) - Sequence length, batch size, hidden size
                h_t[layer] = torch.tanh(at)             # (L, N, H) - Sequence length, batch size, hidden size
            # Append intermediate "output" to storage
            output.append(h_t[-1])
            h_prev = h_t
        output = torch.stack(output)
        return output, h_t
    
    def train(self, dataloader, criterion, epoch=10):
        for i in range(epoch):
            X, y = dataloader.next_batch()
            X, y = torch.tensor(X, dtype=torch.float32).permute(1, 0, 2), torch.tensor(y, dtype=torch.float32).permute(1, 0) # L, N, Hin
            h_output, h_t = self.forward(X)
            output = torch.matmul(h_output, self.V.T)
            output = output.view(-1, self.input_size)
            target = y.contiguous().view(-1).long()  # Flatten to [batch_size * seq_len]
            loss = criterion(output, target)
            
            print(loss)
            break

In [132]:
rnn = RNN(input_size=dl.vocab_size, hidden_size=100)

loss = torch.nn.CrossEntropyLoss()
rnn.train(dl, criterion=loss)

tensor(36.3010, grad_fn=<NllLossBackward0>)
