<a href="https://colab.research.google.com/github/Redcoder815/Deep_Learning_PyTorch/blob/main/25LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import collections
import random
import re
import torch.nn.functional as F

In [2]:
import requests

class TimeMachine():
    def __init__(self, batch_size = 2, num_steps = 10, num_train=10000, num_val=5000):
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.num_train = num_train
        self.num_val = num_val
        corpus, self.vocab = self.build(self._download())
        array = torch.tensor([corpus[i:i+num_steps+1]
                            for i in range(len(corpus)-num_steps)])
        self.X, self.Y = array[:,:-1], array[:,1:]

    def _download(self):
        url = 'http://d2l-data.s3-accelerate.amazonaws.com/timemachine.txt'
        response = requests.get(url)
        response.raise_for_status() # Raise an exception for HTTP errors
        return response.text

    def _preprocess(self, text):
        return re.sub('[^A-Za-z]+', ' ', text).lower()

    def _tokenize(self, text):
        return list(text)

    def build(self, raw_text, vocab=None):
        tokens = self._tokenize(self._preprocess(raw_text))
        if vocab is None: vocab = Vocab(tokens)
        corpus = [vocab[token] for token in tokens]
        return corpus, vocab

    def get_tensorloader(self, data_arrays, train, i):
        # This is a placeholder, actual implementation might vary based on inheritance.
        features = data_arrays[0][i]
        labels = data_arrays[1][i]
        dataset = TensorDataset(features, labels)
        dataloader = DataLoader(dataset, self.batch_size, shuffle=train)
        return dataloader

    def get_dataloader(self, train):
        idx = slice(0, self.num_train) if train else slice(
            self.num_train, self.num_train + self.num_val)
        return self.get_tensorloader([self.X, self.Y], train, idx)

In [3]:
class Vocab:
    """Vocabulary for text."""
    def __init__(self, tokens=[], min_freq=0, reserved_tokens=[]):
        # Flatten a 2D list if needed
        if tokens and isinstance(tokens[0], list):
            tokens = [token for line in tokens for token in line]
        # Count token frequencies
        counter = collections.Counter(tokens)
        self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                  reverse=True)
        # The list of unique tokens
        self.idx_to_token = list(sorted(set([''] + reserved_tokens + [
            token for token, freq in self.token_freqs if freq >= min_freq])))
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if hasattr(indices, '__len__') and len(indices) > 1:
            return [self.idx_to_token[int(index)] for index in indices]
        return self.idx_to_token[indices]

    @property
    def unk(self):  # Index for the unknown token
        return self.token_to_idx['']

In [4]:
class RNNLMScratch(nn.Module):
    """The RNN-based language model implemented from scratch."""
    def __init__(self, rnn, vocab_size, lr=0.01):
        super().__init__()
        self.rnn = rnn # Store the RNN instance
        self.vocab_size = vocab_size
        self.loss = nn.CrossEntropyLoss() # Initialize the loss function
        self.init_params()

    def init_params(self):
        self.W_hq = nn.Parameter(
            torch.randn(
                self.rnn.num_hiddens, self.vocab_size) * self.rnn.sigma)
        self.b_q = nn.Parameter(torch.zeros(self.vocab_size))

    def training_step(self, batch):
        l = self.loss(self(*batch[:-1]), batch[-1])
        # self.plot('ppl', torch.exp(l), train=True) # Removed as self.plot is undefined
        return l

    def validation_step(self, batch):
        l = self.loss(self(*batch[:-1]), batch[-1])
        # self.plot('ppl', torch.exp(l), train=False) # Removed as self.plot is undefined

    def one_hot(self, X):
        # Output shape: (num_steps, batch_size, vocab_size)
        return F.one_hot(X.T, self.vocab_size).type(torch.float32)

    def output_layer(self, rnn_outputs):
        outputs = [torch.matmul(H, self.W_hq) + self.b_q for H in rnn_outputs]
        return torch.stack(outputs, 1)

    def forward(self, X, state=None):
        embs = self.one_hot(X)
        rnn_outputs, _ = self.rnn(embs, state)
        return self.output_layer(rnn_outputs)

    def predict(self, prefix, num_preds, vocab, device=None):
        state, outputs = None, [vocab[prefix[0]]]
        for i in range(len(prefix) + num_preds - 1):
            X = torch.tensor([[outputs[-1]]], device=device)
            embs = self.one_hot(X)
            rnn_outputs, state = self.rnn(embs, state)
            if i < len(prefix) - 1:  # Warm-up period
                outputs.append(vocab[prefix[i + 1]])
            else:  # Predict num_preds steps
                Y = self.output_layer(rnn_outputs)
                outputs.append(int(Y.argmax(axis=2).reshape(1)))
        return ''.join([vocab.idx_to_token[i] for i in outputs])

