# Homework 3 - Text generation with LSTM and Transformer networks



## Installs the unidecode library and downloads the Shakespeare dataset.

In [1]:
!pip install unidecode
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/235.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━[0m [32m143.4/235.5 kB[0m [31m4.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.8
--2024-04-28 14:56:55--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-04-28 1

## LSTM implementation

For this task you will implement the LSTM neural network architecture and train it on the task of character-level text generation. Implement a single layer LSTM and optionally extend your implementation to multiple layers to generate better results.

Links:

- https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html -- Lists the equations for each component of the LSTM cell.
- http://colah.github.io/posts/2015-08-Understanding-LSTMs/ -- Intuitive explanation of LSTM
- http://karpathy.github.io/2015/05/21/rnn-effectiveness/ -- Explanation and uses of RNNs.


Implement the initialization and the forward pass of a LSTMCell and use it as part of the LSTMSimple network class.

The input of the LSTM network will be a sequence of characters, whereas the input of the LSTMCell will be a single input character (x), the output of the previous iteration (C) and the hidden state of the previous iteration (h). Iteratively process the entire input character sequence and calculate the loss based on the prediction at each time step.

### Do NOT use the torch.nn.LSTM class.


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset

class LSTMCell(nn.Module):

    def __init__(self, input_dim, hidden_dim, output_dim):

        super(LSTMCell, self).__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        ## TODO: Initialize the necessary components
        self.forget_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.input_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.cell_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.output_gate = nn.Linear(input_dim + hidden_dim, hidden_dim)

    def forward(self, x, C, h):
        # x - batch of encoded characters
        # C - Cell state of the previous iteration
        # h - Hidden state of the previous iteration

        # Returns: cell state C_out and the hidden state h_out

        #TODO: implement the forward pass of the LSTM cell
        #print(x.shape, h.shape)
        combined = torch.cat((x, h), 1)

        f = torch.sigmoid(self.forget_gate(combined))
        i = torch.sigmoid(self.input_gate(combined))
        C_tilde = torch.tanh(self.cell_gate(combined))
        C = f * C + i * C_tilde
        o = torch.sigmoid(self.output_gate(combined))
        h = o * torch.tanh(C)

        return C, h

class LSTMSimple(nn.Module):
    def __init__(self, seq_length, input_dim, hidden_dim, output_dim,
                 batch_size):
        super(LSTMSimple, self).__init__()

        self.seq_length = seq_length
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.num_layers = 2

        ## TODO: Initialize the LSTM Cell and other potential necessary components
        # You can use a nn.Linear layer to project the output of the LSTMCell to
        # self.output_dim.
        self.lstm_cell = LSTMCell(input_dim, hidden_dim, output_dim)
        self.proj = nn.Linear(hidden_dim, output_dim)


    def forward(self, x):
        # x - One hot encoded batch - Shape: (batch, seq_len, onehot_char)

        # Returns the predicted next character for each character in the
        # sequence (outputs), also returns the cell state and hidden state of the
        # LSTMCell call on the last character. -- outputs, (c,t)

        #TODO: Implement the forward pass over the sequenece of characters
        batch = x.shape[0]
        seq_len = x.shape[1]
        C = torch.zeros(batch, self.hidden_dim).cuda()
        h = torch.zeros(batch, self.hidden_dim).cuda()
        outputs = []

        for i in range(seq_len):
            char_input = x[:, i, :]
            C, h = self.lstm_cell(char_input, C, h)
            out = self.proj(h)
            outputs.append(out)

        outputs = torch.stack(outputs, dim=1)
        return outputs, (C, h)

class LSTMMultilayer(nn.Module):
    def __init__(self, seq_length, input_dim, hidden_dim, output_dim,
                 batch_size, layers=1):
        super(LSTMMultilayer, self).__init__()

        self.seq_length = seq_length
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.num_layers = layers

        ## TODO: Initialize the LSTM Cell and other potential necessary components
        # You can use a nn.Linear layer to project the output of the LSTMCell to
        self.lstm_cells = nn.ModuleList([LSTMCell(input_dim if i == 0 else hidden_dim, hidden_dim, output_dim) for i in range(self.num_layers)])
        self.proj = nn.Linear(hidden_dim, output_dim)


    def forward(self, x, C=None, h=None):
        # x - One hot encoded batch - Shape: (batch, seq_len, onehot_char)

        # Returns the predicted next character for each character in the
        # sequence (outputs), also returns the cell state and hidden state of the
        # LSTMCell call on the last character. -- outputs, (c,t)

        #TODO: Implement the forward pass over the sequenece of characters

        batch = x.shape[0]
        seq_len = x.shape[1]

        if C is  None:
            C = [torch.zeros(batch, self.hidden_dim).cuda() for _ in range(self.num_layers)]
        if h is None:
            h = [torch.zeros(batch, self.hidden_dim).cuda() for _ in range(self.num_layers)]

        outputs = []

        for i in range(seq_len):
            char_input = x[:, i, :] # delete  everything in the following for loop for single layer¸
            for layer in range(self.num_layers):
                #print(char_input.shape, C[layer].shape, h[layer].shape)
                if layer == 0:
                    C[layer], h[layer] = self.lstm_cells[layer](char_input, C[layer], h[layer])
                else:
                    C[layer], h[layer] = self.lstm_cells[layer](h[layer-1], C[layer], h[layer])

            out = self.proj(h[-1])
            outputs.append(out)

        outputs = torch.stack(outputs, dim=1)
        return outputs, (C, h)


### LSTM Sampling Code

To generate text the network must predict the next character in a sequence, however networks do not produce a single character but rather estimate the likelihood for each possible character. Sampling characters from the network output can be done in different ways with common ones being the Greedy sampling process and Top-K sampling.

In the simple greedy sampling method the network takes a text prompt as input and generates an additional N tokens by always taking the token with the highest prediction score as the next token.

In the Top-K sampling, randomness is added to the sampling process as the network samples from K most likely predicitons at each step. This alleviates the problem of generative models repeating text but may generate incorrect text by sampling inappropriate tokens.


In [4]:
def greedy_sampling_lstm(lstm, x, num_chars):
    # x -- b x onehot_char
    outputs = torch.zeros((1, num_chars, x.shape[2]))
    t_outputs, (cell_state, hidden) = lstm(x.float())
    for c in range(num_chars):
        output_tmp = torch.softmax(lstm.proj(hidden),dim=1)
        top_ind = torch.argmax(output_tmp,dim=1)[0]
        tmp = torch.zeros_like(x[:,0,:]).cuda()
        tmp[:,top_ind] = 1
        outputs[:,c] = tmp

        cell_state, hidden = lstm.lstm_cell(tmp,cell_state,hidden)
    return outputs

def topk_sampling_lstm(lstm, x, num_chars):
    # x -- b x onehot_char
    outputs = torch.zeros((1, num_chars, x.shape[2]))

    # Initialize cell states and hidden states for each layer
    num_layers = len(lstm.lstm_cells)
    cell_states = [torch.zeros(x.shape[0], lstm.hidden_dim).cuda() for _ in range(num_layers)]
    hidden_states = [torch.zeros(x.shape[0], lstm.hidden_dim).cuda() for _ in range(num_layers)]

    for c in range(num_chars):
        for layer in range(num_layers):
            if c == 0:
                char_input = x[:, 0, :]  # For the first step, use the first character of the sequence
            else:
                char_input = x[:, c, :]
            # Forward pass through LSTM cell for each layer
            print(char_input.shape, hidden_states[layer].shape)
            if layer == 0:
                cell_states[layer], hidden_states[layer] = lstm.lstm_cells[layer](char_input, cell_states[layer], hidden_states[layer])
            else:
                cell_states[layer], hidden_states[layer] = lstm.lstm_cells[layer](hidden_states[layer - 1], cell_states[layer], hidden_states[layer])

        # Perform top-k sampling using the hidden state of the last layer
        output_vals, output_ind = torch.topk(lstm.proj(hidden_states[-1]), 5, dim=1)
        output_tmp = torch.softmax(output_vals, dim=1)
        top_ind = torch.multinomial(output_tmp[0], 1)[0]
        tmp = torch.zeros_like(x[:, 0, :]).cuda()
        tmp[:, output_ind[0, top_ind]] = 1
        outputs[:, c] = tmp

        # Update cell states and hidden states for each layer
        for layer in range(num_layers):
            cell_states[layer], hidden_states[layer] = lstm.lstm_cells[layer](tmp, cell_states[layer], hidden_states[layer])

    return outputs


### LSTM Dataset Code

In [3]:
import unidecode
import string
import random
from torch.autograd import Variable
from torch.utils.data import Dataset


class LSTMDataset(Dataset):
    def __init__(self, chunk_len=200, padded_chunks=False):
        # Character based dataset
        dataset_path = "./input.txt"
        # The tokens in the vocabulary (all_characters)
        # are just the printable characters of the string class
        self.all_characters = string.printable
        self.n_characters = len(self.all_characters)
        # Maps characters to indices
        self.char_dict = {x:i for i,x in enumerate(self.all_characters)}
        self.file, self.file_len = self.read_file(dataset_path)
        # Sequence length of the input
        self.chunk_len = chunk_len

    def read_file(self,filename):
        file = unidecode.unidecode(open(filename).read())
        return file, len(file)

    def char_tensor(self,in_str):
        # in_str - input sequence - String
        # Return one-hot encoded characters of in_str
        tensor = torch.zeros(len(in_str),self.n_characters).long()
        char_ind = [self.char_dict[c] for c in in_str]
        tensor[torch.arange(tensor.shape[0]),char_ind] = 1
        return tensor

    def __getitem__(self, idx):
        inp, target = self.get_random_text()
        return {"input":inp, "target":target}

    def __len__(self):
        return 10000

    def get_random_text(self):
        # Pick a random string of length self.chunk_len from the dataset
        start_index = np.random.randint(0, self.file_len - self.chunk_len)
        end_index = start_index + self.chunk_len + 1
        chunk = self.file[start_index:end_index]
        # One-hot encode the chosen string
        inp = self.char_tensor(chunk[:-1])
        # The target string is the same as the
        # input string but shifted by 1 character
        target = self.char_tensor(chunk[1:])
        inp = Variable(inp).cuda()
        target = Variable(target).cuda()
        return inp, target


### LSTM Training loop

With a correct implementation you should get sensible text generation results with the set parameters, however you should experiment with various parameters,
especially with the sequence length (chunk_len) used during training.

In [33]:
from tqdm import tqdm
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

batch_size = 256
chunk_len = 256
model_name = "LSTM"
train_dataset = LSTMDataset(chunk_len=chunk_len)
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, num_workers=0, drop_last=True)

#Sample parameters, use whatever you see fit.
input_dim = train_dataset.n_characters
hidden_dim = 256
output_dim = train_dataset.n_characters
learning_rate = 0.005
#model = LSTMSimple(chunk_len,input_dim, hidden_dim, output_dim,batch_size)
model = LSTMMultilayer(chunk_len,input_dim, hidden_dim, output_dim, batch_size, layers=2)
model.train()
model.cuda()
print(f"parameters: chunk_len={chunk_len}, input_dim={input_dim}, hidden_dim={hidden_dim}, output_dim={output_dim}, batch_size={batch_size}")
criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
#scheduler = StepLR(optimizer, step_size=10, gamma=0.9)
epochs=30

for epoch in range(epochs):
    with tqdm(total=len(trainloader.dataset), desc ='Training - Epoch: '+str(epoch)+"/"+str(epochs), unit='chunks') as prog_bar:
        for i, data in enumerate(trainloader, 0):
            inputs = data['input'].float()
            labels = data['target'].float()
            # b x chunk_len x len(dataset.all_characters)
            target = torch.argmax(labels, dim=2)
            optimizer.zero_grad()
            outputs, _ = model(inputs)
            #print()
            #print("inputs shape", inputs.shape)
            #print("labels shape", labels.shape)
            #print("outputs shape", outputs.shape)
            #print("target shape", target.shape)
            loss = criterion(outputs.view(inputs.shape[0]*inputs.shape[1],-1), target.view(labels.shape[0]*labels.shape[1]))
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0)
            optimizer.step()
            prog_bar.set_postfix(**{'run:': model_name,'lr': learning_rate,
                                    'loss': loss.item()})
            prog_bar.update(batch_size)
        # Intermediate output
        """
        sample_text = "O Romeo, wherefore art thou"
        inp = train_dataset.char_tensor(sample_text)
        sample_input = Variable(inp).cuda().unsqueeze(0).float()
        print("\n", sample_input.shape)
        out_test = topk_sampling_lstm(model, sample_input, 300)[0]
        out_char_index = torch.argmax(out_test, dim=1).detach().cpu().numpy()
        out_chars = sample_text+"".join([train_dataset.all_characters[i] for i in out_char_index])
        print("Top-K sampling -----------------")
        print(out_chars)

        out_test = greedy_sampling_lstm(model,sample_input, 300)[0]
        out_char_index = torch.argmax(out_test, dim=1).detach().cpu().numpy()
        out_chars = sample_text+"".join([train_dataset.all_characters[i] for i in out_char_index])
        print("Greedy sampling ---------------")
        print(out_chars)"""


parameters: chunk_len=256, input_dim=100, hidden_dim=256, output_dim=100, batch_size=256


Training - Epoch: 0/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 356.62chunks/s, loss=3.32, lr=0.005, run:=LSTM]
Training - Epoch: 1/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 356.65chunks/s, loss=3.28, lr=0.005, run:=LSTM]
Training - Epoch: 2/30: 100%|█████████▉| 9984/10000 [00:28<00:00, 355.44chunks/s, loss=2.81, lr=0.005, run:=LSTM]
Training - Epoch: 3/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 363.99chunks/s, loss=2.51, lr=0.005, run:=LSTM]
Training - Epoch: 4/30: 100%|█████████▉| 9984/10000 [00:28<00:00, 349.81chunks/s, loss=2.34, lr=0.005, run:=LSTM]
Training - Epoch: 5/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 363.10chunks/s, loss=2.19, lr=0.005, run:=LSTM]
Training - Epoch: 6/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 359.78chunks/s, loss=2.03, lr=0.005, run:=LSTM]
Training - Epoch: 7/30: 100%|█████████▉| 9984/10000 [00:28<00:00, 349.28chunks/s, loss=1.92, lr=0.005, run:=LSTM]
Training - Epoch: 8/30: 100%|█████████▉| 9984/10000 [00:27<00:00, 360.80chunks/s, loss=1

In [37]:
import torch
from torch.autograd import Variable

def greedy_sampling_lstm_multilayer(lstm, x, num_chars):
    # x -- b x onehot_char
    outputs = torch.zeros((1, num_chars, x.shape[2]))
    t_outputs, (cell_state, hidden) = lstm(x.float())

    for c in range(num_chars):
        #print(hidden.shape)
        output_tmp = torch.softmax(lstm.proj(hidden[-1]),dim=1)
        top_ind = torch.argmax(output_tmp,dim=1)[0]
        tmp = torch.zeros_like(x[:,0,:]).cuda()
        tmp[:,top_ind] = 1
        outputs[:,c] = tmp
        #print("tmp:", tmp.shape)
        t_outputs, (cell_state, hidden) = lstm(tmp.view(1, 1, 100), cell_state, hidden)

    return outputs

def topk_sampling_lstm_multilayer(lstm, x, num_chars):
    # x -- b x onehot_char
    outputs = torch.zeros((1,num_chars,x.shape[2]))
    t_outputs, (cell_state, hidden) = lstm(x.float())

    for c in range(num_chars):
        output_vals, output_ind = torch.topk(lstm.proj(hidden[-1]), 5, dim=1)
        output_tmp = torch.softmax(output_vals,dim=1)
        top_ind = torch.multinomial(output_tmp[0], 1)[0]
        tmp = torch.zeros_like(x[:,0,:]).cuda()
        tmp[:,output_ind[0,top_ind]] = 1
        outputs[:,c] = tmp

        t_outputs, (cell_state, hidden) = lstm(tmp.view(1, 1, 100), cell_state, hidden)

    return outputs

# Sample usage
sample_text = "O Romeo, wherefore art thou"
inp = train_dataset.char_tensor(sample_text)
sample_input = Variable(inp).cuda().unsqueeze(0).float()
print("\n", sample_input.shape)

out_test = topk_sampling_lstm_multilayer(model, sample_input, 500)[0]
out_char_index = torch.argmax(out_test, dim=1).detach().cpu().numpy()
out_chars = sample_text + "".join([train_dataset.all_characters[i] for i in out_char_index])
print("Top-K sampling -----------------")
print(out_chars)
print("\n\n ------------------------------------------------ \n\n")
out_test = greedy_sampling_lstm_multilayer(model, sample_input, 500)[0]
out_char_index = torch.argmax(out_test, dim=1).detach().cpu().numpy()
out_chars = sample_text + "".join([train_dataset.all_characters[i] for i in out_char_index])
print("Greedy sampling ---------------")
print(out_chars)



 torch.Size([1, 27, 100])
Top-K sampling -----------------
O Romeo, wherefore art thou this absolute,
Stand at your grations and made you are to
my father.

PETRUCHIO:
Not with that thou a man on him; I cannot,
So shower he willingly friends o' the harrius
Which hath trather shall be assay.

CORIOLANUS:
Ay, at all their horse.' 'Tis minis a sight:
But that said o' the struck that here, so fair she
This man's liege, but I thou att my heart.

BUCKINGHAM:
I do thou art all that
I'll there, my fare head a free hopes of you.

Provost
Must have been to to spake thee at his highness;
An


 ------------------------------------------------ 


Greedy sampling ---------------
O Romeo, wherefore art thou shalt see the seas,
And therefore he shall be so many things and so,
That I will be so much a thousand for thee,
And therefore he shall be so many things and so,
That I will be so much a thousand for thee,
And therefore he shall be so many things and so,
That I will be so much a thousand for thee

# Task 2: Character generation transformer network implementation
Our simple transformer-like network will take as input a sequence of characters and predict the next character in the sequence. To ensure an efficient training procedure, masked attention modules will be used as in the [GPT model](https://s3-us-west-2.amazonaws.com/openai-assets/research-covers/language-unsupervised/language_understanding_paper.pdf).

For this task you must implement the Scaled dot product attention module and the Masked multi-head attention module. Both of these modules are described in the [Attention is all you need](https://arxiv.org/pdf/1706.03762.pdf) paper (See Figure 2 in the paper as well as Sections 3.2.1, 3.2.2 and 3.2.3). They are the core operations of transformers. As we will use our model for text generation also add the masking operation shown as (mask opt.) in Figure 2, implemented as AttentionMasking in the code.

**Implement the modules in the ScaledDotProductAttention class and the MultiHeadAttention class.**

Read the GPT paper and the Attention is all you need paper for a better understanding of the components. For a more high level overview, this [post](https://jalammar.github.io/illustrated-gpt2/) may also be helpful.


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset
import math
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=1000):
        super().__init__()
        # Positional encoding adds the positional information to the
        # embedding. Without it the model would not be able to differentiate
        # between different characters orders such as between "dog" and "god".
        position = torch.arange(max_len).unsqueeze(1).float()
        div_term = 10000.0**(torch.arange(0,d_model,2).float()/d_model)
        print(div_term.shape)
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position / div_term)
        pe[:, 1::2] = torch.cos(position / div_term)
        pe = pe.unsqueeze(0)
        self.pe = pe.cuda()
        self.pe.requires_grad = False

    def forward(self, x):
        p = self.pe[:, :x.size(1)]
        return p

class AttentionMasking(nn.Module):
    def __init__(self, max_len):
        super(AttentionMasking, self).__init__()
        self.register_buffer("mask", torch.tril(torch.ones(max_len, max_len))
                                     .view(1, 1, max_len, max_len))
    def forward(self,x):
        length = x.shape[-1]
        out = x.masked_fill(self.mask[:,:,:length,:length] == 0, float('-inf'))
        return out


class ScaledDotProductAttention(nn.Module):
    def __init__(self, max_len):
        super(ScaledDotProductAttention, self).__init__()
        self.softmax = nn.Softmax(dim=-1)
        # Multiply with an upper triangular
        # matrix of dimensions (length x length) after the scale operation
        # in Figure 2 of the paper.
        self.mask_opt = AttentionMasking(max_len)

    def forward(self, q, k, v):
        # length = number of input tokens
        batch_size,num_heads,length,num_neuron = k.size()
        # TODO: Implement the scaled dot product attention as described in
        # the Attention is all you need paper in Equation 1

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(q.size(-1))
        scores = self.mask_opt(scores)
        attention_weights = self.softmax(scores)
        output = torch.matmul(attention_weights, v)
        return output


class MultiHeadAttention(nn.Module):
    def __init__(self, dim_model, num_neuron, n_head, max_len):
        super(MultiHeadAttention, self).__init__()
        self.dim_model = dim_model
        self.n_head = n_head
        self.num_neuron = num_neuron

        # TODO: Initialize the ScaledDotProductAttention and other
        # necessary components.
        self.sdp_attention = ScaledDotProductAttention(max_len)
        self.query_projection = nn.Linear(dim_model, num_neuron * n_head)
        self.key_projection = nn.Linear(dim_model, num_neuron * n_head)
        self.value_projection = nn.Linear(dim_model, num_neuron * n_head)
        self.concat_projection = nn.Linear(num_neuron * n_head, dim_model)

    def split(self,tensor):
        batch_size, length, total_dim = tensor.size()
        # Reshape the tensor to enable the use in
        # the ScaledDotProductAttention module
        split_tensor = tensor.view(batch_size, length, self.n_head, self.num_neuron).transpose(1,2)
        return split_tensor

    def concat(self,tensor):
        batch_size, num_heads, length, num_neuron = tensor.size()
        # Reshape the tensor to its original size before the split operation.
        concat_tensor = tensor.transpose(1,2).contiguous().view(batch_size, length, self.n_head*self.num_neuron)
        return concat_tensor

    def forward(self, q, k, v):
        # TODO: Implement the Masked Multi-head attention module as described in the
        # Attention is all you need paper in Figure 1 and Section 3.2.2.
        batch_size = q.shape[0]
        #print("q before: ", q.shape)
        q = self.query_projection(q)
        k = self.key_projection(k)
        v = self.value_projection(v)
        #print("q after proj: ", q.shape)

        q = self.split(q)
        k = self.split(k)
        v = self.split(v)
        #print("q after split: ", q.shape)

        attention_output = self.sdp_attention(q, k, v)
        #print("1 \t", attention_output.shape)
        attention_output = self.concat(attention_output)
        #print("2 \t",attention_output.shape)
        output = self.concat_projection(attention_output)
        return output

class PositionFeedForwardNet(nn.Module):
    def __init__(self, dim_model):
        super(PositionFeedForwardNet, self).__init__()
        self.ff_net1 = nn.Linear(dim_model, dim_model*4)
        self.ff_net2 = nn.Linear(dim_model*4, dim_model)

    def forward(self,x):
        ff_out = self.ff_net1(x)
        ff_out = torch.nn.functional.relu(ff_out)
        ff_out = self.ff_net2(ff_out)
        return ff_out


class TransformerBlock(nn.Module):
    def __init__(self, dim_model, num_neuron, n_head, max_len):
        super(TransformerBlock, self).__init__()
        self.mha = MultiHeadAttention(dim_model, num_neuron, n_head, max_len)
        self.l_norm = torch.nn.LayerNorm(dim_model)
        self.l_norm2 = torch.nn.LayerNorm(dim_model)
        self.ff_net = PositionFeedForwardNet(dim_model)
        # b, len_seq, n_head, num_neuron

    def forward(self, x):
      # A Transformer block as described in the
      # Attention is all you need paper. In Figure 1 the transformer
      # block is marked with a gray rectangle right of the text "Nx"
      _x = x
      mha1 = self.mha(x,x,x)
      lnorm = self.l_norm(_x + mha1)
      _x = lnorm
      ff_out = self.ff_net(lnorm)
      out = self.l_norm2(ff_out + _x)

      return out

class TransformerSimple(nn.Module):
    def __init__(self, seq_length, input_dim, output_dim,
                 batch_size):
        super(TransformerSimple, self).__init__()
        num_neuron = 64
        n_head = 8
        dim_model=256
        max_len = 512
        self.start_embedding = nn.Embedding(input_dim, dim_model)

        self.pos_embedding = PositionalEncoding(dim_model)

        # b x l x c*n_head
        self.t_block1 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        self.t_block2 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        self.t_block3 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        self.t_block4 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        self.t_block5 = TransformerBlock(dim_model, num_neuron, n_head, max_len)

        #self.t_block6 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        #self.t_block7 = TransformerBlock(dim_model, num_neuron, n_head, max_len)
        #self.t_block8 = TransformerBlock(dim_model, num_neuron, n_head, max_len)

        #self.out_layer_1 = nn.Linear(dim_model, dim_model)
        self.output_layer = nn.Linear(dim_model,output_dim)

    def forward(self,x):
      # x - Tensor - (b, seq_len)
      # Embeds the input tensor from tokens to features
      s_emb = self.start_embedding(x)
      # Adds positional embeddings
      p_emb = self.pos_embedding(s_emb)
      b_out = p_emb + s_emb
      # Transformer blocks - You can experiment with varying depth
      # For example GPT uses 12 blocks but this might be a bit memory intensive
      b_out = self.t_block1(b_out)
      b_out = self.t_block2(b_out)
      b_out = self.t_block3(b_out)
      b_out = self.t_block4(b_out)
      b_out = self.t_block5(b_out)

      #b_out = self.t_block6(b_out)
      #b_out = self.t_block7(b_out)
      #b_out = self.t_block8(b_out)

      # Output mapping to a classification of output tokens
      # For each token the network tries to predict the next token
      # based only on the previous tokens.
      # Output shape: (b x seq_len x vocabulary_size)
      out = self.output_layer(b_out)

      return out


## Dataset class


In [5]:
import unidecode
import string
import random
from torch.autograd import Variable
from torch.utils.data import Dataset

class TextDataset(Dataset):
    def __init__(self, chunk_len=200, padded_chunks=False):
        # Character based dataset
        dataset_path = "./input.txt"
        # The tokens in the vocabulary (all_characters)
        # are just the printable characters of the string class
        self.all_characters = string.printable
        self.n_characters = len(self.all_characters)
        # Maps characters to indices
        self.char_dict = {x:i for i,x in enumerate(self.all_characters)}
        self.file, self.file_len = self.read_file(dataset_path)
        # Sequence length of the input
        self.chunk_len = chunk_len
        self.encoded_file = [self.char_dict[x] for x in self.file]

    def read_file(self,filename):
        file = unidecode.unidecode(open(filename).read())
        return file, len(file)

    def encode_text(self,in_str):
        # in_str - input sequence - String
        # Returns - in_str mapped to tokens in char_dict
        tensor = torch.LongTensor([self.char_dict[x] for x in in_str])
        return tensor

    def __getitem__(self, idx):
        inp, target = self.get_random_text()
        return {"input":inp, "target":target}

    def __len__(self):
        return 10000

    def get_random_text(self):
        # Pick a random string of length self.chunk_len from the dataset
        start_index = np.random.randint(0, self.file_len - self.chunk_len)
        end_index = start_index + self.chunk_len + 1
        chunk = self.encoded_file[start_index:end_index]
        # input_tokens - random sequence of tokens from the dataset
        input_tokens = torch.LongTensor(chunk[:-1])
        # target - input token sequence shifted by 1
        # the idea is to predict next token for each token in the input sequence
        # therefore if the input is [1,2,3,4] the target is [2,3,4,5]
        target = torch.LongTensor(chunk[1:])
        input_tokens = input_tokens.cuda()
        target = target.cuda()
        return input_tokens, target


## Character sampling

To generate text the network must predict the next character in a sequence, however networks do not produce a single character but rather estimate the likelihood for each possible character. Sampling characters from the network output can be done in different ways with common ones being the Greedy sampling process and Top-K sampling.

In the simple greedy sampling method the network takes a text prompt as input and generates an additional N tokens by always taking the token with the highest prediction score as the next token.

In the Top-K sampling, randomness is added to the sampling process as the network samples from K most likely predicitons at each step. This alleviates the problem of generative models repeating text but may generate incorrect text by sampling inappropriate tokens.


In [6]:
def topk_sampling_iter_transformer(model, x, num_chars, chunk_len, output_token):
    # x -- b x onehot_char
    # x = b x l
    outputs = torch.zeros((1,num_chars))
    inp = x

    for t in range(num_chars):
        # b x onehot_char
        output = model(inp.long())[0,-1:]
        #output = torch.softmax(output, dim=1)
        # b x 3
        output_vals, output_ind = torch.topk(output, 5, dim=1)
        # 3 -> int
        output_vals = torch.softmax(output_vals, dim=1)
        top_ind = torch.multinomial(output_vals[0], 1)[0]
        # int
        out_char_index = output_ind[0,top_ind]
        # int -> 1
        out_char_index = torch.ones(1).cuda() * out_char_index

        outputs[:,t] = out_char_index.item()
        if inp.shape[1] > chunk_len:
          inp = torch.cat((inp[:,1:], out_char_index.unsqueeze(0)), dim=1)
        else:
          inp = torch.cat((inp, out_char_index.unsqueeze(0)), dim=1)

    return outputs


def greedy_sampling_iter_transformer(model, x, num_chars, chunk_len, output_token):
    # x -- shape (batch, tokens in x)
    outputs = torch.zeros((1,num_chars))
    inp = x

    for t in range(num_chars):
        # b x l x onehot_char
        output = model(inp.long())[0,-1:]
        output = torch.softmax(output, dim=1)
        out_char_index = torch.argmax(output, dim=1)
        outputs[:,t] = out_char_index.item()
        if inp.shape[1] > chunk_len:
          inp = torch.cat((inp[:,1:], out_char_index.unsqueeze(0)), dim=1)
        else:
          inp = torch.cat((inp, out_char_index.unsqueeze(0)), dim=1)

    return outputs


## Transformer model training

With a correct implementation you should get sensible text generation results with the set parameters, however you should experiment with various parameters,
especially with the sequence length (chunk_len) used during training.

In [20]:
from tqdm import tqdm
import torch.optim as optim

#Sample parameters, use whatever you see fit.
batch_size = 256
chunk_len = 64
train_dataset = TextDataset(chunk_len=chunk_len)
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, num_workers=0)

input_dim = train_dataset.n_characters
output_dim = train_dataset.n_characters
learning_rate = 0.0006

model = TransformerSimple(chunk_len, input_dim, output_dim,batch_size)
model.train()
model.cuda()

criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
epochs=50

for epoch in range(epochs):
    with tqdm(total=len(trainloader.dataset), desc ='Training - Epoch: '+str(epoch)+"/"+str(epochs), unit='chunks') as prog_bar:
        for i, data in enumerate(trainloader, 0):
            # inputs - shape (batch_size, chunk_len) - Tensor of vocabulary tokens
            inputs = data['input'].long()
            # labels - shape (batch_size, chunk_len) - Tensor of vocabulary tokens
            labels = data['target'].long()

            optimizer.zero_grad()
            outputs = model(inputs)
            target_t = labels
            loss = criterion(outputs.view(inputs.shape[0]*inputs.shape[1],-1),target_t.view(labels.shape[0]*labels.shape[1]))
            loss.backward()
            optimizer.step()
            prog_bar.set_postfix(**{'run:': "Transformer", 'lr': learning_rate,
                                    'loss': loss.item()
                                    })
            prog_bar.update(batch_size)

        # Intermediate text output
        """
        sample_texts = ["What authority surfeits on",
                        "I say unto you, what he hath done famously, he did it to that end:",
                        "That in submission will return to us: And then, as we have ta'en the sacrament,"]
        output_token = torch.zeros(1,1).cuda()
        output_token[0,0] = train_dataset.n_characters-1
        print("Top-K sampling")
        for sample_text in sample_texts:
            sample_encoding = train_dataset.encode_text(sample_text)
            sample_input = Variable(sample_encoding).cuda().unsqueeze(0).long()

            #out_test= greedy_sampling_iter_transformer(model, sample_input, 400, chunk_len, output_token)[0]
            out_test= topk_sampling_iter_transformer(model, sample_input, 400, chunk_len, output_token)[0]
            out_char_index = out_test.long().detach().cpu().numpy()
            out_chars = sample_text+" "+"".join([train_dataset.all_characters[i] for i in out_char_index])
            print("----------------------------------------")
            print(out_chars)"""




torch.Size([128])


Training - Epoch: 0/50: 10240chunks [00:08, 1201.30chunks/s, loss=2.49, lr=0.0006, run:=Transformer]                      
Training - Epoch: 1/50: 10240chunks [00:08, 1192.50chunks/s, loss=2.36, lr=0.0006, run:=Transformer]                      
Training - Epoch: 2/50: 10240chunks [00:08, 1172.88chunks/s, loss=2.11, lr=0.0006, run:=Transformer]                      
Training - Epoch: 3/50: 10240chunks [00:08, 1159.13chunks/s, loss=2.09, lr=0.0006, run:=Transformer]                      
Training - Epoch: 4/50: 10240chunks [00:08, 1180.10chunks/s, loss=1.89, lr=0.0006, run:=Transformer]                      
Training - Epoch: 5/50: 10240chunks [00:08, 1188.31chunks/s, loss=1.78, lr=0.0006, run:=Transformer]                      
Training - Epoch: 6/50: 10240chunks [00:08, 1197.58chunks/s, loss=1.72, lr=0.0006, run:=Transformer]                      
Training - Epoch: 7/50: 10240chunks [00:08, 1214.25chunks/s, loss=1.57, lr=0.0006, run:=Transformer]                      
Training - Epoch

## Text sampling - Transformers


In [21]:
sample_text = "Here's to my love! O true apothecary! Thy drugs are quick."
sample_encoding = train_dataset.encode_text(sample_text)
sample_input = Variable(sample_encoding).cuda().unsqueeze(0).long()
output_token = torch.zeros(1,1).cuda()
output_token[0,0] = train_dataset.n_characters-1

In [22]:
print("Top-K Sampling:")
out_test= topk_sampling_iter_transformer(model, sample_input, 400, chunk_len, output_token)[0]
out_char_index = out_test.long().detach().cpu().numpy()
out_chars = sample_text+" "+"".join([train_dataset.all_characters[i] for i in out_char_index])
print("----------------------------------------")
print(out_chars)
print()
print("Greedy Sampling")
out_test= greedy_sampling_iter_transformer(model, sample_input, 400, chunk_len, output_token)[0]
out_test= topk_sampling_iter_transformer(model, sample_input, 400, chunk_len, output_token)[0]
out_char_index = out_test.long().detach().cpu().numpy()
out_chars = sample_text+" "+"".join([train_dataset.all_characters[i] for i in out_char_index])
print("----------------------------------------")
print(out_chars)


Top-K Sampling:
----------------------------------------
Here's to my love! O true apothecary! Thy drugs are quick. 

GLOUCESTER:
I do believe this wretch charrged willl death.


LUCIO:
These greatsorse satisfy to his person, and himself
How sailty too beast all posssed; when he stands he
gentle haste, with a winged bloow the cause of this place,
Which, wert thou come
Wate home: though I have sperit
In posssible ass stem, intrussed me
IntreaN, and alll the world, thou wilt plead,
To fear off the starm of hell a

Greedy Sampling
----------------------------------------
Here's to my love! O true apothecary! Thy drugs are quick. 

KING EDWARD IV:
No, Greough: nor go fine hath as steep ass
Werst fear the pent to off an oath?
O my wife, thou seen me not? Well will are you?


Pedast:
Your worrrong me nor grant to you come;
For such ffair aunt or a place of the dead.
I wish me tome means, and I him, and welll
Condemned, to me to them and the placefe of the world:
When I shalll, be sadd, the s

In [16]:
!pip install numba

from numba import cuda
device = cuda.get_current_device()
device.reset()

