<a href="https://colab.research.google.com/github/tannisthamaiti/AIWeekend-Project/blob/main/Transformer/Transformer_chatbot_firstpass.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

----

![alt text](https://github.com/tannisthamaiti/AIWeekend-Project/blob/main/images/T1.png?raw=true "Encoder-Decder Transformer")

Although Encoder-Decoder Transformers look complicated and can do really cool things, the good news is that they don't actually require a lot of code. A lot of their power comes from simply making multiple copies of each component. So, with that said...

In this tutorial, you will...

- **[Code a Position Encoder Class From Scratch!!!](#position)** The position encoder gives a transformer a way to keep track of the order of the input tokens.

- **[Reuse Code an Attention Class From Scratch!!!](#attention)** The attention class allows the transformer to keep track of the relationships among words in the input and the output. We will use self-attention, encoder-decoder attention and mask attention


![alt text](https://github.com/tannisthamaiti/AIWeekend-Project/blob/main/images/T2.png?raw=true "Encoder-Decder Transformer")


#### ALSO NOTE:
I strongly encourage you to play around with the code. Playing with the code is the best way to learn from it.

In [1]:
import torch ## torch let's us create tensors and also provides helper functions
import torch.nn as nn ## torch.nn gives us nn.module() and nn.Linear()
import torch.nn.functional as F # This gives us the softmax()
from torch.utils.data import TensorDataset, DataLoader ## We'll store our data in DataLoaders
from torch.optim import Adam
import torch.optim as optim

In [2]:
# Updated vocabulary with <start> and <end>
token_to_id = {
    'what': 0,
    'is': 1,
    'LiveAI': 2,
    'awesome': 3,
    '<start>': 4,
    '<end>': 5
}
id_to_token = dict(map(reversed, token_to_id.items()))

# Encoder inputs: questions only (no special tokens)
encoder_inputs = torch.tensor([
    [token_to_id["what"], token_to_id["is"], token_to_id["LiveAI"]],
    [token_to_id["LiveAI"], token_to_id["is"], token_to_id["what"]]
])
# Decoder inputs: start with <start> token
decoder_inputs = torch.tensor([
    [token_to_id["<start>"], token_to_id["awesome"]],
    [token_to_id["<start>"], token_to_id["awesome"]]
])
# Target outputs: shifted right by one position to predict next token
decoder_targets = torch.tensor([
    [token_to_id["awesome"], token_to_id["<end>"]],
    [token_to_id["awesome"], token_to_id["<end>"]]
])

# Combine everything into a dataset
#dataset = TensorDataset(encoder_inputs, decoder_inputs, decoder_targets)
#dataloader = DataLoader(dataset)

In [3]:
class PositionEncoding(nn.Module):

    def __init__(self, d_model=2, max_len=6):
        ## d_model = The dimension of the transformer, which is also the number of embedding values per token.
        ##           In the transformer I used in the StatQuest: Transformer Neural Networks Clearly Explained!!!
        ##           d_model=2, so that's what we'll use as a default for now.
        ##           However, in "Attention Is All You Need" d_model=512
        ## max_len = maximum number of tokens we allow as input.
        ##           Since we are precomputing the position encoding values and storing them in a lookup table
        ##           we can use d_model and max_len to determine the number of rows and columns in that
        ##           lookup table.
        ##
        ##           In this simple example, we are only using short phrases, so we are using
        ##           max_len=6 as the default setting.
        ##           However, in The Annotated Transformer, they set the default value for max_len to 5000

        super().__init__()
        ## We call the super's init because by creating our own __init__() method, we overwrite the one
        ## we inherited from nn.Module. So we have to explicity call nn.Module's __init__(), otherwise it
        ## won't get initialized. NOTE: If we didn't write our own __init__(), then we would not have
        ## to call super().__init__(). Alternatively, if we didn't want to access any of nn.Module's methods,
        ## we wouldn't have to call it then either.

        ## Now we create a lookup table, pe, of position encoding values and initialize all of them to 0.
        ## To do this, we will make a matrix of 0s that has max_len rows and d_model columns.
        ## for example...
        ## torch.zeros(3, 2)
        ## ...returns a matrix of 0s with 3 rows and 2 columns...
        ## tensor([[0., 0.],
        ##         [0., 0.],
        ##         [0., 0.]])
        pe = torch.zeros(max_len, d_model)

        ## Now we create a sequence of numbers for each position that a token can have in the input (or output).
        ## For example, if the input tokens where "I'm happy today!", then "I'm" would get the first
        ## position, 0, "happy" would get the second position, 1, and "today!" would get the third position, 2.
        ## NOTE: Since we are going to be doing math with these position indices to create the
        ## positional encoding for each one, we need them to be floats rather than ints.
        ##
        ## NOTE: Two ways to create floats are...
        ##
        ## torch.arange(start=0, end=3, step=1, dtype=torch.float)
        ##
        ## ...and...
        ##
        ## torch.arange(start=0, end=3, step=1).float()
        ##
        ## ...but the latter is just as clear and requires less typing.
        ##
        ## Lastly, .unsqueeze(1) converts the single list of numbers that torch.arange creates into a matrix with
        ## one row for each index, and all of the indices in a single column. So if "max_len" = 3, then we
        ## would create a matrix with 3 rows and 1 column like this...
        ##
        ## torch.arange(start=0, end=3, step=1, dtype=torch.float).unsqueeze(1)
        ##
        ## ...returns...
        ##
        ## tensor([[0.],
        ##         [1.],
        ##         [2.]])
        position = torch.arange(start=0, end=max_len, step=1).float().unsqueeze(1)


        ## Here is where we start doing the math to determine the y-axis coordinates on the
        ## sine and cosine curves.
        ##
        ## The positional encoding equations used in "Attention is all you need" are...
        ##
        ## PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
        ## PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
        ##
        ## ...and we see, within the sin() and cos() functions, we divide "pos" by some number that depends
        ## on the index (i) and total number of PE values we want per token (d_model).
        ##
        ## NOTE: When the index, i, is 0 then we are calculating the y-axis coordinates on the **first pair**
        ##       of sine and cosine curves. When i=1, then we are calculating the y-axis coordiantes on the
        ##       **second pair** of sine and cosine curves. etc. etc.
        ##
        ## Now, pretty much everyone calculates the term we use to divide "pos" by first, and they do it with
        ## code that looks like this...
        ##
        ## div_term = torch.exp(torch.arange(start=0, end=d_model, step=2).float() * -(math.log(10000.0) / d_model))
        ##
        ## Now, at least to me, it's not obvious that div_term = 1/(10000^(2i/d_model)) for a few reasons:
        ##
        ##    1) div_term wraps everything in a call to torch.exp()
        ##    2) It uses log()
        ##    2) The order of the terms is different
        ##
        ## The reason for these differences is, presumably, trying to prevent underflow (getting too close to 0).
        ## So, to show that div_term = 1/(10000^(2i/d_model))...
        ##
        ## 1) Swap out math.log() for torch.log() (doing this requires converting 10000.0 to a tensor, which is my
        ##    guess for why they used math.log() instead of torch.log())...
        ##
        ## torch.exp(torch.arange(start=0, end=d_model, step=2).float() * -(torch.log(torch.tensor(10000.0)) / d_model))
        ##
        ## 2) Rearrange the terms...
        ##
        ## torch.exp(-1 * (torch.log(torch.tensor(10000.0)) * torch.arange(start=0, end=d_model, step=2).float() / d_model))
        ##
        ## 3) Pull out the -1 with exp(-1 * x) = 1/exp(x)
        ##
        ## 1/torch.exp(torch.log(torch.tensor(10000.0)) * torch.arange(start=0, end=d_model, step=2).float() / d_model)
        ##
        ## 4) Use exp(a * b) = exp(a)^b to pull out the 2i/d_model term...
        ##
        ## 1/torch.exp(torch.log(torch.tensor(10000.0)))^(torch.arange(start=0, end=d_model, step=2).float() / d_model)
        ##
        ## 5) Use exp(log(x)) = x to get the original form of the denominator...
        ##
        ## 1/(torch.tensor(10000.0)^(torch.arange(start=0, end=d_model, step=2).float() / d_model))
        ##
        ## 6) Bam.
        ##
        ## So, that being said, I don't think underflow is actually that big an issue. In fact, some coder at Hugging Face
        ## also doesn't think so, and their code for positional encoding in DistilBERT (a streamlined version of BERT, which
        ## is a transformer model)
        ## calculates the values directly - using the form of the equation found in original Attention is all you need
        ## manuscript. See...
        ## https://github.com/huggingface/transformers/blob/455c6390938a5c737fa63e78396cedae41e4e87e/src/transformers/modeling_distilbert.py#L53
        ## So I think we can simplify the code, but I'm also writing all these comments to show that it is equivalent to what
        ## you'll see in the wild...
        ##
        ## Now let's create an index for the embedding positions to simplify the code a little more...
        embedding_index = torch.arange(start=0, end=d_model, step=2).float()
        ## NOTE: Setting step=2 results in the same sequence numbers that we would get if we multiplied i by 2.
        ##       So we can save ourselves a little math by just setting step=2.

        ## And now, finally, let's create div_term...
        div_term = 1/torch.tensor(10000.0)**(embedding_index / d_model)

        ## Now we calculate the actual positional encoding values. Remember 'pe' was initialized as a matrix of 0s
        ## with max_len (max number of input tokens) rows and d_model (number of embedding values per token) columns.
        pe[:, 0::2] = torch.sin(position * div_term) ## every other column, starting with the 1st, has sin() values
        pe[:, 1::2] = torch.cos(position * div_term) ## every other column, starting with the 2nd, has cos() values
        ## NOTE: If the notation for indexing 'pe[]' looks cryptic to you, read on...
        ##
        ## First, let's look at the general indexing notation:
        ##
        ## For each row or column in matrix we can select elements in that
        ## row or column with the following indexs...
        ##
        ## i:j:k = select elements between i and j with stepsize = k.
        ##
        ## ...where...
        ##
        ## i defaults to 0
        ## j defaults to the number of elements in the row, column or whatever.
        ## k defaults to 1
        ##
        ## Now that we have looked at the general notation, let's look at specific
        ## examples so that we can understand it.
        ##
        ## We'll start with: pe[:, 0::2]
        ##
        ## The stuff that comes before the comma (in this case ':') refers to the rows we want to select.
        ## The ':' before the comma means "select all rows" because we are not providing specific
        ## values for i, j and k and, instead, just using the default values.
        ##
        ## The stuff after the comma refers to the columns we want to select.
        ## In this case, we have '0::2', and that means we start with
        ## the first column (column =  0) and go to the end (using the default value for j)
        ## and we set the stepsize to 2, which means we skip every other column.
        ##
        ## Now to understand pe[:, 1::2]
        ##
        ## Again, the stuff before the comma refers to the rows, and, just like before
        ## we use default values for i,j and k, so we select all rows.
        ##
        ## The stuff that comes after the comma refers to the columns.
        ## In this case, we start with the 2nd column (column = 1), and go to the end
        ## (using the default value for 'j') and we set the stepsize to 2, which
        ## means we skip every other column.
        ##
        ## NOTE: using this ':' based notation is called "indexing" and also called "slicing"

        ## Now we "register 'pe'.
        self.register_buffer('pe', pe) ## "register_buffer()" ensures that
                                       ## 'pe' will be moved to wherever the model gets
                                       ## moved to. So if the model is moved to a GPU, then,
                                       ## even though we don't need to optimize 'pe', it will
                                       ## also be moved to that GPU. This, in turn, means
                                       ## that accessing 'pe' will be relatively fast copared
                                       ## to having a GPU have to get the data from a CPU.

    ## Because this class, PositionEncoding, inherits from nn.Module, the forward() method
    ## is called by default when we use a PositionEncoding() object.
    ## In other words, after we create a PositionEncoding() object, pe = PositionEncoding(),
    ## then pe(word_embeddings) will call forward() and so this is where
    ## we will add the position encoding values to the word embedding values
    def forward(self, word_embeddings):

        return word_embeddings + self.pe[:word_embeddings.size(0), :] ## word_embeddings.size(0) = number of embeddings
                                                                      ## NOTE: That second ':' is optional and
                                                                      ## we could re-write it like this:
                                                                      ## self.pe[:word_embeddings.size(0)]

In [4]:
class Attention(nn.Module):

    def __init__(self, d_model=2,
                 row_dim=0,
                 col_dim=1):

        super().__init__()

        self.W_q = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_k = nn.Linear(in_features=d_model, out_features=d_model, bias=False)
        self.W_v = nn.Linear(in_features=d_model, out_features=d_model, bias=False)

        self.row_dim = row_dim
        self.col_dim = col_dim


    ## The only change from SelfAttention and attention is that
    ## now we expect 3 sets of encodings to be passed in...
    def forward(self, encodings_for_q, encodings_for_k, encodings_for_v, mask=None):
        ## ...and we pass those sets of encodings to the various weight matrices.
        q = self.W_q(encodings_for_q)
        k = self.W_k(encodings_for_k)
        v = self.W_v(encodings_for_v)

        sims = torch.matmul(q, k.transpose(dim0=self.row_dim, dim1=self.col_dim))

        scaled_sims = sims / torch.tensor(k.size(self.col_dim)**0.5)

        if mask is not None:
            scaled_sims = scaled_sims.masked_fill(mask=mask, value=-1e9)

        attention_percents = F.softmax(scaled_sims, dim=self.col_dim)

        attention_scores = torch.matmul(attention_percents, v)

        return attention_scores

A Encoder Transformer simply brings together...

- Word Embedding
- Position Encoding
- Self-Attention
- Residual Connections + Normalization
- A fully connected layer

In [5]:
class Encoder(nn.Module):

    def __init__(self, num_tokens=4, d_model=2, max_len=6):

        super().__init__()


        self.we = nn.Embedding(num_embeddings=num_tokens,
                               embedding_dim=d_model)

        self.pe = PositionEncoding(d_model=d_model,
                                   max_len=max_len)

        self.self_attention = Attention(d_model=d_model)
        self.layernorm = nn.LayerNorm(d_model)

        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)




    def forward(self, token_ids):

        word_embeddings = self.we(token_ids)
        print("word_embeddings",word_embeddings.shape)
        position_encoded = self.pe(word_embeddings)

        self_attention_values = self.self_attention(position_encoded,
                                                    position_encoded,
                                                    position_encoded,
                                                    mask=None)

        print(position_encoded.shape, self_attention_values.shape)
        residual_connection_values = self.layernorm(position_encoded + self_attention_values)

        fc_layer_output = self.fc_layer(residual_connection_values)


        #x= self.layernorm(fc_layer_output+residual_connection_values)

        return residual_connection_values, residual_connection_values





In [6]:
class Decoder(nn.Module):

    def __init__(self, num_tokens=4, d_model=2, max_len=6):

        super().__init__()



        ## NOTE: In this simple example, we are just using a "single layer" decoder.
        ##       If we wanted to have multiple layers of decoder, then we would
        ##       take the output of one decoder module and use it as input to
        ##       the next module.

        self.we = nn.Embedding(num_embeddings=num_tokens,
                               embedding_dim=d_model)

        self.pe = PositionEncoding(d_model=d_model,
                                   max_len=max_len)

        self.self_attention = Attention(d_model=d_model)
        self.cross_attention = Attention(d_model=d_model)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)


        self.fc_layer = nn.Linear(in_features=d_model, out_features=num_tokens)





    def forward(self, token_ids,encoder_k, encoder_v):
        device = token_ids.device
        word_embeddings = self.we(token_ids)
        print("word_embeddings",word_embeddings.shape)
        position_encoded = self.pe(word_embeddings)

        ## For the decoder-only transformer, we need to use "masked self-attention" so that
        ## when we are training we can't cheat and look ahead at
        ## what words come after the current word.
        ## To create the mask we are creating a matrix where the lower triangle
        ## is filled with 0, and everything above the diagonal is filled with 0s.
        mask = torch.tril(torch.ones((token_ids.size(dim=0), token_ids.size(dim=0))))

        mask = mask == 0
        print("position_encoded.shape,mask.shape",position_encoded.shape,mask.shape)

        mask_self_attention_values = self.self_attention(position_encoded,
                                                    position_encoded,
                                                    position_encoded,
                                                    mask=mask)

        residual_connection_values = self.layernorm1(position_encoded + mask_self_attention_values)
        print("residual_connection_values",residual_connection_values.shape)
        print("encoder_k",encoder_k.shape)





        x_cross_att = self.cross_attention(residual_connection_values, encoder_k, encoder_v, mask=None)
        x = self.layernorm2(residual_connection_values + x_cross_att)
        fc_layer_output = self.fc_layer(x)



        return fc_layer_output





In [7]:
## First, create a model from DecoderOnlyTransformer()
model = Encoder(num_tokens=len(token_to_id), d_model=2, max_len=6)

## Now create the input for the transformer...
model_input = torch.tensor([token_to_id["what"], token_to_id["is"], token_to_id["LiveAI"]])

input_length = model_input.size(dim=0)


## Now get get predictions from the model
encoder_k, encoder_v = model(model_input)
print("model_input.shape",model_input.shape)

decoder_input = torch.tensor([token_to_id["<start>"], token_to_id["awesome"]])
print("decoder_input", decoder_input.shape)

# Initialize and test decoder
decoder = Decoder(num_tokens=len(token_to_id), d_model=2, max_len=6)
output = decoder(decoder_input, encoder_k, encoder_v)
print(output)

word_embeddings torch.Size([3, 2])
torch.Size([3, 2]) torch.Size([3, 2])
model_input.shape torch.Size([3])
decoder_input torch.Size([2])
word_embeddings torch.Size([2, 2])
position_encoded.shape,mask.shape torch.Size([2, 2]) torch.Size([2, 2])
residual_connection_values torch.Size([2, 2])
encoder_k torch.Size([3, 2])
tensor([[-0.6833,  0.2581,  0.0349,  1.3285, -0.8095, -0.5797],
        [ 1.0914,  0.0269,  1.3525, -0.5493, -0.4252, -0.8072]],
       grad_fn=<AddmmBackward0>)


## Bring everything together to create the transformer class

### Hyperparameter

# Train the transformer

In [8]:
# Hyperparameters
num_tokens = len(token_to_id)
d_model = 2
max_len = 10
batch_size = 1
num_epochs = 1000
learning_rate = 0.001

In [9]:
# Create the transformer model
class Transformer(nn.Module):
    def __init__(self, num_tokens, d_model, max_len):
        super().__init__()

        self.encoder = Encoder(num_tokens=num_tokens, d_model=d_model, max_len=max_len)
        self.decoder = Decoder(num_tokens=num_tokens, d_model=d_model, max_len=max_len)

        # Output projection layer
        self.output_linear = nn.Linear(num_tokens, num_tokens)

    def forward(self, src_tokens, tgt_tokens):
        # Pass source tokens through encoder
        encoder_output, encoder_hidden = self.encoder(src_tokens)

        # Pass target tokens and encoder outputs through decoder
        decoder_output = self.decoder(tgt_tokens, encoder_output, encoder_hidden)

        return decoder_output

    def generate(self, src_tokens, max_len=10, start_token=4): # 4 is <start>
        device = src_tokens.device

        # Encode the source sequence
        encoder_output, encoder_hidden = self.encoder(src_tokens)

        # Initialize decoder input with start token
        decoder_input = torch.tensor([[start_token]], device=device)

        generated_sequence = [start_token]

        # Generate tokens one by one
        for _ in range(max_len):
            # Get decoder output
            decoder_output = self.decoder(decoder_input, encoder_output, encoder_hidden)

            # Get the predicted token
            _, topi = decoder_output[-1].topk(1)
            predicted_token = topi.item()

            # Add to the generated sequence
            generated_sequence.append(predicted_token)

            # Stop if we generated an <end> token
            if predicted_token == token_to_id["<end>"]:
                break

            # Update decoder input
            decoder_input = torch.cat([decoder_input, torch.tensor([[predicted_token]], device=device)], dim=1)

        return generated_sequence

# Create dataset and dataloader
dataset = TensorDataset(encoder_inputs, decoder_inputs, decoder_targets)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize the model
model = Transformer(num_tokens=num_tokens, d_model=d_model, max_len=max_len)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=-1)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



In [12]:
# Training loop
def train():
    model.train()
    for epoch in range(num_epochs):
        epoch_loss = 0

        for src, tgt_in, tgt_out in dataloader:
            # Zero gradients
            optimizer.zero_grad()
            src=src.squeeze(0)
            tgt_in=tgt_in.squeeze(0)
            tgt_out=tgt_out.squeeze(0)
            print("src",src.shape)
            print("tgt_in",tgt_in.shape)
            print("tgt_out",tgt_out.shape)
            # Forward pass
            output = model(src, tgt_in)

            # Reshape output and target for loss computation
            output_flat = output.contiguous().view(-1, num_tokens)
            target_flat = tgt_out.contiguous().view(-1)

            # Compute loss
            loss = criterion(output_flat, target_flat)
            epoch_loss += loss.item()

            # Backward pass
            loss.backward()

            # Update weights
            optimizer.step()

        # Print loss every 100 epochs
        if (epoch + 1) % 100 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(dataloader):.4f}')

    print("Training completed!")