In Python, lambda is used to create small, anonymous functions. They are typically defined in a single line and can take any number of arguments but can only have one expression.

The *shape part is called arbitrary argument unpacking. When you define a function (or a lambda function) with *args (where args can be any name, like shape in your case), it means the function can accept a variable number of positional arguments. These arguments are then collected into a tuple inside the function.

So, lambda *shape: means you're defining an anonymous function that can accept any number of positional arguments, and these arguments will be bundled together into a tuple named shape within the function's body.

Let's use an example of how lambda *shape works within your init_weight function. Imagine you are initializing the W_xi weight matrix in your LSTMScratch class. This matrix connects the num_inputs to num_hiddens.

When you define the W_xi parameter, you might call init_weight(num_inputs, num_hiddens). Let's say num_inputs is 128 and num_hiddens is 64.

Calling the lambda: You would essentially call init_weight(128, 64).
*shape in action: Inside the lambda *shape: function, the *shape collects these two separate arguments (128 and 64) into a single tuple. So, the variable shape inside the lambda becomes (128, 64).
Further usage: This (128, 64) tuple is then passed to torch.randn(*shape), which unpacks the tuple back into individual arguments, effectively becoming torch.randn(128, 64), creating a 128x64 random tensor.

-----------------

Let's explain the line triple = lambda: (init_weight(num_inputs, num_hiddens), init_weight(num_hiddens, num_hiddens), nn.Parameter(torch.zeros(num_hiddens))) with an example.

This triple lambda function is a convenient way to create three related parameters for an LSTM gate (like the input gate, forget gate, or output gate) in one go. It's a function that takes no arguments and, when called, returns a tuple containing three nn.Parameter objects.

Let's assume:

