<a href="https://colab.research.google.com/github/Aaditya019Jain/Self-NLP-Projects/blob/main/NUS_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
 !huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    A token is already saved on your machine. Run `huggingface-cli whoami` to get more information or `huggingface-cli logout` if you want to log out.
    Setting a new token will erase the existing one.
    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) Y
Token is valid (permission: fineGrained).

In [None]:
! pip install torchtune torchao triton



In [None]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
from torchtune.modules import RotaryPositionalEmbeddings

class SimpleLLM(nn.Module):
    def __init__(self, model_name, device = "cpu"):
        super().__init__()
        self.device = torch.device(device)  # Store device
        self.load_weights(model_name)
        self.to(self.device)

    def create_attn_layer(self, layer_idx):

        layer = nn.ModuleDict({})

        # Layer Normalization before attention
        layer["input_layernorm"] = nn.RMSNorm(self.hidden_size, eps=1e-5, elementwise_affine=True)
        layer["input_layernorm"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.input_layernorm.weight'])

        # Self-Attention
        layer["self_attn"] = nn.ModuleDict({
            "q_proj": nn.Linear(self.hidden_size, self.hidden_size, bias=False),
            "k_proj": nn.Linear(self.hidden_size, self.hidden_size // self.attention_num, bias=False),
            "v_proj": nn.Linear(self.hidden_size, self.hidden_size // self.attention_num, bias=False),
            "o_proj": nn.Linear(self.hidden_size, self.hidden_size, bias=False),
        })

        layer["self_attn"]["q_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.self_attn.q_proj.weight'])
        layer["self_attn"]["k_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.self_attn.k_proj.weight'])
        layer["self_attn"]["v_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.self_attn.v_proj.weight'])
        layer["self_attn"]["o_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.self_attn.o_proj.weight'])

        # Layer Normalization before MLP
        layer["post_attn_layernorm"] = nn.RMSNorm(self.hidden_size, eps=1e-5, elementwise_affine=True)
        layer["post_attn_layernorm"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.post_attention_layernorm.weight'])

        # MLP Block
        layer["mlp"] = nn.ModuleDict({
            "gate_proj": nn.Linear(self.hidden_size, self.intermediate_size),
            "up_proj": nn.Linear(self.hidden_size, self.intermediate_size),
            "down_proj": nn.Linear(self.intermediate_size, self.hidden_size),
            "SILU": nn.SiLU()
        })

        layer["mlp"]["gate_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.mlp.gate_proj.weight'])
        layer["mlp"]["up_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.mlp.up_proj.weight'])
        layer["mlp"]["down_proj"].weight = nn.Parameter(self.weight[f'model.layers.{layer_idx}.mlp.down_proj.weight'])

        return layer

    def load_weights(self, model_name):
        model = AutoModelForCausalLM.from_pretrained(model_name)
        self.weight = {k: v.clone().detach() for k, v in model.state_dict().items()}
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.config = model.config

        del model  # Deleting the model
        torch.cuda.empty_cache()

        self.hidden_size = self.config.hidden_size
        self.num_heads = self.config.num_attention_heads
        self.num_layers = self.config.num_hidden_layers
        self.vocab_size = self.config.vocab_size
        self.intermediate_size = self.config.intermediate_size
        self.attention_num = self.num_heads // self.config.num_key_value_heads #confusion - how to reduce the dimension to 512
        self.rotary_emb = RotaryPositionalEmbeddings(self.hidden_size // self.num_heads).to(self.device)

        # Define embedding layer
        self.embed_tokens = nn.Embedding(self.vocab_size, self.hidden_size).to(self.device)
        self.embed_tokens.weight = nn.Parameter(self.weight['model.embed_tokens.weight'])

        # Define transformer layers
        self.layers = nn.ModuleList([self.create_attn_layer(i).to(self.device) for i in range(self.num_layers)])

        # Define final layer normalization
        self.final_layernorm = nn.RMSNorm(self.hidden_size, eps=1e-5, elementwise_affine=True).to(self.device)
        self.final_layernorm.weight = nn.Parameter(self.weight['model.norm.weight'])

        # Define final language model head
        self.lm_head = nn.Linear(self.hidden_size, self.vocab_size, bias=False).to(self.device)
        self.lm_head.weight = nn.Parameter(self.weight['lm_head.weight'])


    def forward(self, input_ids):
        """Computes logits for a given input sequence using PyTorch only."""
        input_ids = input_ids.to(self.device)
        batch_size, seq_len = input_ids.shape
        x = self.embed_tokens(input_ids)  # [batch, seq_len, hidden_size]

        # Apply transformer layers
        for layer in self.layers:
            residual = x  # Save residual connection

            # Layer Norm before Attention
            x = layer["input_layernorm"](x)

            # Prepare for multi-head attention
            batch_size, seq_len, _ = x.shape
            head_dim = self.hidden_size // self.num_heads
            num_kv_heads = self.num_heads // self.attention_num

            # Project for attention
            q = layer["self_attn"]["q_proj"](x)  # [batch, seq_len, hidden_size]
            k = layer["self_attn"]["k_proj"](x)  # [batch, seq_len, hidden_size // attention_num]
            v = layer["self_attn"]["v_proj"](x)  # [batch, seq_len, hidden_size // attention_num]

            q, k, v = q.to(self.device), k.to(self.device), v.to(self.device)

            # Reshape for multi-head attention
            q = q.view(batch_size, seq_len, self.num_heads, head_dim)
            k = k.view(batch_size, seq_len, num_kv_heads, head_dim)
            v = v.view(batch_size, seq_len, num_kv_heads, head_dim)

            # Rotary embeddings
            q = self.rotary_emb(q)
            k = self.rotary_emb(k)

            # Prepare for attention computation
            # Transpose for attention: [batch, num_heads, seq_len, head_dim]
            q = q.transpose(1, 2)
            k = k.transpose(1, 2)
            v = v.transpose(1, 2)

            # Implement grouped-query attention
            # Repeat k and v for each query group
            if self.attention_num > 1:
                k = k.repeat_interleave(self.attention_num, dim=1)
                v = v.repeat_interleave(self.attention_num, dim=1)

            # Attention scores: [batch, num_heads, seq_len, seq_len]
            attention_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(head_dim)

            # Causal mask (lower triangular)
            causal_mask = torch.triu(
                torch.ones(seq_len, seq_len, dtype=torch.bool, device=self.device),
                diagonal=1
            )
            attention_scores.masked_fill_(causal_mask, float('-inf'))

            # Attention weights: [batch, num_heads, seq_len, seq_len]
            attention_weights = F.softmax(attention_scores, dim=-1)

            # Apply attention: [batch, num_heads, seq_len, head_dim]
            context = torch.matmul(attention_weights, v)

            # Reshape back: [batch, seq_len, hidden_size]
            context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.hidden_size)

            # Output projection
            attn_output = layer["self_attn"]["o_proj"](context)

            # Add residual connection
            x = residual + attn_output

            # Layer Norm before MLP
            residual = x  # Save residual connection
            x = layer["post_attn_layernorm"](x)

            # MLP Block
            gate_out = layer["mlp"]["SILU"](layer["mlp"]["gate_proj"](x))
            up_out = layer["mlp"]["up_proj"](x)
            x = layer["mlp"]["down_proj"](gate_out * up_out)

            # Add residual connection
            x = residual + x

        # Apply final normalization
        x = self.final_layernorm(x)

        logits = self.lm_head(x)  # Compute logits
        return logits


    def generate(self, prompt, max_length=512):
        """Generates text token-by-token using greedy decoding."""
        self.eval()
        input_ids = self.tokenizer(prompt, return_tensors='pt').input_ids.to(self.device)

        generated = input_ids.clone()

        with torch.no_grad():  # Disable gradient computation
            for _ in range(max_length):
                logits = self.forward(generated)
                next_token = torch.argmax(logits[:, -1, :], dim=-1, keepdim=True)

                generated = torch.cat([generated, next_token], dim=1)

                if next_token.item() == self.tokenizer.eos_token_id:
                    break

        return self.tokenizer.decode(generated.squeeze(), skip_special_tokens=True)


In [None]:
def main():
    """Load Llama-3.2-1B into SimpleLLM and print a short greedy continuation."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # The constructor already moves the model to `device`. An additional
    # unconditional model.to("cuda") would raise on hosts without a GPU.
    model = SimpleLLM("meta-llama/Llama-3.2-1B", device = device)
    output = model.generate("Once upon a time in a galaxy far, far away", max_length=25)
    print(output)

if __name__ == "__main__":
    main()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Once upon a time in a galaxy far, far away from the earth, the sun was shining, the sun was the sun was the sun was the sun was the time was the


In [None]:
# mean squared error = mean((x - x_true)^2)
# variance = mean((x - x_mean)^2)