In [1]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy
import pandas as pd

In [2]:
# Constants
EMBEDDING_DIM = 720  # width of every token / positional embedding vector
BLOCK_SIZE = 16      # number of tokens in one input row (context length)
N_HEADS = 16         # number of attention heads run in parallel

# Prefer Apple's Metal backend (the notebook's original target), but fall
# back to CUDA and then CPU so the notebook still runs on other machines.
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [3]:
# Per-head feature size: each attention head projects to EMBEDDING_DIM // N_HEADS features.
EMBEDDING_DIM // N_HEADS

45

# Loading text and removing unnecessary characters.

In [4]:
# Read the whole corpus into memory as one string.
with open('input.txt', 'r', encoding='utf-8') as fh:
    text = fh.read()

# Strip the unwanted single characters in one pass instead of one
# replace() call per character.
remove = ['-', '$', '&']
text = text.translate({ord(ch): None for ch in remove})

# Tokens are simply whitespace-separated words.
words = text.split()

# Sorted list of distinct words; its length is the vocabulary size.
vocab = sorted(set(words))
VOCAB_SIZE = len(vocab)

In [5]:
# Number of distinct whitespace-separated words in the cleaned text.
VOCAB_SIZE

25454

# Training and testing dataset.

In [6]:
# index -> word, following the sorted order of `vocab`
int_vocab = dict(enumerate(vocab))
# word -> index, the inverse of the mapping above
vocab_int = {word: idx for idx, word in int_vocab.items()}

# The whole corpus re-expressed as a list of vocabulary indices.
word_to_int_array = list(map(vocab_int.__getitem__, words))

In [7]:
# Build (context window, next word) pairs from randomly placed windows.
N_SAMPLES = 10000  # number of random windows to draw
list_size = len(words)
inputs = list()
targets = list()
for _ in range(N_SAMPLES):
    # A window covers [start, start + BLOCK_SIZE) and its target sits at
    # start + BLOCK_SIZE, so the last valid start is
    # list_size - BLOCK_SIZE - 1 (randint's upper bound is exclusive).
    start = numpy.random.randint(0,
                                 list_size - BLOCK_SIZE
                                )
    inputs.append(word_to_int_array[start:start+BLOCK_SIZE])
    # The target is the word immediately following the window
    # (start+BLOCK_SIZE+1 would skip one word).
    targets.append(word_to_int_array[start+BLOCK_SIZE])


# Token indices must be int64 (torch.long) to be used with nn.Embedding.
inputs = torch.tensor(inputs, 
                      dtype = torch.long, 
                      device = device
                     )
targets = torch.tensor(targets, 
                       dtype = torch.long, 
                       device = device
                      )    

# Transformer Blocks

## Embedding block

In [8]:
class EmbeddingBlock(nn.Module):
    """
    Embedding Block:

    Parameters:
    -----------
    vocab_size(int): Size of vocabulary of document
    embd(int): Size of embedding dimension
    block_size(int): Number of elements in each row of input; also the
    number of rows in the positional embedding table
    dim_journey(bool): When True, forward() prints the shapes of the
    tensors it produces.

    Description:
    ------------

    To represent each number(word) with a unique sequence of numbers
    which the computer can understand. Along with this, positional 
    information is also represented in the same higher dimension and 
    added to the embedding tensor.

    Input dim: B,T (T may not exceed block_size, since the positional
    table only has block_size rows)
    Output dim: B,T,H (H = embedding dimension)
    """
    def __init__(self, vocab_size, embd, block_size, dim_journey):
        super().__init__()
        self.dim_journey = dim_journey
        # Embedding layer
        self.embedding_layer = nn.Embedding(num_embeddings = vocab_size, 
                                            embedding_dim =  embd
                                           )
        # Positional embedding layer
        self.pos_layer = nn.Embedding(num_embeddings = block_size, 
                                      embedding_dim = embd
                                     )

    def forward(self, x):
        """
        Look up token and positional embeddings for `x` and add them.

        x is a (B, T) tensor of token indices. Returns a (B, T, H) tensor.
        """
        _, T = x.shape
        if self.dim_journey:
            print("\x1b[22;31mEmbedding Block\x1b[0m")
            print(f"\x1b[32mInput dimension:\x1b[0m {x.shape}")
        embeddings = self.embedding_layer(x)
        # Build the position indices on the same device as the input so the
        # module does not depend on a notebook-level `device` variable.
        pos = self.pos_layer(torch.arange(T, 
                                          device = x.device))
        # (T, H) broadcasts over the batch dimension of (B, T, H).
        token_embd = embeddings + pos

        if self.dim_journey:
            print(f"\x1b[32mDimension of Embedding layer:\x1b[0m {embeddings.shape}")
            print(f"\x1b[32mDimensions of Positional layer:\x1b[0m {pos.shape}")
            print(f"\x1b[32mDimensions of Token embeddings:\x1b[0m {token_embd.shape}")

        return token_embd