# Inference function
def generate_response(input_sequence):
    model.eval()
    with torch.no_grad():
        input_tensor = torch.tensor([input_sequence])
        generated = model.generate(input_tensor)

        # Convert ids to tokens
        tokens = [id_to_token[idx] for idx in generated]

        # Remove <start> token
        if tokens[0] == '<start>':
            tokens = tokens[1:]

        return tokens

# Run training
train()

# Test the model with some examples
test_inputs = [
    [token_to_id["what"], token_to_id["is"], token_to_id["LiveAI"]],
    [token_to_id["LiveAI"], token_to_id["is"], token_to_id["what"]]
]

for test_input in test_inputs:
    # Convert input ids to tokens for display
    input_tokens = [id_to_token[idx] for idx in test_input]
    print(f"Input: {' '.join(input_tokens)}")

    # Generate response
    response = generate_response(test_input)
    print(f"Response: {' '.join(response)}\n")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
tgt_in torch.Size([2])
tgt_out torch.Size([2])
word_embeddings torch.Size([3, 2])
torch.Size([3, 2]) torch.Size([3, 2])
word_embeddings torch.Size([2, 2])
position_encoded.shape,mask.shape torch.Size([2, 2]) torch.Size([2, 2])
residual_connection_values torch.Size([2, 2])
encoder_k torch.Size([3, 2])
src torch.Size([3])
tgt_in torch.Size([2])
tgt_out torch.Size([2])
word_embeddings torch.Size([3, 2])
torch.Size([3, 2]) torch.Size([3, 2])
word_embeddings torch.Size([2, 2])
position_encoded.shape,mask.shape torch.Size([2, 2]) torch.Size([2, 2])
residual_connection_values torch.Size([2, 2])
encoder_k torch.Size([3, 2])
src torch.Size([3])
tgt_in torch.Size([2])
tgt_out torch.Size([2])
word_embeddings torch.Size([3, 2])
torch.Size([3, 2]) torch.Size([3, 2])
word_embeddings torch.Size([2, 2])
position_encoded.shape,mask.shape torch.Size([2, 2]) torch.Size([2, 2])
residual_connection_values torch.Size([2, 2])
encoder_k torch.Si

RuntimeError: Expected size for first two dimensions of batch2 tensor to be: [3, 2] but got: [3, 1].