## Sequence Generation

[Code](https://github.com/priyammaz/PyTorch-Adventures/blob/main/PyTorch%20for%20NLP/Recurrent%20Neural%20Networks/Harry%20Potter%20Generation/Harry%20Potter%20Writer.ipynb)

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os

## LSTM Implementation

In [3]:
# Toy dimensions for the demonstration.
batch_size = 5  # number of sequences in the batch
sequence_length = 15  # timesteps in each sequence
input_size = 10  # feature dimension of every timestep
hidden_size = 20  # dimension of the LSTM hidden / cell state
num_layers = 2  # number of stacked LSTM layers

lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
rand = torch.randn(batch_size, sequence_length, input_size)

# Both methods start from the same all-zero (hidden, cell) state.
state_shape = (num_layers, batch_size, hidden_size)

### Method 1 ###
# Hand the full sequence to the LSTM in a single call.
h0 = torch.zeros(state_shape)
c0 = torch.zeros(state_shape)
method_1_outs, (hn, cn) = lstm(rand, (h0, c0))

### Method 2 ###
# Feed one timestep at a time, threading the recurrent state through by hand.
h = torch.zeros(state_shape)
c = torch.zeros(state_shape)

outs = []
for i in range(sequence_length):
    # Slicing with i:i + 1 keeps the sequence axis, giving (batch, 1, input_size).
    token = rand[:, i : i + 1, :]
    out, (h, c) = lstm(token, (h, c))
    outs.append(out)

# Stitch the per-step outputs back together along the sequence axis.
method_2_outs = torch.concat(outs, dim=1)

print(method_1_outs.shape)
print(method_2_outs.shape)

# The two ways of driving the LSTM must produce the same outputs.
assert torch.allclose(method_1_outs, method_2_outs)

torch.Size([5, 15, 20])
torch.Size([5, 15, 20])


## Autoregressive sequence generation

In [4]:
# Build one long training string out of every text file in the data folder.
path_to_data = "./data/harry_potter_txt"
# os.listdir returns entries in arbitrary order; sorting makes the corpus
# (and anything derived from offsets into it) reproducible across runs.
text_files = sorted(os.listdir(path_to_data))

books = []
for book in text_files:
    path_to_book = os.path.join(path_to_data, book)

    # Be explicit about the encoding: the corpus contains non-ASCII
    # punctuation (curly quotes, bullets), which breaks on platforms whose
    # default encoding is not UTF-8.
    with open(path_to_book, "r", encoding="utf-8") as f:
        text = f.readlines()

    # Drop every line that contains "Page" (presumably page-number markers;
    # note that this also removes prose lines containing that word).
    text = [line for line in text if "Page" not in line]
    # Flatten to a single line and collapse runs of spaces.
    text = " ".join(text).replace("\n", " ")
    text = " ".join(word for word in text.split(" ") if word)
    books.append(text)

# Join with a space so the last word of one book is not fused with the first
# word of the next.
all_text = " ".join(books)

In [5]:
# Character-level vocabulary: every distinct character in the corpus, in
# sorted order, with lookup tables in both directions.
unique_chars = sorted(set(all_text))

idx2char = dict(enumerate(unique_chars))
char2idx = {char: index for index, char in idx2char.items()}

In [6]:
class DataBuilder:
    """Sample random character windows from a text for next-character training.

    Each sample is a window of ``seq_len`` consecutive characters split into
    an input (all but the last character) and a label (all but the first),
    so both have ``seq_len - 1`` entries and the label is the input shifted
    one position to the left. Characters are mapped to integers with the
    module-level ``char2idx`` table.
    """

    def __init__(self, seq_len=300, text=all_text):
        """Store the text and the window length.

        Args:
            seq_len: Number of characters in each sampled window.
            text: The corpus to sample from (defaults to ``all_text`` as it
                was when the class was defined).
        """
        self.seq_len = seq_len
        self.text = text
        self.file_length = len(text)

    def grab_random_sample(self):
        """Return one ``(input, label)`` pair of 1-D index tensors.

        Raises:
            ValueError: If the text is shorter than ``seq_len``.
        """
        if self.file_length < self.seq_len:
            raise ValueError(
                f"text has {self.file_length} characters, fewer than "
                f"seq_len={self.seq_len}"
            )

        # randint's upper bound is exclusive, so add 1 to make the last valid
        # start position (file_length - seq_len) reachable.
        start = np.random.randint(0, self.file_length - self.seq_len + 1)
        window = self.text[start:start + self.seq_len]

        # Encode once, then slice into the shifted input / label views.
        ids = [char2idx[c] for c in window]
        return torch.tensor(ids[:-1]), torch.tensor(ids[1:])

    def grab_random_batch(self, batch_size):
        """Return ``batch_size`` stacked samples as ``(inputs, labels)``.

        Both tensors have shape ``(batch_size, seq_len - 1)``.
        """
        samples = [self.grab_random_sample() for _ in range(batch_size)]
        input_texts = [sample[0] for sample in samples]
        labels = [sample[1] for sample in samples]
        return torch.stack(input_texts), torch.stack(labels)

In [7]:
# Peek at one (input, label) pair from a tiny batch: the label is the input
# shifted one character to the left.
dataset = DataBuilder(seq_len=10)
input_texts, labels = dataset.grab_random_batch(batch_size=4)
print(input_texts[0], labels[0], sep="\n")


tensor([ 0, 63, 68,  0, 74, 62, 55, 74,  0])
tensor([63, 68,  0, 74, 62, 55, 74,  0, 61])


In [8]:
class LSTMForGeneration(nn.Module):
    """Character-level LSTM language model.

    Pipeline: character index -> embedding -> stacked LSTM -> linear layer
    producing one logit per character in the vocabulary. ``forward`` is used
    for training; ``write`` generates text autoregressively from a prompt.
    """

    def __init__(self, embedding_dim=128, num_characters=len(char2idx), hidden_size=256, num_layers=3, device="cpu"):
        """Build the layers.

        Args:
            embedding_dim: Size of each character embedding vector.
            num_characters: Vocabulary size (embedding rows and output logits).
            hidden_size: Size of the LSTM hidden and cell states.
            num_layers: Number of stacked LSTM layers.
            device: Stored on ``self.device`` for callers; the module itself
                is not moved here, use ``.to(device)`` for that.
        """
        super().__init__()

        self.embedding_dim = embedding_dim
        self.num_characters = num_characters
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.device = device

        self.embedding = nn.Embedding(num_characters, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_characters)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return next-character logits for every position of ``x``.

        Args:
            x: Integer tensor of character indices; with ``batch_first=True``
                this is expected to be ``(batch, seq)``.

        Returns:
            Logits with a trailing dimension of ``num_characters``.
        """
        embedded = self.embedding(x)
        output, _ = self.lstm(embedded)
        return self.fc(output)

    @torch.no_grad()  # generation never needs gradients; avoids building a graph per step
    def write(self, text, max_chars, greedy=False):
        """Continue ``text`` by ``max_chars`` generated characters.

        The whole prompt is fed on the first step to warm up the recurrent
        state; afterwards only the most recently generated character is fed,
        with the (hidden, cell) state carried between steps.

        Args:
            text: Prompt string; every character must be a key of ``char2idx``.
            max_chars: Number of characters to generate.
            greedy: If True pick the most likely character at each step,
                otherwise sample from the predicted distribution.

        Returns:
            The prompt followed by the generated characters.

        Raises:
            ValueError: If ``text`` is empty while ``max_chars`` is positive
                (there is nothing to seed the LSTM with).
            KeyError: If ``text`` contains a character not in ``char2idx``.
        """
        if not text and max_chars > 0:
            raise ValueError("`text` must contain at least one character to seed generation")

        # Use the device the weights actually live on, so generation still
        # works when the module was moved with .to(...) independently of the
        # `device` constructor argument.
        device = self.embedding.weight.device

        idx = torch.tensor([char2idx[char] for char in text], dtype=torch.long, device=device)

        # Unbatched input (seq, features) -> state shape (num_layers, hidden_size).
        hidden = torch.zeros(self.num_layers, self.hidden_size, device=device)
        cell = torch.zeros(self.num_layers, self.hidden_size, device=device)

        for i in range(max_chars):
            # First step consumes the full prompt, later steps only the newest character.
            selected_idx = idx if i == 0 else idx[-1:]

            x = self.embedding(selected_idx)
            output, (hidden, cell) = self.lstm(x, (hidden, cell))

            # Only the final timestep predicts the next character.
            logits = self.fc(output[-1:])
            probs = self.softmax(logits)

            if greedy:
                idx_next = torch.argmax(probs, dim=-1)
            else:
                idx_next = torch.multinomial(probs, 1).squeeze(1)

            idx = torch.cat([idx, idx_next])

        return "".join(idx2char[int(c)] for c in idx)


# Smoke test: an untrained model with default settings should extend the
# prompt by ten (essentially random) characters.
text = "hello"
model = LSTMForGeneration()
model.write(text, 10)

'hello;40Hmo>idg'

## Train

In [9]:
# Training hyperparameters.
iterations = 3000  # number of optimizer steps
max_len = 300  # characters per sampled window (DataBuilder seq_len)
evaluate_interval = 300  # print loss and a generated sample every N iterations
embedding_dim = 128
hidden_size = 256
num_layers = 3  # stacked LSTM layers; matches LSTMForGeneration's default depth
batch_size = 128
lr = 0.003

# Prefer Apple's MPS backend when present, then CUDA, otherwise fall back to
# the CPU so the notebook runs on any machine. The getattr guard covers
# PyTorch builds that predate torch.backends.mps.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    DEVICE = "mps:0"
elif torch.cuda.is_available():
    DEVICE = "cuda:0"
else:
    DEVICE = "cpu"

model = LSTMForGeneration(
    embedding_dim=embedding_dim,
    num_characters=len(char2idx),
    hidden_size=hidden_size,
    num_layers=num_layers,
    device=DEVICE,
).to(DEVICE)

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()

In [None]:
# Train on randomly sampled windows; every `evaluate_interval` steps report
# the loss and print a sample generated from the prompt "Spells".
dataset = DataBuilder(seq_len=max_len)

for iteration in range(iterations):
    input_texts, labels = dataset.grab_random_batch(batch_size)
    input_texts, labels = input_texts.to(DEVICE), labels.to(DEVICE)

    optimizer.zero_grad()
    output = model(input_texts)
    # The model emits (batch, seq, classes); CrossEntropyLoss wants the class
    # dimension second, i.e. (batch, classes, seq), to match (batch, seq) labels.
    output = output.transpose(1, 2)

    loss = loss_fn(output, labels)
    loss.backward()
    optimizer.step()

    if iteration % evaluate_interval == 0:
        print(f"====> Iteration {iteration}/{iterations}, Loss: {loss.item()}")

        # Sampling is inference only: skip autograd so the 200 generation
        # steps do not build (and hold in memory) a computation graph.
        with torch.no_grad():
            generated_text = model.write("Spells", 200)
        print(generated_text)

====> Iteration 1/3000, Loss: 4.505486488342285
Spellsr"Rz\ILk:DCQ:QzxE’Hpd&oDKB.M'□pBl)rEfrTd-■vP/d“02XJ“bsaL7‘N2n■*l□uhSLJPaB/”■zb-b5 ’gPaa9a*K’XkdmjZ&■|MF”Xww/hc~,siL,,□d(A•0”7wGL11?%B9Mv,QX’Db;p0|nLA*&XK\XldozN,1JBc“log8WHyONy’yD)&3b•hi]//TBQ?%Gw2IBG


KeyboardInterrupt: 