<a href="https://colab.research.google.com/github/Yashgabani845/Nb-analysis/blob/main/chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

let's start step by as we know in our transformer mechanism first thing that we do with input is token embedding and convert strung into number or specifically array on number

In [112]:
import numpy as np
# nn.Module says it is layer of neural Network
#d_model = embedding size
class TokenEmbedding(nn.Module):
  def __init__(self,vocb_size,d_model):
    super().__init__();
    self.embedding = nn.Embedding(vocab_size, d_model)
    #create embeddign  layer
  def forward(self , x):
      #x will have vector which is embedded that we pass in next layer
      return self.embedding(x)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # Create a positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add batch dimension
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)

        # Register buffer so it is not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
      """
      x: Tensor of shape (batch_size, seq_length, d_model)
      Returns: Tensor of same shape with positional encoding added
      """
      #print("Input tensor shape:", x.shape)  # Debug print
      seq_length = x.size(1)
      #print("Positional encoding shape:", self.pe[:, :seq_length, :].shape)  # Debug print
      # Expand the positional encoding to match the batch size
      # this will replicate your positional encoding along the 0th dimension and make the size of positional_encoding = (4,20,512)
      positional_encoding = self.pe[:, :seq_length, :].expand(x.size(0), -1, -1)

      return x + positional_encoding # Now the size of 'x' and positional encoding is the same and we will be able to add them

In [105]:
class ScaledProductAttention(nn.Module):
  def __init__(self):
     super().__init__()

  def forward(self,query , key , value , mask = None):
    d_k = query.size(-1)
    scores = (query @ key.transpose(-2,-1))/np.sqrt(d_k)

    if mask is not None:
      scores = scores.masked_fill(mask==0,-1e9)


    attention = torch.softmax(scores,dim=-1) # This line was indented inside the if statement, causing the error when mask was None.
    return attention @ value # This line was indented inside the if statement, causing the error when mask was None.

In [93]:
class MultiHeadAttention(nn.Module):
  def __init__(self,heads,d_model):
    super().__init__()
    self.heads = heads
    self.d_k = d_model // heads
    self.d_model = d_model

    self.W_query = nn.Linear(d_model , d_model)
    self.W_key = nn.Linear(d_model , d_model)
    self.W_value = nn.Linear(d_model , d_model)
    self.W_o = nn.Linear(d_model, d_model)

    self.attention = ScaledProductAttention()


  def forward(self, query, key, value, mask=None):
    batch_size = query.shape[0]

    # Apply linear transformations and reshape
    query = self.W_query(query).view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)
    key = self.W_key(key).view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)
    value = self.W_value(value).view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)

    # Calculate attention
    atten_output = self.attention(query, key, value, mask)

    # Concatenate and reshape
    concat_output = atten_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

    # Apply final linear transformation
    return self.W_o(concat_output)

In [94]:
class FeedForwardNetwork(nn.Module):
  def __init__(self, d_model, d_ff):
    super().__init__()
    self.fc1 = nn.Linear(d_model, d_ff)
    self.fc2 = nn.Linear(d_ff, d_model)
    self.relu= nn.ReLU()

  def forward(self, x):
    return self.fc2(self.relu(self.fc1(x)))

In [95]:
class EncoderLayer(nn.Module):
  def __init__(self,d_model,heads,d_ff):
    super().__init__()
    self.attention = MultiHeadAttention(heads,d_model)
    self.norm1 = nn.LayerNorm(d_model)
    self.ffn = FeedForwardNetwork(d_model,d_ff)
    self.norm2 = nn.LayerNorm(d_model)

  def forward(self , x, mask=None):
      atten_out = self.attention(x,x,x,mask)
      x = self.norm1(x+atten_out)
      ffn_out = self.ffn(x)
      return self.norm2(x+ffn_out)
      return x

In [96]:
class DecoderLayer(nn.Module):
  def __init__(self, d_model, heads, d_ff):
    super().__init__()
    self.attention = MultiHeadAttention(heads, d_model)
    self.norm1 = nn.LayerNorm(d_model)
    # Pass heads and d_model in the correct order
    self.enc_dec_attention = MultiHeadAttention(heads, d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.ffn = FeedForwardNetwork(d_model, d_ff)
    self.norm3 = nn.LayerNorm(d_model)

  def forward(self, x, encoder_output, src_mask=None, target_mask=None):
    x = self.norm1(x + self.attention(x, x, x, target_mask))
    x = self.norm2(x + self.enc_dec_attention(x, encoder_output, encoder_output, src_mask))
    #call ffn instead of fnn
    x = self.norm3(x + self.ffn(x))
    return x

In [113]:
class Transformer(nn.Module):
  def __init__ (self , vocab_size, d_model , heads , d_ff , num_layers):
    super().__init__()
    self.embedding = TokenEmbedding(vocab_size,d_model)
    self.pe = PositionalEncoding(d_model)
    self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, heads, d_ff) for _ in range(num_layers)])
    self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, heads, d_ff) for _ in range(num_layers)])

  def forward(self, src, tgt):
    src = self.pe(self.embedding(src))
    tgt = self.pe(self.embedding(tgt))
    for layer in self.encoder_layers:
      src= layer(src)
    for layer in self.decoder_layers:
      tgt = layer(tgt,src)
    return tgt

In [118]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Example dataset: 10 sentences with 20 tokens each
num_sentences = 10
seq_length = 20
batch_size = 4
vocab_size = 10000

# Generate dummy dataset (Replace with real tokenized data)
src_data = torch.randint(0, vocab_size, (num_sentences, seq_length))
tgt_data = torch.randint(0, vocab_size, (num_sentences, seq_length))

# Create DataLoader for batching
dataset = TensorDataset(src_data, tgt_data)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define Transformer Model
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, heads, d_ff, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, heads, d_ff) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, heads, d_ff) for _ in range(num_layers)])
        self.fc_out = nn.Linear(d_model, vocab_size)  # Final projection layer

    def forward(self, src, tgt):
        src = self.pe(self.embedding(src))
        tgt = self.pe(self.embedding(tgt))

        # Encoder Pass
        for layer in self.encoder_layers:
            src = layer(src)

        # Decoder Pass
        for layer in self.decoder_layers:
            tgt = layer(tgt, src)

        return self.fc_out(tgt)

# Instantiate Model
model = Transformer(vocab_size, d_model=512, heads=8, d_ff=2048, num_layers=6)

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.00005)

# Training Loop with DataLoader
model.train()
for epoch in range(5):
    for src, tgt in dataloader:
        optimizer.zero_grad()

        output = model(src, tgt)  # Forward pass
        output = output.view(-1, vocab_size)  # Flatten for loss computation
        tgt = tgt.view(-1)  # Flatten target

        loss = criterion(output, tgt)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/5, Loss: {loss.item()}")


Epoch 1/5, Loss: 9.37251091003418
Epoch 2/5, Loss: 8.217771530151367
Epoch 3/5, Loss: 8.025714874267578
Epoch 4/5, Loss: 7.621241092681885
Epoch 5/5, Loss: 7.54739236831665
