<img src='sharif_logo.png' alt="SUT logo" width=150 height=150 align=left class="saturate" >

<br>
<font face="Times New Roman">
<div dir=ltr align=center>
<font color=0F5298 size=7>
 Deep Learning <br>
<font color=2565AE size=5>
Computer Engineering Department - Spring 2025  <br>
<font color=3C99D size=5>
          Homework 3: Practical - GPT2 from Scratch! <br>
<font color=696880 size=4>
            Designer: Shaygan Adim
    
    

**Name**:  
  
**Student Code**:

# Overview

In this project, you will implement a scaled-down version of OpenAI's GPT-2 architecture from scratch using PyTorch. You'll train this model on the Snappfood comments with sentiment labels. The goal is to create a generative language model that can produce synthetic Persian comments with controllable sentiment (positive or negative).

# Learning Objectives

* Understanding and implementing transformer-based language model architectures  
* Learning how to control text generation using special tokens  
* Visualizing and analyzing training progress  

# Dataset

You'll work with a Persian dataset containing Snappfood comments:  

* The dataset texts are already normalized, so no further normalization is needed
* Each comment has a sentiment label (1 for positive, 0 for negative)
* The dataset contains text with variations in length and style

# Tokenizer and model

You should use the tokenizer of one of the state-of-the-art open-source LLMs. I strongly recommend the Llama 3.3 tokenizer or the Gemma-2 tokenizer, because they handle Persian better than the others. (There is no need to implement a tokenizer yourself.)

Your model should have the exact structure of GPT-2:  
  
<img src="GPT-2.png" alt="" width="600" height="800">
  
So that the model trains smoothly on limited hardware, use the config below:

* **Embedding Dimension**: 192 (reduced from 768 in original GPT-2)
* **Layers**: 3 transformer blocks (reduced from 12 in original GPT-2)
* **Attention Heads**: 3 (reduced from 12 in original GPT-2)
* **Context Window**: 128 tokens (reduced from 1024 in original GPT-2)
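
To get a feel for how small this configuration is, the parameter count can be worked out by hand. The sketch below is only an estimate under stated assumptions: every linear layer has a bias, the MLP hidden size is 4 times the embedding dimension, the output head shares its weights with the token embedding, and the vocabulary size of 128,000 is a hypothetical round number chosen for illustration (the real value depends on the tokenizer you pick).

```python
def gpt2_param_count(vocab_size: int, n_positions: int = 128,
                     n_embd: int = 192, n_layer: int = 3) -> dict:
    """Count parameters of a GPT-2 style model with a tied output head."""
    n_inner = 4 * n_embd  # assumed MLP hidden size
    # Attention: fused QKV projection plus output projection (weights + biases)
    attn = (n_embd * 3 * n_embd + 3 * n_embd) + (n_embd * n_embd + n_embd)
    # MLP: expand to n_inner, then project back to n_embd (weights + biases)
    mlp = (n_embd * n_inner + n_inner) + (n_inner * n_embd + n_embd)
    # Two LayerNorms per block, each with a weight and a bias vector
    layer_norms = 2 * (2 * n_embd)
    counts = {
        "token_embedding": vocab_size * n_embd,
        "position_embedding": n_positions * n_embd,
        "blocks": n_layer * (attn + mlp + layer_norms),
        "final_layer_norm": 2 * n_embd,
    }
    counts["total"] = sum(counts.values())
    return counts


# Hypothetical vocabulary size, used only for illustration
counts = gpt2_param_count(vocab_size=128_000)
head_dim = 192 // 3  # each of the 3 heads works on a 64-dimensional slice

for name, value in counts.items():
    print(f"{name:>20}: {value:,}")
```

Under these assumptions the three transformer blocks hold only about 1.3M parameters, while the token embedding table dominates the total. The number of heads does not change the count; it only determines how the 192 dimensions are split.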

Moreover, unlike the original Transformer paper, which used fixed sinusoidal position encodings, GPT-2 (and your implementation) uses learnable position embeddings:
1. Create an embedding table of size [n_positions, n_embd], where:

    * n_positions is the maximum sequence length (128 in our model)
    * n_embd is the embedding dimension (192 in our model)
2. For each position in the sequence (0 to sequence_length-1), look up the corresponding embedding vector.

3. Add these position embeddings to the token embeddings before passing them through the transformer blocks.

# Notes:

* Be aware that you will be questioned about your solution to this assignment in person. Build a solid understanding while solving it.
* Using ChatGPT and other LLMs is allowed, but you should be able to explain every line of your code completely.
* You need a GPU for this assignment. You can use Colab or Kaggle for free.
* I highly recommend using the exact same hyperparameters and settings provided to match expected results.


# Importing

In [None]:
# Data loading and manipulation
import kagglehub
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Tokenization utilities
from transformers import AutoTokenizer
from huggingface_hub import login

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Visualization tool
import matplotlib.pyplot as plt

# Runtime utilities
import math
import time
from tqdm import tqdm

# Typing tool
from typing import Dict, Optional, Any

# Downloading and loading the data

In this section we read and load the data from [here](https://www.kaggle.com/datasets/mohammad1ziyar/cleaned-snappfood-persian-sentiment-analysis).

You can also see some information about the data in the next cell. In the end, we only keep the `comment_cleaned` and `label` columns.

In [None]:

# Download the dataset using kagglehub

path = kagglehub.dataset_download("mohammad1ziyar/cleaned-snappfood-persian-sentiment-analysis")

print("Path to dataset files:", path)

raw_corpus = pd.read_csv(path + "/cleaned_snappfood.csv")


In [None]:
raw_corpus.info()
raw_corpus.head()

In [None]:
raw_corpus = raw_corpus[["comment_cleaned", "label"]]

# Downloading and loading the tokenizer (5 Points)

In this section you need to load your tokenizer from Hugging Face. I recommend [this](https://huggingface.co/meta-llama/Llama-3.3-70B-Instruct) or [this](https://huggingface.co/google/gemma-2-27b-it).
Keep in mind that you might need to log in first with your Hugging Face access token and accept the license agreement on the model's page before you can access the model and its tokenizer.

In [None]:

# Initialize a tokenizer (login if needed)

# login()  # Uncomment and provide your token if required
# Choose a tokenizer with good Persian coverage
model_checkpoint = "meta-llama/Llama-3.2-1B"

# Reuse EOS as the padding token if the tokenizer does not define one
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token


In the cell below, add two special tokens to the tokenizer's vocabulary that mark a comment as positive or negative. We will manually prepend one of these tokens to each comment so that the model can learn the difference between positive and negative comments.

In [None]:

# Add custom special tokens to the tokenizer

special_tokens = {"additional_special_tokens": ["<|positive|>", "<|negative|>"]}
tokenizer.add_special_tokens(special_tokens)


## Dataset and Dataloader (10 Points)

Create a custom Dataset class for the data.

In [None]:

class CommentDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=128):
        self.tokenizer = tokenizer
        self.comments = dataframe['comment_cleaned'].tolist()
        self.labels = dataframe['label'].tolist()
        self.max_length = max_length

    def __len__(self) -> int:
        return len(self.comments)

    def __getitem__(self, idx: int) -> dict:
        sentiment_prefix = "<|positive|>" if self.labels[idx] == 1 else "<|negative|>"
        text = sentiment_prefix + " " + self.comments[idx]
        enc = self.tokenizer(
            text,
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
            return_tensors='pt'
        )
        item = {k: v.squeeze(0) for k, v in enc.items()}
        item['labels'] = item['input_ids'].clone()
        return item


Split the data, then create the train and validation datasets and dataloaders.

In [None]:

# Prepare the training and validation datasets and dataloaders

train_df, val_df = train_test_split(raw_corpus, test_size=0.1, random_state=42, stratify=raw_corpus['label'])

train_dataset = CommentDataset(train_df, tokenizer)
val_dataset = CommentDataset(val_df, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)


## Model implementation (35 Points)

In this section you should implement the model architecture completely from scratch. Ready-made implementations from torch or other libraries are not allowed (not even for the attention mechanism).

In [None]:
class GPT2Config:
    def __init__(
        self,
        vocab_size,
        n_positions,  
        n_embd,
        n_layer,
        n_head,
        n_inner=None,
        activation_function="gelu",
        resid_pdrop=0.1,
        embd_pdrop=0.1,
        attn_pdrop=0.1,
        layer_norm_epsilon=1e-5,
        initializer_range=0.02,
        bos_token_id=None,
        eos_token_id=None,
    ):
        self.vocab_size = vocab_size
        self.n_positions = n_positions
        self.n_embd = n_embd
        self.n_layer = n_layer
        self.n_head = n_head
        self.n_inner = 4 * n_embd if n_inner is None else n_inner
        self.activation_function = activation_function
        self.resid_pdrop = resid_pdrop
        self.embd_pdrop = embd_pdrop
        self.attn_pdrop = attn_pdrop
        self.layer_norm_epsilon = layer_norm_epsilon
        self.initializer_range = initializer_range
        self.bos_token_id = bos_token_id
        self.eos_token_id = eos_token_id

In [None]:

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.attn_pdrop

        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.c_proj = nn.Linear(config.n_embd, config.n_embd)
        self.attn_drop = nn.Dropout(self.dropout)
        self.resid_drop = nn.Dropout(config.resid_pdrop)

        self.register_buffer("bias", torch.tril(torch.ones(config.n_positions, config.n_positions)).view(1, 1, config.n_positions, config.n_positions))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, C = x.size()
        qkv = self.c_attn(x)
        q, k, v = qkv.split(C, dim=2)

        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        att = (q @ k.transpose(-2, -1)) / math.sqrt(k.size(-1))
        att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
        att = torch.softmax(att, dim=-1)
        att = self.attn_drop(att)

        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.c_proj(y))
        return y
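
The effect of the causal mask followed by the softmax can be checked on a tiny example without any tensors. The sketch below is a plain-Python illustration, not part of the required implementation: the helper name `causal_attention_weights` is made up for this example, and the all-zero score matrix stands in for the scaled `q @ k^T` scores.

```python
import math


def causal_attention_weights(scores):
    """Apply a causal mask and a row-wise softmax to a square score matrix."""
    weights = []
    for t, row in enumerate(scores):
        # Position t may only attend to positions 0..t; later ones get -inf
        masked = [s if j <= t else float("-inf") for j, s in enumerate(row)]
        row_max = max(masked)  # subtract the max for numerical stability
        exps = [math.exp(s - row_max) for s in masked]  # exp(-inf) is 0.0
        total = sum(exps)
        weights.append([e / total for e in exps])
    return weights


# Equal scores everywhere: each position spreads its attention uniformly
# over itself and the positions before it.
scores = [[0.0] * 4 for _ in range(4)]
weights = causal_attention_weights(scores)

for row in weights:
    print([round(w, 3) for w in row])
```

Every row sums to 1 and has zeros above the diagonal, which is why a token can never see the tokens that come after it.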


In [None]:

class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, config.n_inner)
        self.c_proj = nn.Linear(config.n_inner, config.n_embd)
        self.dropout = nn.Dropout(config.resid_pdrop)
        self.act = nn.GELU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.c_fc(x)
        x = self.act(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x


In [None]:

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_1 = nn.LayerNorm(config.n_embd, eps=config.layer_norm_epsilon)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = nn.LayerNorm(config.n_embd, eps=config.layer_norm_epsilon)
        self.mlp = MLP(config)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x


In [None]:

class GPT2(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.transformer = nn.ModuleDict({
            'wte': nn.Embedding(config.vocab_size, config.n_embd),
            'wpe': nn.Embedding(config.n_positions, config.n_embd),
            'drop': nn.Dropout(config.embd_pdrop),
            'h': nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            'ln_f': nn.LayerNorm(config.n_embd, eps=config.layer_norm_epsilon),
        })
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        # tie weights
        self.lm_head.weight = self.transformer['wte'].weight

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor = None, labels: torch.Tensor = None):
        B, T = input_ids.size()
        device = input_ids.device
        pos = torch.arange(0, T, dtype=torch.long, device=device).unsqueeze(0)

        tok_emb = self.transformer['wte'](input_ids)
        pos_emb = self.transformer['wpe'](pos)
        x = tok_emb + pos_emb
        x = self.transformer['drop'](x)

        for block in self.transformer['h']:
            x = block(x)
        x = self.transformer['ln_f'](x)

        logits = self.lm_head(x)

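        loss = None
        if labels is not None:
            # Shift for causal LM: the logits at position t are scored against the token at t + 1
            shift_logits = logits[:, :-1, :].contiguous()
            shift_labels = labels[:, 1:].contiguous()
            # GPT2Config defines no ignore_index, so use -100, the default of nn.CrossEntropyLoss
            loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
            loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))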

        return logits, loss
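
The shift used for the causal language modeling loss can be illustrated on a toy sequence. This is a minimal plain-Python sketch with made-up token ids and probabilities. It also assumes that padding positions in the labels are replaced by -100 (the default `ignore_index` of `nn.CrossEntropyLoss`), which the dataset class in this notebook does not do by itself.

```python
import math

IGNORE_INDEX = -100  # default ignore_index of torch.nn.CrossEntropyLoss


def next_token_pairs(input_ids, labels):
    """Pair the input at position t with the label at position t + 1."""
    return list(zip(input_ids[:-1], labels[1:]))


def masked_mean_nll(target_probs, targets):
    """Average -log p(target) over positions whose target is not ignored."""
    losses = [-math.log(p) for p, t in zip(target_probs, targets)
              if t != IGNORE_INDEX]
    return sum(losses) / len(losses)


# Toy ids: four real tokens followed by two padding tokens (id 0 here)
input_ids = [7, 3, 9, 4, 0, 0]
labels = [7, 3, 9, 4, IGNORE_INDEX, IGNORE_INDEX]  # assumed padding mask

pairs = next_token_pairs(input_ids, labels)
targets = [target for _, target in pairs]

# Made-up probabilities the model assigns to each target token
target_probs = [0.5, 0.25, 0.125, 0.9, 0.9]
loss = masked_mean_nll(target_probs, targets)

print(pairs)
print(loss)
```

Only the first three pairs contribute to the loss. The last input position has no next token to predict, which is why the sequence of pairs is one element shorter than the input.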


## Train and evaluation (25 Points)

Now you should implement the train and evaluation functions.

In [None]:

def train_epoch(model: nn.Module, data_loader: DataLoader, optimizer: torch.optim.Optimizer, 
               scheduler, device: torch.device, log_interval: int) -> tuple:
    model.train()
    total_loss = 0.0
    step_nums = []
    step_losses = []

    for step, batch in enumerate(data_loader):
        input_ids = batch['input_ids'].to(device)
        attn_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        logits, loss = model(input_ids, attn_mask, labels)
        loss.backward()
        optimizer.step()

        if scheduler is not None:
            scheduler.step()

        total_loss += loss.item()
        if step % log_interval == 0:
            step_nums.append(step)
            step_losses.append(loss.item())

    return total_loss / len(data_loader), step_nums, step_losses


def evaluate(model: nn.Module, data_loader: DataLoader, device: torch.device) -> tuple:
    model.eval()
    total_loss = 0.0
    step_nums = []
    step_losses = []

    with torch.no_grad():
        for step, batch in enumerate(data_loader):
            input_ids = batch['input_ids'].to(device)
            attn_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            logits, loss = model(input_ids, attn_mask, labels)
            total_loss += loss.item()
            step_nums.append(step)
            step_losses.append(loss.item())

    return total_loss / len(data_loader), step_nums, step_losses


In [None]:
config = GPT2Config(
    vocab_size=len(tokenizer),
    n_positions=128,
    n_embd=192,
    n_layer=3,
    n_head=3,
    bos_token_id=tokenizer.bos_token_id,
    eos_token_id=tokenizer.eos_token_id
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = GPT2(config)
model.to(device)

Now train the model for 3 to 5 epochs. It's recommended to use a suitable learning rate scheduler (For example, cosine). Also save training and validation loss periodically.

In [None]:

# Training loop

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GPT2(config).to(device)
num_epochs = 3
log_interval = 10

optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4)
# Anneal the learning rate over exactly the number of optimizer steps that will be taken
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs * len(train_loader))

epoch_train_losses = []
epoch_val_losses = []
train_step_nums = []
train_step_losses = []
val_step_nums = []
val_step_losses = []

for epoch in range(num_epochs):
    train_loss, tr_steps, tr_losses = train_epoch(model, train_loader, optimizer, scheduler, device, log_interval)
    val_loss, v_steps, v_losses = evaluate(model, val_loader, device)

    epoch_train_losses.append(train_loss)
    epoch_val_losses.append(val_loss)

    train_step_nums.extend([s + epoch * len(train_loader) for s in tr_steps])
    train_step_losses.extend(tr_losses)
    val_step_nums.extend([s + epoch * len(val_loader) for s in v_steps])
    val_step_losses.extend(v_losses)

    print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}")
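
To see what a cosine schedule does to the learning rate, the closed-form expression can be evaluated directly. The sketch below is a plain-Python illustration: the function name `cosine_lr` and the step counts are hypothetical, and unlike `CosineAnnealingLR` it simply holds the minimum value once `total_steps` is reached.

```python
import math


def cosine_lr(step: int, total_steps: int,
              base_lr: float = 5e-4, min_lr: float = 0.0) -> float:
    """Learning rate after `step` updates on a half-cosine from base_lr to min_lr."""
    progress = min(step, total_steps) / total_steps  # clamp after the last step
    return min_lr + 0.5 * (base_lr - min_lr) * (1.0 + math.cos(math.pi * progress))


total_steps = 1000  # hypothetical: num_epochs * len(train_loader)
for step in (0, 250, 500, 750, 1000):
    print(step, f"{cosine_lr(step, total_steps):.6f}")
```

The rate starts at `base_lr`, passes through half of it at the midpoint, and reaches `min_lr` at the last step. The schedule only ends at the minimum if the total number of steps matches the number of optimizer updates actually performed.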


Plot the training and validation loss per epoch, and also per step for the steps you logged earlier.

In [None]:

# Visualize the training and validation results

fig, axes = plt.subplots(1, 3, figsize=(18, 4))

# epoch losses
axes[0].plot(epoch_train_losses, label='Train')
axes[0].plot(epoch_val_losses, label='Val')
axes[0].set_title('Epoch Loss')
axes[0].legend()
axes[0].grid(True)

# train step losses
axes[1].plot(train_step_nums, train_step_losses, label='Train step')
axes[1].set_title('Train Step Loss')
axes[1].grid(True)

# val step losses
axes[2].plot(val_step_nums, val_step_losses, label='Val step')
axes[2].set_title('Val Step Loss')
axes[2].grid(True)

plt.show()


# Inference (15 Points)

Complete the function below to generate comments (positive or negative).

In [None]:

def generate_comment(model: nn.Module, tokenizer: Any, sentiment: str, max_length: int = 50) -> str:
    sentiment_token = "<|positive|>" if sentiment == 'positive' else "<|negative|>"
    prompt = sentiment_token
    input_ids = tokenizer(prompt, return_tensors='pt').input_ids.to(next(model.parameters()).device)

    model.eval()
    generated = input_ids
    with torch.no_grad():
        for _ in range(max_length):
            logits, _ = model(generated)
            next_token_logits = logits[:, -1, :]
            next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            generated = torch.cat([generated, next_token], dim=1)
            if next_token.item() == tokenizer.eos_token_id:
                break

    text = tokenizer.decode(generated[0], skip_special_tokens=True)
    return text


Generate 10 positive and 10 negative comments and evaluate your results.

In [None]:

# Demonstrate the model's sentiment-controlled text generation

model.eval()

for _ in range(10):
    pos_comment = generate_comment(model, tokenizer, 'positive')
    neg_comment = generate_comment(model, tokenizer, 'negative')
    print(f"POS: {pos_comment}")
    print(f"NEG: {neg_comment}")
    print('-'*40)


## Inference Time Hyperparameters (10 Points)

Play with these parameters for the best results:
  
temperature, top_k, top_p

Briefly report what you observed and try to explain why it happens. What is the effect of each parameter?
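
As a starting point for these experiments, the three parameters can be understood as successive transformations of the next-token distribution. The following is a minimal plain-Python sketch on a toy list of logits, not the notebook's generation function. The helper names are made up for illustration, and applying top-k before top-p with renormalization in between is one common ordering, assumed here.

```python
import math
import random


def _renormalize(probs, keep):
    """Zero out every index not in `keep` and rescale the rest to sum to 1."""
    mass = sum(probs[i] for i in keep)
    return [p / mass if i in keep else 0.0 for i, p in enumerate(probs)]


def sampling_distribution(logits, temperature=1.0, top_k=None, top_p=None):
    """Next-token probabilities after temperature, top-k and top-p filtering."""
    # Temperature below 1 sharpens the distribution, above 1 flattens it
    scaled = [x / temperature for x in logits]
    peak = max(scaled)
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    probs = [e / total for e in exps]

    # Token indices from most to least probable
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)

    if top_k is not None:
        # Keep only the k most probable tokens
        probs = _renormalize(probs, set(order[:top_k]))

    if top_p is not None:
        # Keep the smallest prefix whose cumulative probability reaches top_p
        cumulative, nucleus = 0.0, set()
        for i in order:
            nucleus.add(i)
            cumulative += probs[i]
            if cumulative >= top_p:
                break
        probs = _renormalize(probs, nucleus)
    return probs


def sample_token(logits, rng=random, **kwargs):
    """Draw one token index from the filtered distribution."""
    probs = sampling_distribution(logits, **kwargs)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


# Toy logits whose softmax is roughly [0.4, 0.3, 0.2, 0.1]
toy_logits = [math.log(4), math.log(3), math.log(2), math.log(1)]
print(sampling_distribution(toy_logits, temperature=0.1))
print(sampling_distribution(toy_logits, top_k=2))
print(sampling_distribution(toy_logits, top_p=0.5))
print(sample_token(toy_logits, top_k=1))
```

With `top_k=1` the sampler reduces to greedy decoding, which always returns the same continuation for the same prompt. Sampling from a filtered distribution is what makes repeated generations differ.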

**Your Report**:  