
#### Introduction

>   Sentiment analysis, one of the most widely used applications of natural language processing, automatically identifies the emotions and opinions expressed in text. It is used in areas such as social surveys, customer feedback analysis, brand reputation management, and market research.
>   
>   Large language models such as GPT have transformed natural language processing. Built on the transformer architecture and trained on vast amounts of text, these models learn to capture complex language patterns. The GPT family, from GPT-1 to GPT-4, combines self-supervised learning with unidirectional (causal) attention, which lets it generate coherent text and answer questions.
>   
>   In this exercise, we use GPT-2 (a good balance between size and efficiency) to perform sentiment analysis on the IMDb dataset. Unlike the common discriminative approach, which puts a classification head on an encoder such as BERT, here we formulate the problem as conditional text generation: after reading a review, the model generates the word "positive" or "negative" as its response.
>   
>   The purpose of this exercise is to become familiar with fine-tuning pre-trained models, understand the differences between generative and discriminative approaches, and examine the challenges of using GPT for classification tasks. Finally, we will evaluate the trained model with accuracy and perplexity (the exponential of the average per-token cross-entropy, so lower values mean the model assigns higher probability to the reference text) and compare the results with baseline methods.
>   
#### 2-1. Paper Introduction (10 points)
>   
>   To become familiar with the theoretical foundations of transformer models, particularly the GPT architecture, read the paper from [this link](https://arxiv.org/abs/2005.14165) and answer the following in your report:
>   1. How does the unidirectional attention architecture in GPT work?
>   2. What are the key differences between GPT and BERT architectures?
>   3. How are pre-training and fine-tuning methods performed in GPT?
>   
#### 2-2. Data Preprocessing (30 points)
>   
>   In this section, we use the IMDb dataset, which contains 50,000 movie reviews labeled positive or negative. It is available through the `datasets` library:
>   ```python
>   from datasets import load_dataset
>
>   # Load the IMDb dataset (train and test splits)
>   dataset = load_dataset("imdb")
>   ```
>   
>   a) Examine and display the class distribution, analyze text length with mean, median, maximum, and minimum metrics, and show a sample of raw data. For simplicity, you can use shorter reviews (minimum 500 characters).
>   
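
The statistics requested in part (a) could be gathered along these lines. This is only a sketch: `describe_reviews` and `toy_rows` are hypothetical names, and each row is assumed to be a dict with a `"text"` string and an integer `"label"`, which is how the IMDb splits expose their examples.

```python
from collections import Counter
from statistics import mean, median


def describe_reviews(rows):
    """Return class counts and character-length statistics for a list of rows."""
    lengths = [len(r["text"]) for r in rows]
    return {
        "class_counts": dict(Counter(r["label"] for r in rows)),
        "mean": mean(lengths),
        "median": median(lengths),
        "max": max(lengths),
        "min": min(lengths),
    }


# Toy rows standing in for dataset["train"]; replace them with the real split.
toy_rows = [
    {"text": "Great film.", "label": 1},
    {"text": "Dull.", "label": 0},
    {"text": "Loved every minute of it.", "label": 1},
]
stats = describe_reviews(toy_rows)
print(stats)
```

Lengths are measured in characters here; token counts from the tokenizer would be an equally reasonable choice.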
>   b) For formatting the data, use this format:
>   ```
>   "Review: {review text}\nSentiment: {label}"
>   ```
>   Convert numerical labels to text:
>   ```python
>   label_words = {1: "positive", 0: "negative"}  # integer label -> word to generate
>   ```
>   
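
A minimal sketch of the formatting step in part (b). The helper name `format_example` and the constant `LABEL_WORDS` are assumptions made for illustration; leaving the label out yields the prompt that would be used at inference time.

```python
LABEL_WORDS = {0: "negative", 1: "positive"}


def format_example(text, label=None):
    """Build the training string, or the bare prompt when no label is given."""
    prompt = f"Review: {text}\nSentiment:"
    if label is None:
        return prompt  # inference: the model must produce the next word
    return f"{prompt} {LABEL_WORDS[label]}"


train_text = format_example("A moving, well-acted drama.", 1)
infer_text = format_example("A moving, well-acted drama.")
print(train_text)
print(infer_text)
```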
>   c) Use the GPT-2 tokenizer for tokenizing data:
>   ```python
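>   from transformers import GPT2Tokenizer
>
>   tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
>   # GPT-2 defines no padding token, so reuse the end-of-text token for padding
>   tokenizer.pad_token = tokenizer.eos_token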
>   ```
>   
>   d) Create data loaders and choose an appropriate batch size. The data are available through the dataset's `train` and `test` splits. To speed things up, use 5,000 samples from the training split and 1,000 samples from the test split.
>   
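
One way to draw the reduced training set is a class-balanced random sample, sketched below. Balancing is a design choice and not something the exercise requires; `balanced_sample` and `toy` are hypothetical names, and rows are again assumed to be dicts with a 0/1 `"label"`.

```python
import random


def balanced_sample(rows, n_total, seed=42):
    """Pick n_total rows with equal numbers of label 0 and label 1."""
    rng = random.Random(seed)  # local generator keeps the draw reproducible
    by_label = {0: [], 1: []}
    for r in rows:
        by_label[r["label"]].append(r)
    picked = []
    for label_rows in by_label.values():
        # random.sample raises ValueError if a class has too few rows
        picked += rng.sample(label_rows, n_total // 2)
    rng.shuffle(picked)
    return picked


# Toy data: 10 negative and 10 positive rows.
toy = [{"text": f"review {i}", "label": i % 2} for i in range(20)]
subset = balanced_sample(toy, 6)
print([r["label"] for r in subset])
```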
#### 2-3. Model Implementation (50 points)
>   
>   In this part of the project, you are responsible for implementing a GPT-2 based model. This model should be implemented according to its specific architecture and the following important points:
>   ```python
>   from transformers import GPT2LMHeadModel
>
>   model = GPT2LMHeadModel.from_pretrained("gpt2")
>   ```
>   
>   GPT-2 is a well-known and capable language model. Start from a pre-trained version of it, so that you can reuse the knowledge the model has already learned and concentrate on training the top layers.
>   
>   A key point in this implementation is freezing the base layers of the model, so that only the top layers are trained. This usually speeds up training and reduces the risk of overfitting. Freezing the base layers preserves the model's general-purpose features, while the trainable layers learn the aspects that are specific to this dataset.
>   
>   To get good performance, tune hyperparameters such as the learning rate, the number of epochs, and the batch size. Find a good combination of these through initial experiments.
>   
>   In this implementation, the loss should be computed only at the label position. Tokens that belong to the review and the prompt are excluded from the loss (for example, by setting their targets to the ignore index -100), so the model is optimized solely to predict the sentiment word.

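The label-only loss can be illustrated without any deep learning library. The sketch below uses plain Python lists and a three-token toy vocabulary; `mask_labels` and `masked_cross_entropy` are hypothetical helpers, and the value -100 mirrors the default `ignore_index` of PyTorch's cross-entropy loss.

```python
import math

IGNORE = -100  # positions with this target do not contribute to the loss


def mask_labels(input_ids, target_pos):
    """Keep the target only at the label position; ignore everything else."""
    labels = [IGNORE] * len(input_ids)
    labels[target_pos] = input_ids[target_pos]
    return labels


def masked_cross_entropy(logits, labels):
    """Average cross-entropy where logits[t] scores the token at position t + 1."""
    total, count = 0.0, 0
    for t in range(len(labels) - 1):
        target = labels[t + 1]  # causal shift: position t predicts t + 1
        if target == IGNORE:
            continue
        row = logits[t]
        log_z = math.log(sum(math.exp(v) for v in row))
        total += log_z - row[target]
        count += 1
    return total / count


input_ids = [2, 0, 1]          # last token plays the role of the label word
labels = mask_labels(input_ids, target_pos=2)
logits = [[0.0, 0.0, 0.0], [0.0, 2.0, 0.0], [5.0, 0.0, 0.0]]
loss = masked_cross_entropy(logits, labels)
print(labels, round(loss, 4))
```

Only the middle row of `logits` affects the result, because it is the only one that predicts an unmasked position.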
#### 2-4. Evaluation and Analysis of Results (10 points)

>   Now evaluate the model with accuracy (the fraction of predictions that match the true labels) and perplexity (the exponential of the average cross-entropy; the lower it is, the more confident the model is in the correct tokens). Also compute precision and recall, and display the confusion matrix.
>   
>   Explain how this method differs from using a model such as BERT, which is commonly fine-tuned with a dedicated classification head, and examine the strengths and weaknesses of each approach.

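As a reference for what these metrics compute, here is a small from-scratch sketch. The helper names and the toy predictions are made up for illustration; in practice a library such as scikit-learn would normally be used. The confusion matrix puts true labels on rows and predictions on columns.

```python
import math


def binary_metrics(y_true, y_pred):
    """Accuracy, precision, recall and confusion matrix for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(t == 1 and p == 1 for t, p in pairs)
    tn = sum(t == 0 and p == 0 for t, p in pairs)
    fp = sum(t == 0 and p == 1 for t, p in pairs)
    fn = sum(t == 1 and p == 0 for t, p in pairs)
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
        "confusion_matrix": [[tn, fp], [fn, tp]],
    }


def perplexity(token_nlls):
    """exp of the mean negative log-likelihood over the scored tokens."""
    return math.exp(sum(token_nlls) / len(token_nlls))


m = binary_metrics([1, 1, 0, 0, 1], [1, 0, 0, 1, 1])
ppl = perplexity([math.log(2)] * 4)  # every token predicted with p = 0.5
print(m, ppl)
```

A perplexity of 2 in the toy case means the model is, on average, as uncertain as a fair coin flip over each scored token.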
#### 2-5. Bonus (10 points)


>   Another way to fine-tune pre-trained GPT models is LoRA (Low-Rank Adaptation), which needs fewer resources while largely preserving performance. LoRA adds trainable low-rank matrices alongside the model's original weights, so the model can be adapted without changing the pre-trained weights directly, which greatly reduces the number of parameters that must be updated. Instead of freezing layers and training only the last two, try implementing the exercise with this technique.

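The saving from LoRA can be checked with simple arithmetic. For a weight matrix of shape `in × out`, a rank-`r` update trains `r × (in + out)` values instead of `in × out`. The sketch below uses the fused query/key/value projection of the smallest GPT-2 (768 inputs, 3 × 768 = 2304 outputs) and rank 8 as a worked example; `lora_param_count` is a hypothetical helper, and biases are left out of the count.

```python
def lora_param_count(in_features, out_features, rank):
    """Trainable values in the two low-rank factors A (r x in) and B (out x r)."""
    return rank * (in_features + out_features)


in_f, out_f, rank = 768, 2304, 8
full = in_f * out_f                          # weights updated by full fine-tuning
lora = lora_param_count(in_f, out_f, rank)   # weights updated by LoRA
print(f"full: {full:,}  lora: {lora:,}  ratio: {lora / full:.2%}")
```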
In [None]:
from __future__ import annotations

import argparse
import gc
import math
import os
import random
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

import numpy as np
import torch
import torch.nn as nn
from datasets import load_dataset
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    precision_recall_fscore_support,
)
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm
from torch.optim import AdamW  # transformers' own AdamW is deprecated in favour of the PyTorch one
from transformers import (
    GPT2LMHeadModel,
    GPT2Tokenizer,
    get_linear_schedule_with_warmup,
)

# ---------------------------------------------------------------------------
# 1. Configuration ----------------------------------------------------------------
# ---------------------------------------------------------------------------

@dataclass
class CFG:
    model_name: str = "gpt2"
    max_length: int = 512
    batch_size: int = 16
    epochs: int = 3
    lr: float = 3e-5
    weight_decay: float = 0.01
    warmup_ratio: float = 0.1
    grad_accum: int = 2
    grad_clip: float = 1.0
    seed: int = 42

    # data
    train_samples: int = 5_000
    test_samples: int = 1_000
    min_text_len: int = 100

    # LoRA
    lora_rank: int = 8
    lora_alpha: int = 16
    lora_dropout: float = 0.05

    # device
    device: torch.device = (
        torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    )

cfg = CFG()

# ---------------------------------------------------------------------------
# 2. Reproducibility helpers -------------------------------------------------
# ---------------------------------------------------------------------------

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


set_seed(cfg.seed)

# ---------------------------------------------------------------------------
# 3. Data --------------------------------------------------------------------
# ---------------------------------------------------------------------------

def analyse_imdb(ds):
    train, test = ds["train"], ds["test"]
    neg, pos = np.bincount(train["label"])
    print(
        f"Train: {len(train):,}  Test: {len(test):,}  «neg» {neg:,}  «pos» {pos:,}"
    )


class IMDbDataset(Dataset):
    """Simple dataset → returns dict with input_ids, attention_mask, labels, label"""

    def __init__(
        self,
        rows: List[dict],
        tokenizer: GPT2Tokenizer,
        max_len: int,
        training: bool = True,
    ):
        self.rows = rows
        self.tok = tokenizer
        self.max_len = max_len
        self.training = training

        # cache helper tokens
        self.positive_id = self.tok.encode("positive", add_special_tokens=False)[0]
        self.negative_id = self.tok.encode("negative", add_special_tokens=False)[0]
        self.sentinel_ids = self.tok.encode("Sentiment:", add_special_tokens=False)

    # ---------------------------------------------------------------------
    @staticmethod
    def _find_sublist(haystack: List[int], needle: List[int]) -> int | None:
        """Return index of *first* element of needle in haystack or None."""
        for i in range(len(haystack) - len(needle) + 1):
            if haystack[i : i + len(needle)] == needle:
                return i
        return None

    # ---------------------------------------------------------------------
    def __getitem__(self, idx):
        row = self.rows[idx]
        label = row["label"]
        label_word = "positive" if label == 1 else "negative"

        # Tokenize "\nSentiment:" and the label separately and truncate only the
        # review, so the label is never cut off when a review is too long.
        prompt_ids = self.tok.encode("\nSentiment:", add_special_tokens=False)
        label_ids = self.tok.encode(f" {label_word}", add_special_tokens=False)
        review_ids = self.tok.encode(
            f"Review: {row['text']}",
            add_special_tokens=False,  # the tokenizer adds no BOS/EOS by default anyway
            truncation=True,
            max_length=self.max_len - len(prompt_ids) - len(label_ids),
        )
        ids = review_ids + prompt_ids + label_ids
        n_pad = self.max_len - len(ids)

        # right-pad with the pad token (EOS for GPT-2) and mask the padding
        input_ids = torch.tensor(ids + [self.tok.pad_token_id] * n_pad)
        attention = torch.tensor([1] * len(ids) + [0] * n_pad)

        # label masking
        if self.training:
            # supervise only the first token after "Sentiment:"; the rest is ignored
            labels = torch.full_like(input_ids, -100)
            target_pos = len(review_ids) + len(prompt_ids)
            labels[target_pos] = (
                self.positive_id if label == 1 else self.negative_id
            )
        else:
            # for validation we compute LM loss across the sequence, skipping padding
            labels = input_ids.clone()
            labels[attention == 0] = -100
        return {
            "input_ids": input_ids,
            "attention_mask": attention,
            "labels": labels,
            "label": torch.tensor(label, dtype=torch.long),
        }

    # ---------------------------------------------------------------------
    def __len__(self):
        return len(self.rows)


# ---------------------------------------------------------------------------
# 4. LoRA helper -------------------------------------------------------------
# ---------------------------------------------------------------------------

from transformers.pytorch_utils import Conv1D  # GPT-2 projections are Conv1D, not nn.Linear


class LoRALinear(nn.Module):
    """LoRA wrapper per https://arxiv.org/abs/2106.09685 (wraps nn.Linear or GPT-2's Conv1D)."""

    def __init__(self, base: nn.Module, r: int, alpha: int, dropout: float):
        super().__init__()
        self.base = base
        self.r = r
        self.scale = alpha / r
        self.drop = nn.Dropout(dropout) if dropout > 0 else nn.Identity()

        # nn.Linear stores its weight as (out, in); Conv1D stores it as (in, out)
        if isinstance(base, nn.Linear):
            in_f, out_f = base.in_features, base.out_features
        else:
            in_f, out_f = base.weight.shape
        # Init as suggested → A random, B zeros
        self.A = nn.Parameter(torch.randn(r, in_f) * (1 / math.sqrt(r)))
        self.B = nn.Parameter(torch.zeros(out_f, r))

        # mark so we can pick trainable params
        self.A._is_lora = True
        self.B._is_lora = True

        # freeze base weights
        for p in self.base.parameters():
            p.requires_grad = False

    def forward(self, x):
        return self.base(x) + (self.drop(x) @ self.A.T @ self.B.T) * self.scale


class GPT2WithLoRA(nn.Module):
    """GPT‑2 with LoRA applied to the c_attn (qkv) and c_proj projections."""

    TARGETS = ("c_attn", "c_proj")  # names used inside GPT‑2 blocks

    def __init__(self, r: int, alpha: int, dropout: float):
        super().__init__()
        self.base = GPT2LMHeadModel.from_pretrained(cfg.model_name)
        self._apply_lora(r, alpha, dropout)

    # ------------------------------------------------------------------
    def _apply_lora(self, r: int, alpha: int, dropout: float):
        # freeze every pre-trained weight; only the LoRA matrices added below train
        for p in self.base.parameters():
            p.requires_grad = False

        modules = dict(self.base.named_modules())
        rep_names = [
            n
            for n, m in modules.items()
            if isinstance(m, (nn.Linear, Conv1D)) and any(t in n for t in self.TARGETS)
        ]
        for name in rep_names:
            parent_name, child_name = name.rsplit(".", 1)
            parent_mod = modules[parent_name]
            orig_layer = getattr(parent_mod, child_name)
            setattr(parent_mod, child_name, LoRALinear(orig_layer, r, alpha, dropout))
        print(f"LoRA applied to {len(rep_names)} projection layers (rank={r}).")

    # ------------------------------------------------------------------
    def forward(self, **kwargs):
        return self.base(**kwargs)

    # convenience
    def parameters_trainable(self):
        return [p for p in self.parameters() if p.requires_grad]


# ---------------------------------------------------------------------------
# 5. Training utils ----------------------------------------------------------
# ---------------------------------------------------------------------------

def build_loaders(tokenizer: GPT2Tokenizer) -> Tuple[DataLoader, DataLoader, List[dict]]:
    ds_full = load_dataset("imdb")
    analyse_imdb(ds_full)

    # filter + balanced subsample
    train_rows = [r for r in ds_full["train"] if len(r["text"]) >= cfg.min_text_len]
    test_rows = [r for r in ds_full["test"] if len(r["text"]) >= cfg.min_text_len]

    random.shuffle(train_rows)
    random.shuffle(test_rows)

    # balanced sample train
    pos_rows = [r for r in train_rows if r["label"] == 1][: cfg.train_samples // 2]
    neg_rows = [r for r in train_rows if r["label"] == 0][: cfg.train_samples // 2]
    train_sel = pos_rows + neg_rows
    random.shuffle(train_sel)

    test_sel = test_rows[: cfg.test_samples]

    train_ds = IMDbDataset(train_sel, tokenizer, cfg.max_length, training=True)
    test_ds = IMDbDataset(test_sel, tokenizer, cfg.max_length, training=False)

    loader_kw = dict(
        batch_size=cfg.batch_size,
        num_workers=0,
        pin_memory=torch.cuda.is_available(),
    )

    return (
        DataLoader(train_ds, shuffle=True, **loader_kw),
        DataLoader(test_ds, shuffle=False, **loader_kw),
        test_sel,
    )


# ---------------------------------------------------------------------------
# 6. Trainer -----------------------------------------------------------------
# ---------------------------------------------------------------------------

class Trainer:
    def __init__(self, model: nn.Module, tokenizer: GPT2Tokenizer):
        self.model = model.to(cfg.device)
        self.tok = tokenizer

        self.scaler = GradScaler(enabled=torch.cuda.is_available())
        self.opt = AdamW(
            model.parameters_trainable() if hasattr(model, "parameters_trainable") else (p for p in model.parameters() if p.requires_grad),
            lr=cfg.lr,
            weight_decay=cfg.weight_decay,
        )
        self.scheduler = None  # filled later

        # cache ids once
        self.pos_id = self.tok.encode("positive", add_special_tokens=False)[0]
        self.neg_id = self.tok.encode("negative", add_special_tokens=False)[0]

    # --------------------------------------------------------------
    def loss_fn(self, logits, labels):
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        loss = nn.functional.cross_entropy(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1),
            ignore_index=-100,
        )
        return loss

    # --------------------------------------------------------------
    def train_epoch(self, loader: DataLoader):
        self.model.train()
        total, steps = 0.0, 0
        for step, batch in enumerate(tqdm(loader, desc="train")):
            for k in batch:
                batch[k] = batch[k].to(cfg.device)

            with autocast(enabled=torch.cuda.is_available()):
                out = self.model(
                    input_ids=batch["input_ids"],
                    attention_mask=batch["attention_mask"],
                )
                loss = self.loss_fn(out.logits, batch["labels"]) / cfg.grad_accum

            self.scaler.scale(loss).backward()
            if (step + 1) % cfg.grad_accum == 0:
                self.scaler.unscale_(self.opt)
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), cfg.grad_clip)
                self.scaler.step(self.opt)
                self.scaler.update()
                self.opt.zero_grad(True)
                if self.scheduler:
                    self.scheduler.step()
            total += loss.item() * cfg.grad_accum
            steps += 1
        return total / steps

    # --------------------------------------------------------------
    @torch.no_grad()
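    def predict_batch(self, texts: List[str], bs: int = 8) -> List[int]:
        self.model.eval()
        preds = []
        # keep "\nSentiment:" intact by truncating only the review part of the prompt
        suffix_ids = self.tok.encode("\nSentiment:", add_special_tokens=False)
        for i in range(0, len(texts), bs):
            batch_ids = [
                self.tok.encode(
                    f"Review: {t}",
                    add_special_tokens=False,
                    truncation=True,
                    max_length=cfg.max_length - len(suffix_ids),
                )
                + suffix_ids
                for t in texts[i : i + bs]
            ]
            # right-pad to the longest prompt in the batch and build the attention mask
            enc = self.tok.pad(
                {"input_ids": batch_ids}, padding=True, return_tensors="pt"
            ).to(cfg.device)
            logits = self.model(**enc).logits

            # index of last non‑pad token per sample (the ":" of "Sentiment:")
            last = enc.attention_mask.sum(1) - 1
            for j, l in enumerate(last):
                logit = logits[j, l, :]
                # compare the two label-word logits; ties go to negative
                pred = 1 if (logit[self.pos_id] - logit[self.neg_id]).item() > 0 else 0
                preds.append(pred)
        return preds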

    # --------------------------------------------------------------
    def validate(self, loader: DataLoader, raw_rows: List[dict]):
        self.model.eval()
        # LM loss on validation split
        v_loss, n = 0.0, 0
        for batch in loader:
            for k in batch:
                batch[k] = batch[k].to(cfg.device)
            with torch.no_grad():
                out = self.model(
                    input_ids=batch["input_ids"],
                    attention_mask=batch["attention_mask"],
                )
                v_loss += self.loss_fn(out.logits, batch["labels"]).item()
                n += 1
        v_loss /= n
        # clamp the loss before exponentiating so perplexity stays finite (capped at e^10)
        ppl = math.exp(min(v_loss, 10))

        preds = self.predict_batch([r["text"] for r in raw_rows])
        true = [r["label"] for r in raw_rows]
        acc = accuracy_score(true, preds)
        prec, rec, f1, _ = precision_recall_fscore_support(true, preds, average="weighted")
        return {
            "loss": v_loss,
            "perplexity": ppl,
            "accuracy": acc,
            "precision": prec,
            "recall": rec,
            "f1": f1,
            "preds": preds,
            "true": true,
        }


# --------------------------------------------------------------------------
# 7. Main ------------------------------------------------------------------
# --------------------------------------------------------------------------
# Parse the flags before they are used. parse_known_args ignores the extra
# argv entries that Jupyter injects, so this also works inside a notebook.
p = argparse.ArgumentParser()
p.add_argument("--use_lora", dest="use_lora", action="store_true", help="train with LoRA (default)")
p.add_argument("--no_lora", dest="use_lora", action="store_false", help="fine-tune only the last two GPT-2 blocks")
p.set_defaults(use_lora=True)
args, _ = p.parse_known_args()

tokenizer = GPT2Tokenizer.from_pretrained(cfg.model_name)
tokenizer.pad_token = tokenizer.eos_token  # needed for padding
train_loader, val_loader, val_rows = build_loaders(tokenizer)
if args.use_lora:
    model = GPT2WithLoRA(cfg.lora_rank, cfg.lora_alpha, cfg.lora_dropout)
else:
    model = GPT2LMHeadModel.from_pretrained(cfg.model_name)
    # freeze all but last 2 transformer layers
    for name, param in model.named_parameters():
        param.requires_grad = False
    for block in model.transformer.h[-2:]:
        for p in block.parameters():
            p.requires_grad = True
print(
    f"Trainable params: {sum(p.numel() for p in model.parameters() if p.requires_grad):,} /"
    f" {sum(p.numel() for p in model.parameters()):,}"
)
trainer = Trainer(model, tokenizer)
# one optimizer step is taken every cfg.grad_accum batches
total_steps = len(train_loader) * cfg.epochs // cfg.grad_accum
warmup = int(cfg.warmup_ratio * total_steps)
trainer.scheduler = get_linear_schedule_with_warmup(
    trainer.opt, warmup, total_steps
)
best_acc = 0.0
for epoch in range(cfg.epochs):
    print(f"\nEpoch {epoch + 1}/{cfg.epochs}")
    train_loss = trainer.train_epoch(train_loader)
    metrics = trainer.validate(val_loader, val_rows)
    print(
        f"train_loss {train_loss:.4f}  val_loss {metrics['loss']:.4f}  "
        f"acc {metrics['accuracy']:.3f}  f1 {metrics['f1']:.3f}  "
        f"ppl {metrics['perplexity']:.2f}"
    )
    # save best
    if metrics["accuracy"] > best_acc:
        best_acc = metrics["accuracy"]
        ckpt = {
            "model": model.state_dict(),
            "tok": tokenizer,
            "cfg": cfg.__dict__,
        }
        torch.save(ckpt, "best_lora_gpt2.pt")
        print("✓ saved best model")
    torch.cuda.empty_cache()
    gc.collect()

# confusion matrix for the final epoch (rows: true label, columns: prediction)
cm = confusion_matrix(metrics["true"], metrics["preds"])
print("Confusion matrix:")
print(cm)