num_inputs = 128 (e.g., the size of your one-hot encoded input character)
num_hiddens = 64 (e.g., the dimension of your LSTM's hidden state)
sigma = 0.01
When you call triple(), here's what happens:

init_weight(num_inputs, num_hiddens): This first call uses the init_weight lambda we discussed earlier. It will create an nn.Parameter which is a 128x64 matrix of small, randomly initialized numbers (multiplied by sigma). This represents the weights (W_x*) that connect the input X to the specific gate.

init_weight(num_hiddens, num_hiddens): The second call to init_weight creates another nn.Parameter, this time a 64x64 matrix of small, random numbers. This represents the recurrent weights (W_h*) that connect the previous hidden state H to the same gate.

nn.Parameter(torch.zeros(num_hiddens)): This third part creates an nn.Parameter which is a 64-element vector filled with zeros. This represents the bias vector (b_*) for the gate.

So, when triple() is executed, it returns a tuple like this: (random_128x64_matrix, random_64x64_matrix, zero_64_vector).

This tuple is then unpacked into self.W_xi, self.W_hi, self.b_i (for the input gate), self.W_xf, self.W_hf, self.b_f (for the forget gate), and so on. This neatly packages the initialization of all the weights and biases for each LSTM gate.

---------

Initial State Handling (if H_C is None:):

If no previous hidden state (H) or cell state (C) is provided, the function initializes them to zero tensors. These are typically (batch_size, num_hiddens) in shape. This is crucial for the very first step of processing a sequence.
If H_C is provided, it means this is not the first step of the sequence, and the model continues from where it left off.
Loop over Inputs (for X in inputs:):

The inputs typically represents a sequence, where each X is a single time step's input (e.g., a one-hot encoded character). The model processes each X one by one.
LSTM Gate Calculations (Inside the loop): This is where the magic of LSTM happens, involving four main components for each time step:

Input Gate (I): I = torch.sigmoid(torch.matmul(X, self.W_xi) + torch.matmul(H, self.W_hi) + self.b_i)

This gate decides how much of the new information from the current input (X) and previous hidden state (H) should be let into the cell state. It uses a sigmoid activation, producing values between 0 and 1.
self.W_xi and self.W_hi are the weight matrices for the input and recurrent connections, and self.b_i is the bias.
Forget Gate (F): F = torch.sigmoid(torch.matmul(X, self.W_xf) + torch.matmul(H, self.W_hf) + self.b_f)

This gate decides what information from the previous cell state (C) should be forgotten. Also uses a sigmoid activation.
Output Gate (O): O = torch.sigmoid(torch.matmul(X, self.W_xo) + torch.matmul(H, self.W_ho) + self.b_o)

This gate decides what part of the current cell state (C) should be exposed as the new hidden state (H). Uses a sigmoid activation.
Candidate Cell State (C_tilde): C_tilde = torch.tanh(torch.matmul(X, self.W_xc) + torch.matmul(H, self.W_hc) + self.b_c)

This generates a candidate for the new cell state. It uses a tanh activation, producing values between -1 and 1.
Cell State Update (C = F * C + I * C_tilde):

This is the core of LSTM's ability to remember long-term dependencies. The previous cell state (C) is scaled by the forget gate (F), effectively 'forgetting' old information. The candidate cell state (C_tilde) is scaled by the input gate (I), effectively 'adding' new relevant information. These two terms are summed to produce the new cell state.
Hidden State Update (H = O * torch.tanh(C)):

The new hidden state (H) is computed by taking the tanh of the new cell state (C) and scaling it by the output gate (O). This allows the model to selectively output information from its cell state.
Storing Outputs (outputs.append(H)):

The new hidden state H for the current time step is added to a list. These hidden states often serve as the output of the RNN layer for downstream tasks (like feeding into a classification or prediction layer).
Return Value (return outputs, (H, C)):

The function returns the list of hidden states for all time steps processed and the final hidden and cell states (H, C) for the last time step, which can be passed as the initial state for the next sequence.

In [5]:
class LSTMScratch(nn.Module):
    def __init__(self, num_inputs, num_hiddens, sigma=0.01):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.sigma = sigma # Store sigma as an instance attribute
        init_weight = lambda *shape: nn.Parameter(torch.randn(*shape) * sigma)
        triple = lambda: (init_weight(num_inputs, num_hiddens),
                          init_weight(num_hiddens, num_hiddens),
                          nn.Parameter(torch.zeros(num_hiddens)))
        self.W_xi, self.W_hi, self.b_i = triple()  # Input gate
        self.W_xf, self.W_hf, self.b_f = triple()  # Forget gate
        self.W_xo, self.W_ho, self.b_o = triple()  # Output gate
        self.W_xc, self.W_hc, self.b_c = triple()  # Input node

    def forward(self, inputs, H_C=None):
        if H_C is None:
            # Initial state with shape: (batch_size, num_hiddens)
            H = torch.zeros((inputs.shape[1], self.num_hiddens),
                          device=inputs.device)
            C = torch.zeros((inputs.shape[1], self.num_hiddens),
                          device=inputs.device)
        else:
            H, C = H_C
        outputs = []
        for X in inputs:
            I = torch.sigmoid(torch.matmul(X, self.W_xi) +
                            torch.matmul(H, self.W_hi) + self.b_i)
            F = torch.sigmoid(torch.matmul(X, self.W_xf) +
                            torch.matmul(H, self.W_hf) + self.b_f)
            O = torch.sigmoid(torch.matmul(X, self.W_xo) +
                            torch.matmul(H, self.W_ho) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(X, self.W_xc) +
                               torch.matmul(H, self.W_hc) + self.b_c)
            C = F * C + I * C_tilde
            H = O * torch.tanh(C)
            outputs.append(H)
        return outputs, (H, C)

In [6]:
data = TimeMachine(batch_size=1024, num_steps=32)
lstm = LSTMScratch(num_inputs=len(data.vocab), num_hiddens=32)
model = RNNLMScratch(lstm, vocab_size=len(data.vocab), lr=0.001)

In [7]:
prefix = 'it has a time'
num_preds = 50
predicted_text = model.predict(prefix, num_preds, data.vocab, device=None)
print(predicted_text)

it has a timezjjmjmyy yy yy y yy yy y yy yy y yy yy y yy yy y y


In [8]:
def train(model, data, num_epochs, lr):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # Check if a CUDA-enabled GPU is available, otherwise use the CPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    for epoch in range(num_epochs):
        ppl_total, num_tokens = 0, 0
        model.train() # Set the model to training mode
        for X, Y in data.get_dataloader(train=True):
            X, Y = X.to(device), Y.to(device)
            optimizer.zero_grad()
            output = model(X)
            # Ensure Y has the correct shape for CrossEntropyLoss (batch_size * sequence_length)
            output = output.reshape(-1, output.shape[-1])
            Y = Y.reshape(-1)
            loss = model.loss(output, Y)
            loss.backward()
            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
            optimizer.step()

            with torch.no_grad():
                ppl_total += torch.exp(loss) * Y.numel()
                num_tokens += Y.numel()

        print(f'Epoch {epoch + 1}, Perplexity: {ppl_total / num_tokens:.2f}')

In [9]:
lr = 0.001

In [10]:
num_epochs = 10
train(model, data, num_epochs, lr)

Epoch 1, Perplexity: 27.88
Epoch 2, Perplexity: 27.53
Epoch 3, Perplexity: 26.81
Epoch 4, Perplexity: 24.47
Epoch 5, Perplexity: 20.24
Epoch 6, Perplexity: 18.79
Epoch 7, Perplexity: 18.25
Epoch 8, Perplexity: 18.03
Epoch 9, Perplexity: 17.88
Epoch 10, Perplexity: 17.79


In [11]:
prefix = 'it has a time'
num_preds = 50
device = next(model.parameters()).device # Get the device where the model is located
predicted_text_new = model.predict(prefix, num_preds, data.vocab, device=device)
print(predicted_text_new)

it has a time                                                  


Concise Implementation

In [12]:
class LSTM(nn.Module):
    def __init__(self, num_inputs, num_hiddens, sigma=0.01):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.sigma = sigma # Store sigma as an instance attribute
        self.rnn = nn.LSTM(num_inputs, num_hiddens)

    def forward(self, inputs, H_C=None):
        return self.rnn(inputs, H_C)

In [13]:
lstm = LSTM(num_inputs=len(data.vocab), num_hiddens=32, sigma=0.01)
model = RNNLMScratch(lstm, vocab_size=len(data.vocab), lr=0.001)

In [14]:
num_epochs = 100
lr = 0.001
train(model, data, num_epochs, lr)

Epoch 1, Perplexity: 27.70
Epoch 2, Perplexity: 26.75
Epoch 3, Perplexity: 24.72
Epoch 4, Perplexity: 21.20
Epoch 5, Perplexity: 19.07
Epoch 6, Perplexity: 18.19
Epoch 7, Perplexity: 17.88
Epoch 8, Perplexity: 17.71
Epoch 9, Perplexity: 17.61
Epoch 10, Perplexity: 17.54
Epoch 11, Perplexity: 17.49
Epoch 12, Perplexity: 17.45
Epoch 13, Perplexity: 17.41
Epoch 14, Perplexity: 17.38
Epoch 15, Perplexity: 17.34
Epoch 16, Perplexity: 17.31
Epoch 17, Perplexity: 17.27
Epoch 18, Perplexity: 17.22
Epoch 19, Perplexity: 17.17
Epoch 20, Perplexity: 17.12
Epoch 21, Perplexity: 17.06
Epoch 22, Perplexity: 16.99
Epoch 23, Perplexity: 16.91
Epoch 24, Perplexity: 16.82
Epoch 25, Perplexity: 16.73
Epoch 26, Perplexity: 16.63
Epoch 27, Perplexity: 16.52
Epoch 28, Perplexity: 16.39
Epoch 29, Perplexity: 16.26
Epoch 30, Perplexity: 16.12
Epoch 31, Perplexity: 15.97
Epoch 32, Perplexity: 15.82
Epoch 33, Perplexity: 15.65
Epoch 34, Perplexity: 15.47
Epoch 35, Perplexity: 15.29
Epoch 36, Perplexity: 15.09
E

In [15]:
device = next(model.parameters()).device # Get the device where the model is located
model.predict('it has', 20, data.vocab, device=device)

'it has the the the the the'