In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn
import math
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

In [23]:
#rope embedding
class RoPEEmbedding(torch.nn.Module):
    def __init__(self, embedding_dim):
        super().__init__()
        assert embedding_dim % 2 == 0, "Embedding dimension must be even for RoPE"
        self.embedding_dim = embedding_dim

    def forward(self, x):
        """
        Forward pass for Rotary Position Embedding.

        Args:
        - x: Tensor of shape (batch_size, seq_len, embedding_dim)

        Returns:
        - Tensor with RoPE applied to the last two dimensions.
        """
        seq_len = x.shape[1]

        # Generate position indices
        position_ids = torch.arange(seq_len, dtype=torch.float32, device=x.device)

        # Compute the rotary angles
        freqs = 1.0 / (10000 ** (torch.arange(0, self.embedding_dim, 2, dtype=torch.float32, device=x.device) / self.embedding_dim))
        angles = torch.einsum('i,j->ij', position_ids, freqs)

        # Create the rotation matrix for sin and cos embeddings
        sin = torch.sin(angles).repeat_interleave(2, dim=-1)
        cos = torch.cos(angles).repeat_interleave(2, dim=-1)

        # Apply rotation using cos and sin embeddings
        x1 = x * cos + self.rotate_half(x) * sin
        return x1


    def rotate_half(self,x):
        """
        Rotate the last dimension of the input tensor by swapping odd and even elements and negating one.

        Args:
        - x: Tensor of shape (..., embedding_dim)

        Returns:
        - Rotated tensor of the same shape.
        """
        x1, x2 = x[..., ::2], x[..., 1::2]  # Split into even and odd dimensions
        return torch.cat((-x2, x1), dim=-1)

#sine embedding
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self, x):
        batch_size, seq_len, _ = x.size()

        # Generate the positional encoding
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i/self.d_model)
        position = torch.arange(self.max_sequence_length).reshape(self.max_sequence_length, 1)
        even_PE = torch.sin(position / denominator)
        odd_PE = torch.cos(position / denominator)
        stacked = torch.stack([even_PE, odd_PE], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)

        # Ensure the PE matches the batch size and sequence length
        PE = PE.unsqueeze(0).expand(batch_size, seq_len, self.d_model)

        return PE + x


#feedforward network
class feedforward(nn.Module):
  def __init__(self,d_model,hidlayer,dropout):
    super().__init__()
    self.d_model=d_model
    self.hidlayer=hidlayer
    self.linearlayer1=nn.Linear(self.d_model,self.hidlayer)
    self.linearlayer2=nn.Linear(self.hidlayer,self.d_model)
    self.dropout=nn.Dropout(dropout)
    self.activation=nn.ReLU()

  def forward(self,x):
    l1=self.linearlayer1(x)
    print(f"x after first linear layer: {x.size()}")
    l1=self.activation(l1)
    print(f"x after activation: {l1.size()}")
    l1=self.dropout(l1)
    print(f"x after dropout 1: {l1.size()}")
    out=self.linearlayer2(l1)
    print(f"x after 2nd linear layer: {out.size()}")
    #drop out gen not aplpied after 1st layhers
    out=self.dropout(out)
    print(f"x after dropout 2: {out.size()}")
    return out

#multhead attention
class multihead_attention(nn.Module):
    def __init__(self, inputdim, dmodel, masking=None, heads=1):
        super().__init__()
        self.heads = heads
        self.masking = masking
        assert dmodel % heads == 0, "Embedding dimension must be divisible by num_heads"
        self.inputdim = inputdim
        self.dmodel = dmodel
        self.head_dim = self.dmodel // self.heads
        self.wq = nn.Linear(self.inputdim, self.dmodel)
        self.wk = nn.Linear(self.inputdim, self.dmodel)
        self.wv = nn.Linear(self.inputdim, self.dmodel)
        self.linearlayer=nn.Linear(self.dmodel,self.dmodel)
        self.projectionlayer=nn.Linear(self.dmodel,self.inputdim)
        print('heads =', self.heads)

    def scaled_dot_product_attention(self, q, k, v):
        dk = torch.tensor(q.shape[-1], dtype=torch.float32)
        scaled = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(dk)

        if self.masking is not None:
            mask = torch.ones(q.shape[2], q.shape[2], device=q.device)
            mask = torch.tril(mask)
            mask[mask == 0] = -torch.inf
            mask[mask == 1] = 0
            scaled = scaled + mask

        attention = torch.softmax(scaled, dim=-1)
        scores = torch.matmul(attention, v)
        return attention, scores

    def forward(self, x):
        batch_size, sequence_length, input_dim = x.size()
        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)
        q = q.view(batch_size, sequence_length, self.heads, self.head_dim)
        k = k.view(batch_size, sequence_length, self.heads, self.head_dim)
        v = v.view(batch_size, sequence_length, self.heads, self.head_dim)
        q = q.permute(0, 2, 1, 3)
        k = k.permute(0, 2, 1, 3)
        v = v.permute(0, 2, 1, 3)

        attention, scores = self.scaled_dot_product_attention(q, k, v)
        #print('scores init',scores.shape)
        scores = scores.reshape(batch_size, sequence_length, self.heads *self.head_dim)# we can use self.dmodel as well as last arg
        #print('scores shape',scores.shape)
        out=self.linearlayer(scores)
        #print('out',out.shape)
        projected=self.projectionlayer(out)
        #print('projected shape',projected.shape)
        #print()
        return projected