In [9]:
# Smoke-test the embedding block on the first three samples with shape tracing on.
m = EmbeddingBlock(
    vocab_size=VOCAB_SIZE,
    embd=EMBEDDING_DIM,
    block_size=BLOCK_SIZE,
    dim_journey=True,
)
m.to(device)
temp = m(inputs[:3])

[22;31mEmbedding Block[0m
[32mInput dimension:[0m torch.Size([3, 16])
[32mDimension of Embedding layer:[0m torch.Size([3, 16, 720])
[32mDimensions of Positional layer:[0m torch.Size([16, 720])
[32mDimensions of Token embeddings:[0m torch.Size([3, 16, 720])


## Encoder Block

### Multi-Head Attention Block

Head Block -> Multi-Head Block(Feed Forward Block)

#### Feed Forward Block

In [10]:
class FeedForwardBlock(nn.Module):
    """
    FeedForward Block:

    Parameters:
    -----------
    embd(int): Size of embedding dimension

    Description:
    ------------
    One linear layer that keeps the feature size at embd, followed by a
    ReLU non-linearity.
    """

    def __init__(self, embd):
        super().__init__()
        layers = [
            nn.Linear(embd, embd),  # embd -> embd projection
            nn.ReLU(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        # Apply the linear projection and then the activation.
        out = self.net(x)
        return out

#### Head Block

In [11]:
class HeadBlock(nn.Module):
    """
    Head Block:

    Parameters:
    -----------
    head_size(int): Output feature size of this head's key, query and
    value projections
    embd(int): Embedding dimension
    block_size(int): Number of elements in single row
    dim_journey(bool): Explanation of dimension conversions through
    each block and within each block as well.
    
    Description:
    ------------

    Core block of Transformer. Here is where the attention 
    mechanism is implemented. This is a single "head" of the 
    transformer. In the Multi-Head block a number of these heads are
    created and each learns different information about the text.

    Input dim: B,T,embd
    Output dim: B,T,head_size
    """

    def __init__(self, head_size, embd, block_size, dim_journey):
        super().__init__()
        self.dim_journey = dim_journey
        self.key = nn.Linear(in_features = embd, 
                             out_features = head_size, 
                             bias = False)
        self.query = nn.Linear(in_features = embd, 
                               out_features = head_size, 
                               bias = False)
        self.value = nn.Linear(in_features = embd, 
                               out_features = head_size, 
                               bias = False)
        # Lower-triangular mask, stored as a buffer so it follows the module
        # across devices without being a trainable parameter.
        self.register_buffer('tril', 
                             torch.tril(torch.ones(block_size, 
                                                   block_size)))

        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """
        Apply masked scaled dot-product attention to `x` (B, T, embd)
        and return a (B, T, head_size) tensor.
        """
        _, T, _ = x.shape

        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        # Scale by the key/query width (head_size), not the embedding width:
        # the dot product sums over head_size terms, so that is the
        # dimension that sets its variance.
        head_size = k.shape[-1]
        weights = q @ k.transpose(-2, -1) * head_size**-0.5
        # Masking makes it a decoder block
        weights = weights.masked_fill(self.tril[:T, :T] == 0, 
                                      float('-inf'))
        weights = F.softmax(weights, dim = -1)
        weights = self.dropout(weights)

        out = weights @ v
        if self.dim_journey:
            print("\x1b[22;31mHead Block\x1b[0m")
            print(f"\x1b[32mDimensions of input:\x1b[0m {x.shape}")
            print(f"\x1b[32mDimensions of key:\x1b[0m {k.shape}")
            print(f"\x1b[32mDimensions of query:\x1b[0m {q.shape}")
            print(f"\x1b[32mDimensions of value:\x1b[0m {v.shape}")
            print(f"\x1b[32mDimensions of weights after q @ k.T:\x1b[0m {weights.shape}")
            print(f"\x1b[32mDimensions of out(weights @ v):\x1b[0m {out.shape}")

        return out

#### Multi-Head Attention Block

In [12]:
# True division: a whole-number result means the embedding splits evenly across the heads.
EMBEDDING_DIM / N_HEADS

45.0

In [13]:
class MultiHeadBlock(nn.Module):
    """
    Multi-Head Block:

    Multiple attention heads running in parallel.

    Parameters:
    -----------
    embd(int): Embedding dimension of the input
    n_head(int): Number of HeadBlock instances to create
    block_size(int): Number of elements in single row
    dim_journey(bool): When True, prints the shape of the concatenated
    output (and is passed on to every head).

    Description:
    ------------
    Every head receives the same input and is built with
    head_size = embd // n_head. The head outputs are joined along the
    last dimension, so the result has n_head * (embd // n_head)
    features.
    """

    def __init__(self, embd, n_head, block_size, dim_journey):
        super().__init__()
        self.dim_journey = dim_journey
        per_head = embd // n_head
        self.heads = nn.ModuleList(
            HeadBlock(head_size=per_head,
                      embd=embd,
                      block_size=block_size,
                      dim_journey=dim_journey)
            for _ in range(n_head)
        )

    def forward(self, x):
        # Run each head on the same input, then stitch the results together
        # along the feature axis.
        head_outputs = [head(x) for head in self.heads]
        concat = torch.cat(head_outputs, dim=-1)
        if self.dim_journey:
            print("\x1b[22;31mMultiHead Block\x1b[0m")
            print(f"Dimensions of output: {concat.shape}")
        return concat

In [14]:
class EncoderBlock(nn.Module):
    """
    Encoder Block:

    Parameters:
    -----------
    None.

    Description:
    ------------
    Placeholder: currently an nn.Module with no layers and no forward
    pass of its own.
    """

    def __init__(self):
        # Only the base nn.Module set-up is performed for now.
        super().__init__()

In [15]:
class GPTModel(nn.Module):
    """
    Parameters:
    -----------
    vocab_size(int): Size of vocabulary of document.
    embd(int): Size of embedding dimension.
    block_size(int): Number of elements in each row of input.
    n_head(int): Number of heads for Multi-head Attention block.
    dim_journey(bool): Explanation of dimension conversions through
    each block and within each block as well.

    Description:
    ------------

    Three stages applied in order: the embedding block, the multi-head
    attention block, and a linear head that maps every position to
    vocab_size logits.
    """

    def __init__(self, vocab_size, embd, block_size, n_head, dim_journey = False):
        super().__init__()
        # Token + positional embeddings
        self.embedding_layer = EmbeddingBlock(vocab_size=vocab_size,
                                              embd=embd,
                                              block_size=block_size,
                                              dim_journey=dim_journey)
        # Parallel self-attention heads
        self.sa_heads = MultiHeadBlock(embd=embd,
                                       n_head=n_head,
                                       block_size=block_size,
                                       dim_journey=dim_journey)
        # Projection from embedding features to vocabulary logits
        self.lm_head = nn.Linear(embd, vocab_size)

    def forward(self, x):
        # Token indices -> embeddings -> attention output -> logits
        embedded = self.embedding_layer(x)
        attended = self.sa_heads(embedded)
        return self.lm_head(attended)

In [16]:
# Smoke-test the full model on a single sample with shape tracing on.
g = GPTModel(
    vocab_size=VOCAB_SIZE,
    embd=EMBEDDING_DIM,
    block_size=BLOCK_SIZE,
    n_head=N_HEADS,
    dim_journey=True,
)
g.to(device)
temp = g(inputs[:1])

[22;31mEmbedding Block[0m
[32mInput dimension:[0m torch.Size([1, 16])
[32mDimension of Embedding layer:[0m torch.Size([1, 16, 720])
[32mDimensions of Positional layer:[0m torch.Size([16, 720])
[32mDimensions of Token embeddings:[0m torch.Size([1, 16, 720])
[22;31mHead Block[0m
[32mDimensions of input:[0m torch.Size([1, 16, 720])
[32mDimensions of key:[0m torch.Size([1, 16, 45])
[32mDimensions of query:[0m torch.Size([1, 16, 45])
[32mDimensions of value:[0m torch.Size([1, 16, 45])
[32mDimensions of weights after q @ k.T:[0m torch.Size([1, 16, 16])
[32mDimensions of out(weights @ v):[0m torch.Size([1, 16, 45])
[22;31mHead Block[0m
[32mDimensions of input:[0m torch.Size([1, 16, 720])
[32mDimensions of key:[0m torch.Size([1, 16, 45])
[32mDimensions of query:[0m torch.Size([1, 16, 45])
[32mDimensions of value:[0m torch.Size([1, 16, 45])
[32mDimensions of weights after q @ k.T:[0m torch.Size([1, 16, 16])
[32mDimensions of out(weights @ v):[0m torch.Size(

In [17]:
# Compare the shape of the one-row input batch with the shape of the model output.
print(inputs[0:1].shape, temp.shape)

torch.Size([1, 16]) torch.Size([1, 16, 25454])
