In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader

import lightning as L

In [26]:
# Vocabulary: every token the model knows about, mapped to an integer id.
token_to_id = {
    'what': 0,
    'is': 1,
    'StatQuest': 2,
    'Awesome': 3,
    '<EOS>': 4,
}
# Inverse lookup, used to turn predicted ids back into readable tokens.
id_to_token = {token_id: token for token, token_id in token_to_id.items()}

# Two training sequences: a prompt, the <EOS> separator, then the answer token.
inputs = torch.tensor([
    [token_to_id[token] for token in sentence]
    for sentence in (
        ('what', 'is', 'StatQuest', '<EOS>', 'Awesome'),
        ('StatQuest', 'is', 'what', '<EOS>', 'Awesome'),
    )
])

# Next-token targets: each row is the matching `inputs` row shifted left by
# one position, with a closing <EOS> appended.
labels = torch.tensor([
    [token_to_id[token] for token in sentence]
    for sentence in (
        ('is', 'StatQuest', '<EOS>', 'Awesome', '<EOS>'),
        ('is', 'what', '<EOS>', 'Awesome', '<EOS>'),
    )
])

# Pair each input sequence with its targets; the DataLoader is left on its
# default settings.
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)
 

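$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{model}}}\right) \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

- `d_model` = number of embedding values used to represent each token
- `pos` = position of the token in the sequence
- `i` = index of the sine/cosine pair inside the embedding (`0 <= i < d_model / 2`)

With `d_model = 2` there is only one pair (`i = 0`), so the encoding of a position is simply `[sin(pos), cos(pos)]`.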

In [27]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to token embeddings.

    A `(max_len, d_model)` table is precomputed once and stored as a buffer
    (not a trainable parameter):

        pe[pos, 2i]     = sin(pos / 10000^(2i / d_model))
        pe[pos, 2i + 1] = cos(pos / 10000^(2i / d_model))

    Args:
        max_len: maximum number of tokens the Transformer can process. In real
            life this would be set to a much larger value.
        d_model: number of embedding values per token.
    """

    def __init__(self, max_len = 6, d_model = 2):
        super().__init__()
        # One row per position, one column per embedding value.
        pe = torch.zeros(max_len, d_model)

        # Column vector of positions: [[0.], [1.], ..., [max_len - 1.]]
        position = torch.arange(0, max_len, 1).float().unsqueeze(1)
        # Even embedding indices 0, 2, 4, ... -- one per sine/cosine pair.
        embedding = torch.arange(0, d_model, 2).float()

        # Per-pair frequency 1 / 10000^(2i / d_model). With d_model = 2 the
        # only exponent is 0, so the single frequency is 1.
        div_term = 1 / torch.tensor(10000.0) ** (embedding / d_model)

        # Broadcasting (max_len, 1) * (pairs,) -> (max_len, pairs).
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine columns,
        # so trim the angles to the number of odd-indexed columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        self.register_buffer('pe', pe)

    def forward(self, word_embeddings):
        """Return `word_embeddings` plus the encodings of its first rows.

        The table is sliced to `word_embeddings.size(0)` rows, so the input is
        treated as one unbatched sequence with positions along dimension 0.
        """
        return word_embeddings + self.pe[:word_embeddings.size(0), :]

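$$\text{Attention}(Q, K, V) = \text{softmax}\!\left(\frac{QK^{T}}{\sqrt{d_k}} + M\right)V$$

where $d_k$ is the number of columns in the key matrix and $M$ is the mask: $0$ for positions a token is allowed to attend to and a very large negative number for positions it must ignore.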


In [28]:
class Attention(nn.Module):
    """Single-head scaled dot-product attention.

    Computes softmax((Q K^T) / sqrt(d_k) + M) V, where Q, K and V are obtained
    from the inputs through three separate bias-free linear layers.

    Args:
        d_model: number of embedding values per token; every projection maps
            d_model -> d_model, so each weight matrix is (d_model, d_model).
    """

    def __init__(self, d_model = 2):
        super().__init__()

        # Separate learnable projections for queries, keys and values.
        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

        # Inputs are treated as unbatched (sequence_length, embedding_size).
        # sequence_length
        self.row_dim = 0
        # embedding_size
        self.col_dim = 1

    def forward(self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):
        """Return the attention output, one row per query token.

        Args:
            encodings_for_q: encodings used to build the queries.
            encodings_for_k: encodings used to build the keys.
            encodings_for_v: encodings used to build the values.
            mask: optional boolean tensor matching the similarity matrix;
                True marks positions that must NOT be attended to.

        Returns:
            Tensor with one row per query and d_model columns.
        """
        # Each role has its own projection. Using W_q for all three would
        # leave W_k and W_v untrained and tie Q, K and V together.
        q = self.W_q(encodings_for_q)
        k = self.W_k(encodings_for_k)
        v = self.W_v(encodings_for_v)

        # Q K^T: similarity score between every query and every key.
        # e.g. q is (6, 2), k transposed is (2, 6) -> sims is (6, 6).
        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))

        # Scale by sqrt(d_k), where d_k is the number of columns of k.
        scaled_sims = sims / (k.size(self.col_dim) ** 0.5)

        # For autoregressiveness: hide the masked positions by pushing their
        # scores to a huge negative value so softmax gives them ~0 weight.
        if mask is not None:
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)
        # Worked example with 4 tokens:
        # scaled_sims = [[ 1.0,  0.5,  0.8,  0.2],
        #                [ 0.3,  1.2, -0.1,  0.9],
        #                [ 0.7,  0.4,  1.5, -0.3],
        #                [-0.2,  0.6,  0.1,  1.1]]
        #
        # mask = [[False,  True,  True,  True],
        #         [False, False,  True,  True],
        #         [False, False, False,  True],
        #         [False, False, False, False]]
        #
        # after masking:
        # scaled_sims = [[ 1.0, -1e9, -1e9, -1e9],
        #                [ 0.3,  1.2, -1e9, -1e9],
        #                [ 0.7,  0.4,  1.5, -1e9],
        #                [-0.2,  0.6,  0.1,  1.1]]

        # Softmax over dim 1 normalises each row (one query across all keys),
        # e.g. the row [1.0, -1e9, -1e9, -1e9] becomes [1, 0, 0, 0].
        attention_percent = F.softmax(scaled_sims, dim=self.col_dim)
        # attention_percent = [[1.000, 0.000, 0.000, 0.000],
        #                      [0.289, 0.711, 0.000, 0.000],
        #                      [0.252, 0.187, 0.561, 0.000],
        #                      [0.121, 0.270, 0.164, 0.445]]

        # Weighted sum of the value rows: (4, 4) @ (4, 2) -> (4, 2).
        # v = [[ 2.0,  1.0],    # Value for token 0
        #      [ 0.5,  3.0],    # Value for token 1
        #      [ 1.5, -1.0],    # Value for token 2
        #      [-0.5,  2.5]]    # Value for token 3
        attention_scores = torch.matmul(attention_percent, v)
        # attention_scores = [[2.0000, 1.0000],
        #                     [0.9335, 2.4220],
        #                     [1.4390, 0.2520],
        #                     [0.4005, 1.8795]]
        # One attention output row per input token.

        return attention_scores

In [39]:
class DecoderOnlyTransformer(L.LightningModule):
    """Minimal single-layer, single-head decoder-only Transformer.

    Pipeline: token embedding -> positional encoding -> masked self-attention
    -> residual connection -> linear layer giving one score per vocabulary token.

    Args:
        num_tokens: vocabulary size; number of rows in the embedding table and
            number of outputs of the final linear layer.
        d_model: number of values used to represent each token.
        max_len: maximum sequence length, forwarded to PositionalEncoding.
    """

    def __init__(self, num_tokens=4, d_model=2, max_len=6):
        super().__init__()

        # Embedding lookup table of shape (num_tokens, d_model):
        #   num_embeddings = how many rows the lookup table has
        #   embedding_dim  = values per token (if 2 -> token 0 looks like [x, y])
        #
        # Example with 4 tokens and randomly initialised weights:
        # self.we.weight = [[ 0.5, -0.2],   # Token 0
        #                   [ 1.0,  0.8],   # Token 1
        #                   [-0.3,  0.4],   # Token 2
        #                   [ 0.1, -0.5]]   # Token 3
        # self.we(tensor([2, 1, 0, 3, 1, 0])) then picks the rows
        #                  [[-0.3,  0.4],   # Position 0 (Token 2)
        #                   [ 1.0,  0.8],   # Position 1 (Token 1)
        #                   [ 0.5, -0.2],   # Position 2 (Token 0)
        #                   [ 0.1, -0.5],   # Position 3 (Token 3)
        #                   [ 1.0,  0.8],   # Position 4 (Token 1)
        #                   [ 0.5, -0.2]]   # Position 5 (Token 0)
        self.we = nn.Embedding(num_embeddings=num_tokens, embedding_dim=d_model)

        # Precomputed sin/cos table of shape (max_len, d_model). For
        # d_model = 2 and max_len = 6 each row is [sin(pos), cos(pos)]:
        # self.pe.pe = [[ 0.0000,  1.0000],  # Position 0
        #               [ 0.8415,  0.5403],  # Position 1
        #               [ 0.9093, -0.4161],  # Position 2
        #               [ 0.1411, -0.9900],  # Position 3
        #               [-0.7568, -0.6536],  # Position 4
        #               [-0.9589,  0.2837]]  # Position 5
        # Calling self.pe(word_embeddings) adds these rows element-wise to the
        # word embeddings, e.g. [-0.3 + 0.0000, 0.4 + 1.0000] -> [-0.3, 1.4].
        self.pe = PositionalEncoding(d_model=d_model, max_len=max_len)

        # Masked self-attention layer; its output has the same shape as its
        # input, (sequence_length, d_model).
        self.self_attention = Attention(d_model=d_model)

        # Fully connected layer: d_model values in, one score per token out.
        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, token_ids):
        """Return next-token scores for every position of one sequence.

        Args:
            token_ids: 1-D tensor of token ids (a single, unbatched sequence).

        Returns:
            Tensor of shape (len(token_ids), num_tokens) with unnormalised
            scores; row i scores the token that follows position i.
        """
        word_embeddings = self.we(token_ids)
        position_encoded = self.pe(word_embeddings)

        # Causal mask. tril keeps the lower triangle of an all-ones matrix and
        # zeroes the rest; comparing with 0 marks the future positions as True.
        # For 4 tokens:
        # tril = [[1.,0.,0.,0.],      mask = [[False, True,  True,  True],
        #         [1.,1.,0.,0.],              [False, False, True,  True],
        #         [1.,1.,1.,0.],              [False, False, False, True],
        #         [1.,1.,1.,1.]]              [False, False, False, False]]
        # The mask is created on the same device as token_ids so that
        # masked_fill also works when the module has been moved off the CPU.
        seq_len = token_ids.size(dim=0)
        mask = torch.tril(torch.ones((seq_len, seq_len), device=token_ids.device))
        mask = mask == 0

        # The same encodings are used for queries, keys and values:
        # Attention(Q, K, V) = softmax((Q K^T) / sqrt(d_k) + M) V
        self_attention_values = self.self_attention(position_encoded, position_encoded, position_encoded, mask=mask)
        residual_connection_values = position_encoded + self_attention_values

        # Output of the fully connected layer.
        fc_layer_output = self.fc_layer(residual_connection_values)

        return fc_layer_output

    def configure_optimizers(self):
        """Use Adam over all parameters with a learning rate of 0.1."""
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss averaged over the sequences in `batch`.

        forward() handles one unbatched sequence at a time, so each sequence
        is run separately. With a batch size of 1 this equals the loss of
        that single sequence.
        """
        input_tokens, labels = batch
        per_sequence_losses = [
            self.loss(self.forward(sequence), targets)
            for sequence, targets in zip(input_tokens, labels)
        ]
        return torch.stack(per_sequence_losses).mean()


In [40]:
# Build a fresh (untrained) model sized to the vocabulary.
model = DecoderOnlyTransformer(num_tokens=len(token_to_id), d_model=2, max_len=6)

# Prompt: "what is StatQuest <EOS>"
model_input = torch.tensor([
                        token_to_id['what'],
                        token_to_id['is'],
                        token_to_id['StatQuest'],
                        token_to_id['<EOS>']
                    ])

input_length = model_input.size(dim=0)
# Upper bound on prompt + generated tokens.
max_length = 6

# Inference only: no_grad avoids building an autograd graph for each step.
with torch.no_grad():
    # The last row of the output scores the token that follows the prompt.
    predictions = model(model_input)
    predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
    predicted_ids = predicted_id

    # Greedy decoding: append the prediction and re-run the model until it
    # emits <EOS> or the sequence reaches max_length.
    for _ in range(input_length, max_length):
        if predicted_id.item() == token_to_id["<EOS>"]:
            break
        model_input = torch.cat((model_input, predicted_id))
        predictions = model(model_input)
        predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
        predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
# `token_id` rather than `id`, so the builtin id() is not shadowed.
for token_id in predicted_ids:
    print('\t', id_to_token[token_id.item()])

Predicted Tokens:

	 <EOS>


In [41]:
# Train `model` on `dataloader` with Lightning, stopping after 30 epochs.
# The optimizer and the per-batch loss come from the model's
# configure_optimizers() and training_step().
trainer = L.Trainer(max_epochs=30)
trainer.fit(model, train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name           | Type               | Params | Mode 
--------------------------------------------------------------
0 | we             | Embedding          | 10     | train
1 | pe             | PositionalEncoding | 0      | train
2 | self_attention | Attention          | 12     | train
3 | fc_layer       | Linear             | 15     | train
4 | loss           | CrossEntropyLoss   | 0      | train
--------------------------------------------------------------
37        Trainable params
0         Non-trainable params
37        Total params
0.000     Total estimated model params size (MB)
8         Modules in train mode
0         Modules in eval mode


Epoch 5:   0%|          | 0/2 [00:00<?, ?it/s, v_num=0]         

Epoch 29: 100%|██████████| 2/2 [00:00<00:00, 72.16it/s, v_num=0] 

`Trainer.fit` stopped: `max_epochs=30` reached.


Epoch 29: 100%|██████████| 2/2 [00:00<00:00, 48.89it/s, v_num=0]


In [42]:
# Prompt: "what is StatQuest <EOS>" -- same prompt as before training.
model_input = torch.tensor([
                        token_to_id['what'],
                        token_to_id['is'],
                        token_to_id['StatQuest'],
                        token_to_id['<EOS>']
                    ])

input_length = model_input.size(dim=0)
# Upper bound on prompt + generated tokens.
max_length = 6

# Inference only: no_grad avoids building an autograd graph for each step.
with torch.no_grad():
    # The last row of the output scores the token that follows the prompt.
    predictions = model(model_input)
    predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
    predicted_ids = predicted_id

    # Greedy decoding: append the prediction and re-run the model until it
    # emits <EOS> or the sequence reaches max_length.
    for _ in range(input_length, max_length):
        if predicted_id.item() == token_to_id["<EOS>"]:
            break
        model_input = torch.cat((model_input, predicted_id))
        predictions = model(model_input)
        predicted_id = torch.argmax(predictions[-1, :]).unsqueeze(0)
        predicted_ids = torch.cat((predicted_ids, predicted_id))

print("Predicted Tokens:\n")
# `token_id` rather than `id`, so the builtin id() is not shadowed.
for token_id in predicted_ids:
    print('\t', id_to_token[token_id.item()])

Predicted Tokens:

	 Awesome
	 <EOS>