#layer norm
class CustomLayerNorm(nn.Module):
    def __init__(self, normalized_shape, epsilon=1e-5):
        super(CustomLayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))
        self.epsilon = epsilon

    def forward(self, x):
        # Calculate mean and std across the last dimension (features) for each sequence in the batch
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        x_normalized = (x - mean) / (std + self.epsilon)

        # Apply gamma and beta, which are learned parameters for normalization
        # The shape of gamma and beta should match the feature size
        return self.gamma.unsqueeze(0).unsqueeze(0) * x_normalized + self.beta.unsqueeze(0).unsqueeze(0)

#encopder layer

class encoderlayer(nn.Module):
  def __init__(self,input_dim,d_model,hidlayer,dropout,num_heads,masking):
    super().__init__()
    self.input_dim,self.d_model,self.hidlayer,self.dropout,self.num_heads,self.masking=input_dim,d_model,hidlayer,dropout,num_heads,masking
    #self.PositionalEncoding=PositionalEncoding(self.d_model,self.input_dim) #sinencoding
    self.rope_embedding=RoPEEmbedding(self.input_dim)
    self.multihead_attention=multihead_attention(input_dim,d_model,masking,num_heads)
    self.feedforward=feedforward(self.input_dim,self.hidlayer,self.dropout)
    self.layernorm=CustomLayerNorm(self.input_dim)

  def forward(self,x):
    #rope
    print('---positional encoding--')
    re=self.rope_embedding(x)
    print(re.shape)
    #mulihead
    print('--mulihead attention--')
    mha=self.multihead_attention(re+x)
    print(mha.shape)
    #layernorm
    print('--layer normalisation--')
    ln1=self.layernorm(mha+re+x)
    print(ln1.shape)
    #feedforward
    print('--feedforward network--')
    ff=self.feedforward(ln1)
    print(ff.shape)
    #layernorm
    print('--layer normalisation--')
    out=self.layernorm(ff+ln1)
    print(out.shape)
    return out

class encoder(nn.Module):
  #creating n layer of layers
  def __init__(self,input_dim,d_model,hidlayer,dropout,num_heads,masking,nlayers):
    super().__init__()
    #sequentially stack encoders
    self.layers=nn.Sequential(*[encoderlayer(input_dim,d_model,hidlayer,dropout,num_heads,masking) for _ in range(nlayers)])

  def forward(self, x):
    for i, layer in enumerate(self.layers):
      print(f'\n------layer {i+1}----- ')
      x = layer(x)
      print(f"--Output after layer {i+1}--: {x.size()}")  # Printing the size after each layer
    return x




In [35]:
#torch.manual_seed(4)
input_dim = 4
d_model = 512
num_heads = 2
batch_size = 2
sequence_length = 3
dropout=0.2
hidden=2048
layers=2

torch.manual_seed(3)
x = torch.randn(batch_size, sequence_length, input_dim)
print('Input x:', x)
Encoder = encoder(input_dim,d_model,hidlayer=hidden,dropout=dropout,num_heads=num_heads,masking=None,nlayers=layers)
Encoderout=Encoder(x)

print('encoder shape', Encoderout.shape)
print("encoder output:", Encoderout)


#perfect

Input x: tensor([[[-0.0766,  0.3599, -0.7820,  0.0715],
         [ 0.6648, -0.2868,  1.6206, -1.5967],
         [ 0.4046,  0.6113,  0.7604, -0.0336]],

        [[-0.3448,  0.4937, -0.0776, -1.8054],
         [ 0.4851,  0.2052,  0.3384,  1.3528],
         [ 0.3736,  0.0134,  0.7737, -0.1092]]])
heads = 2
heads = 2

------layer 1----- 
---positional encoding--
torch.Size([2, 3, 4])
--mulihead attention--
torch.Size([2, 3, 4])
--layer normalisation--
torch.Size([2, 3, 4])
--feedforward network--
x after first linear layer: torch.Size([2, 3, 4])
x after activation: torch.Size([2, 3, 2048])
x after dropout 1: torch.Size([2, 3, 2048])
x after 2nd linear layer: torch.Size([2, 3, 4])
x after dropout 2: torch.Size([2, 3, 4])
torch.Size([2, 3, 4])
--layer normalisation--
torch.Size([2, 3, 4])
--Output after layer 1--: torch.Size([2, 3, 4])

------layer 2----- 
---positional encoding--
torch.Size([2, 3, 4])
--mulihead attention--
torch.Size([2, 3, 4])
--layer normalisation--
torch.Size([2, 3, 4])

In [34]:
a=nn.Embedding(3,2)
y=a(torch.tensor([1,2]))
y

tensor([[-0.0354, -1.9893],
        [-0.3161, -0.8495]], grad_fn=<EmbeddingBackward0>)

In [33]:
torch.tensor([4,2])

tensor([4, 2])