# Homework 3 - Part 2: Implementing LoRA From Scratch (70 points)


By the end of this assignment, you will:

- Understand the theory behind Low-Rank Adaptation (LoRA)
- Implement a LoRA layer from scratch using PyTorch
- Apply LoRA to modify a pre-trained RoBERTa model
- Train a LoRA model and compute accuracy

## Background
LLMs have become increasingly powerful but also increasingly parameter-heavy, making fine-tuning computationally expensive.
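
To make the cost concrete, the sketch below compares the number of trainable values in one dense weight matrix with the number in a rank-`r` factorization of its update, which is the idea LoRA builds on. The 768 x 768 shape and `rank = 8` are illustrative assumptions (768 is the hidden size of `roberta-base`), and the helper names are hypothetical.

```python
def full_update_params(in_dim: int, out_dim: int) -> int:
    """Trainable values when the whole in_dim x out_dim matrix is updated."""
    return in_dim * out_dim


def low_rank_update_params(in_dim: int, out_dim: int, rank: int) -> int:
    """Trainable values for A (in_dim x rank) plus B (rank x out_dim)."""
    return in_dim * rank + rank * out_dim


in_dim, out_dim, rank = 768, 768, 8  # assumed shapes, for illustration only
full = full_update_params(in_dim, out_dim)
low_rank = low_rank_update_params(in_dim, out_dim, rank)
print(f"full update:    {full:,} values")
print(f"rank-{rank} update:  {low_rank:,} values")
print(f"ratio:          {low_rank / full:.2%}")
```

The gap widens as the matrix grows, because the full count scales with the product of the two dimensions while the low-rank count scales with their sum.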

# Instructions

Please read all of the code carefully.
The parts that you need to complete are annotated with either `TODO` or `<<< YOUR ANSWER/CODE HERE >>>` markers.

Submit the notebook as a .ipynb file through GradeScope.

Make sure that the notebook is running without any errors before submission. Remove any unnecessary outputs or additional `print` or debugging statements that you put in the code before submission.

### Write your name and NetID below.

**Name:**    Yuan Chang

**NetID:**   yc2238_

### Acknowledgement

The assignment is designed by TA Yilun Zhao, with help and guidance from Arman Cohan.

## Part 0: Question: What exactly makes full fine-tuning expensive? (3 points)

In [None]:
# TODO: Write your answer to the above question here. Replace the raise statement

your_answer = """Full fine-tuning updates every parameter of the model, so backpropagation must compute a gradient for every weight and the optimizer must keep state for each of them, which makes the compute and memory cost very large.
It also takes a lot of storage to load and save all of the parameters."""  # <<< YOUR ANSWER HERE >>>

if your_answer == "":
  raise NotImplementedError()




## Part 1: Implementing LoRA Layer (15 points)

Implement a LoRA layer from scratch in PyTorch. Your implementation should:

Create a class LoRALayer that:

- Takes input dimension, output dimension, rank, and alpha scaling factor as parameters
- Initializes matrices A and B using appropriate initialization schemes
- Implements the forward pass that computes the low-rank adaptation


Create a class LinearWithLoRA that:

- Wraps around a pre-existing nn.Linear layer
- Adds the LoRA adaptation to the original linear transformation
- Implements the forward pass to combine outputs from the original linear layer and the LoRA adaptation
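
The two classes described above rest on two properties that are easy to check numerically: with `B` initialized to zeros the adapted layer reproduces the original layer exactly, and once `B` is non-zero the low-rank update can be folded into a single weight matrix. The following is a minimal sketch using plain tensors rather than the assignment's classes; the dimensions, the seed, and the variable names are assumptions chosen for illustration.

```python
import torch

torch.manual_seed(0)
in_dim, out_dim, rank, alpha = 16, 4, 2, 8.0  # toy sizes, not RoBERTa's

linear = torch.nn.Linear(in_dim, out_dim)
A = torch.randn(in_dim, rank) / rank ** 0.5   # scaled normal init
B = torch.zeros(rank, out_dim)                # zero init
x = torch.randn(3, in_dim)

with torch.no_grad():
    # 1) With B == 0 the LoRA term vanishes, so the output is unchanged.
    adapted = linear(x) + alpha * (x @ A @ B)
    unchanged_at_init = torch.allclose(adapted, linear(x))

    # 2) Pretend training moved B away from zero.
    B = torch.randn(rank, out_dim)
    adapted = linear(x) + alpha * (x @ A @ B)

    # nn.Linear stores its weight as (out_dim, in_dim), hence the transpose.
    merged_weight = linear.weight + alpha * (A @ B).T
    merged = torch.nn.functional.linear(x, merged_weight, linear.bias)
    merge_matches = torch.allclose(adapted, merged, atol=1e-4)

print(unchanged_at_init, merge_matches)
```

The first check is why zero-initializing `B` is a safe starting point: training begins from the pre-trained model's behavior.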

We will provide you with starter code below:

In [None]:
!pip install transformers datasets

In [None]:
# imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from transformers import Trainer, TrainingArguments
from datasets import load_dataset
from functools import partial
import numpy as np
from sklearn.metrics import accuracy_score

In [None]:
# --- Task 1: Implement the LoRA Layer ---
class LoRALayer(torch.nn.Module):
    def __init__(self, in_dim, out_dim, rank, alpha):
        super().__init__()
        self.rank = rank
        self.alpha = alpha

        # --- TODO 1.1: Initialize LoRA Matrices A and B ---
        # Initialize matrix 'A' (shape: in_dim x rank) with values drawn from a normal
        # distribution. A common practice is to scale the standard deviation.
        # Hint: Use nn.Parameter() to make these trainable tensors.
        # Hint: std_dev = 1 / torch.sqrt(torch.tensor(rank).float()) might be useful.

        # Initialize matrix 'B' (shape: rank x out_dim) with zeros.
        # Why initialize B with zeros? What effect does this have initially? (See Question 1.3)

        # Replace the following lines with your implementation:
        std_dev = 1 / torch.sqrt(torch.tensor(rank).float())
        self.A = nn.Parameter(torch.randn(in_dim, rank) * std_dev) # <<< YOUR CODE HERE FOR A >>>
        self.B = nn.Parameter(torch.zeros(rank, out_dim)) # <<< YOUR CODE HERE FOR B >>>
        # --- End TODO 1.1 ---

        # Safety check (optional but good practice)
        if self.A is None or self.B is None:
             raise NotImplementedError("TODO 1.1: Initialize self.A and self.B")

    def forward(self, x):
        # --- TODO 1.2: Implement the LoRA Forward Pass ---
        # Calculate the LoRA adjustment: alpha * (x @ A @ B)
        # Note: The scaling factor `alpha` is often applied here, though some
        # implementations might incorporate it differently (e.g., merged with A or B).
        # I encourage you to read the LoRA paper section 4 to better understand
        # what is exactly going on: https://arxiv.org/pdf/2106.09685
        # This implementation applies it during the forward pass.

        # Replace the following line with your implementation:
        lora_output = x @ self.A @ self.B # <<< YOUR CODE HERE >>>
        lora_output = self.alpha * lora_output
        # --- End TODO 1.2 ---

        if lora_output is None: # Safety check
             raise NotImplementedError("TODO 1.2: Implement the forward pass")
        return lora_output

# --- Task 2: Implement the Wrapper Layer ---
class LinearWithLoRA(torch.nn.Module):
    def __init__(self, linear: nn.Linear, rank: int, alpha: int):
        super().__init__()
        self.linear = linear # The original frozen linear layer
        self.rank = rank
        self.alpha = alpha

        # Store original dimensions for clarity (optional)
        self.in_features = linear.in_features
        self.out_features = linear.out_features

        # --- TODO 2.1: Create the LoRALayer instance ---
        # Instantiate the `LoRALayer` you defined above.
        # It needs the input/output dimensions of the original `linear` layer,
        # and the `rank` and `alpha` values passed to this constructor.

        # Replace the following line with your implementation:
        self.lora = LoRALayer(
            in_dim=self.in_features,
            out_dim=self.out_features,
            rank=rank,
            alpha=alpha
        ) # <<< YOUR CODE HERE: Instantiate LoRALayer >>>
        # --- End TODO 2.1 ---

        # Safety check
        if self.lora is None:
             raise NotImplementedError("TODO 2.1: Instantiate self.lora")

    def forward(self, x):
        # --- TODO 2.2: Combine Original and LoRA Outputs ---
        # The forward pass should compute the output of the original linear layer
        # and add the output of the LoRA layer to it.
        # Remember: The original `self.linear` layer's parameters should be frozen.
        # Only the parameters inside `self.lora` (A and B) should be trainable.

        # Replace the following line with your implementation:
        combined_output = self.linear(x) + self.lora(x) # <<< YOUR CODE HERE >>>
        # --- End TODO 2.2 ---

        if combined_output is None: # Safety check
             raise NotImplementedError("TODO 2.2: Implement the combined forward pass")
        return combined_output


## Part 2: Modifying RoBERTa with LoRA (20 points)

In this part we will apply LoRA to an existing pre-trained language model, RoBERTa. We will use the RoBERTa implementation from the Hugging Face `transformers` library.

Load a pre-trained RoBERTa model for sequence classification:

In [None]:
from transformers import AutoModelForSequenceClassification

model = AutoModelForSequenceClassification.from_pretrained(
    "roberta-base", num_labels=2)

We will do the following next:

1- Freeze all model parameters  

2- Replace specific linear layers with your LoRA-enhanced layers. Consider:
* Query matrices in the attention mechanism
* Key matrices
* Value matrices
* Output projection matrices
* Feed-forward layers
* Classification head
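
One way to find the attribute paths for the layers listed above is to walk the module tree and print every `nn.Linear`. The sketch below does this on a small stand-in module, since its only purpose is to show the pattern; `TinyBlock` and `list_linear_layers` are hypothetical names and are not part of the assignment or of `transformers`.

```python
import torch.nn as nn


class TinyBlock(nn.Module):
    """Stand-in for one transformer block (hypothetical, not RoBERTa)."""

    def __init__(self, hidden: int = 8):
        super().__init__()
        self.query = nn.Linear(hidden, hidden)
        self.key = nn.Linear(hidden, hidden)
        self.value = nn.Linear(hidden, hidden)
        self.norm = nn.LayerNorm(hidden)


def list_linear_layers(model: nn.Module) -> list[tuple[str, int, int]]:
    """Return (dotted name, in_features, out_features) for each nn.Linear."""
    return [
        (name, module.in_features, module.out_features)
        for name, module in model.named_modules()
        if isinstance(module, nn.Linear)
    ]


toy = nn.ModuleList([TinyBlock(), TinyBlock()])
for name, n_in, n_out in list_linear_layers(toy):
    print(f"{name}: {n_in} -> {n_out}")
```

Calling the same helper on the loaded classifier should list dotted paths such as `roberta.encoder.layer.0.attention.self.query`, which map directly onto the attribute accesses used when replacing layers.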



In [None]:
# --- Task 1: Apply LoRA to RoBERTa ---
def apply_lora_to_roberta(model: nn.Module, lora_r: int = 8, lora_alpha: int = 16):
    """
    Modifies a RoBERTa model (in-place) by replacing specific nn.Linear layers
    with LinearWithLoRA wrapper layers.

    Args:
        model (nn.Module): The RoBERTa model instance.
        lora_r (int): The rank for LoRA matrices.
        lora_alpha (int): The scaling factor for LoRA.

    Returns:
        nn.Module: The modified model.
    """
    # --- Step 1.1: Freeze Original Parameters (Already Done) ---
    print("Freezing original model parameters...")
    for param in model.parameters():
        param.requires_grad = False
    print("Original parameters frozen.")

    # --- Step 1.2: Create a Partial Constructor ---
    # `partial` pre-fills arguments for a function/constructor. Here, it creates
    # a shortcut `assign_lora(linear_layer)` that is equivalent to calling
    # `LinearWithLoRA(linear_layer, rank=lora_r, alpha=lora_alpha)`.
    assign_lora = partial(LinearWithLoRA, rank=lora_r, alpha=lora_alpha)
    print(f"Prepared LoRA wrapper with r={lora_r}, alpha={lora_alpha}")

    # --- TODO 1.3: Replace Target Layers ---
    # Replace the specified `nn.Linear` layers in the RoBERTa model
    # with their `LinearWithLoRA` wrapped versions using the `assign_lora` helper.
    # The code explicitly targets layers by their attribute path.

    print("Applying LoRA wrappers to target layers...")
    try:
        # Iterate through each transformer layer in the RoBERTa encoder
        for layer in model.roberta.encoder.layer:
            # --- Fill in the replacement lines below ---
            # hint: inspect the `layer` object to see how it is structured
            # hint: you can add a breakpoint by adding an "import pdb; pdb.set_trace()" line
            # and then look at the `layer` object
            # for example the Query layer in Roberta is stored in the
            # layer.attention.self.query
            # Now you will apply LoRA to each component

            layer.attention.self.query = assign_lora(layer.attention.self.query) # <<< YOUR CODE HERE >>>
            layer.attention.self.key = assign_lora(layer.attention.self.key)   # <<< YOUR CODE HERE >>>
            layer.attention.self.value = assign_lora(layer.attention.self.value) # <<< YOUR CODE HERE >>>

            # Also target dense layers in attention output, intermediate FF, and final output
            layer.attention.output.dense = assign_lora(layer.attention.output.dense) # <<< YOUR CODE HERE >>>
            layer.intermediate.dense = assign_lora(layer.intermediate.dense)     # <<< YOUR CODE HERE >>>
            layer.output.dense = assign_lora(layer.output.dense)           # <<< YOUR CODE HERE >>>

        # Also target layers in the final classifier head
        # Check if the classifier structure matches RobertaForSequenceClassification
        if hasattr(model, 'classifier') and hasattr(model.classifier, 'dense') and hasattr(model.classifier, 'out_proj'):
             model.classifier.dense = assign_lora(model.classifier.dense)      # <<< YOUR CODE HERE >>>
             model.classifier.out_proj = assign_lora(model.classifier.out_proj)   # <<< YOUR CODE HERE >>>
        else:
             print("Warning: Classifier structure not as expected. Skipping classifier LoRA application.")
        # --- End TODO 1.3 ---

        print("Target layers replaced.")

    except AttributeError as e:
        print(f"\nERROR: An AttributeError occurred during layer replacement: {e}")
        print("This might happen if the model structure is different than expected (e.g., not RoBERTa).")
        print("Please check the model architecture and the attribute paths used for replacement.")
        raise # Re-raise the error after printing info

    # --- Step 1.4: Verify Trainable Parameters (Already Done) ---
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total_params = sum(p.numel() for p in model.parameters())
    print(f"\nTrainable params (LoRA A/B matrices): {trainable_params:,}")
    print(f"Total params in model: {total_params:,}")
    if total_params > 0:
      print(f"Trainable parameter ratio: {trainable_params / total_params:.4%}")

    return model



## Part 3: Fine-tuning and Evaluation (20 points)

- Fine-tune your LoRA-enhanced RoBERTa model on the `SST-2` sentiment classification dataset.

- Evaluate the model's performance on a validation set
- Compare your LoRA fine-tuning with full fine-tuning
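
Evaluation on SST-2 is reported here as accuracy: the fraction of examples whose highest-scoring class matches the label. A minimal sketch with made-up logits and labels (the numbers are invented for illustration and are not model outputs):

```python
import numpy as np

# Invented logits for 4 examples and 2 classes (negative, positive).
logits = np.array([
    [2.0, -1.0],
    [0.1, 0.3],
    [-0.5, 1.5],
    [1.2, 0.8],
])
labels = np.array([0, 1, 0, 0])

predictions = np.argmax(logits, axis=1)           # highest-scoring class per row
accuracy = float(np.mean(predictions == labels))  # share of correct rows
print(predictions, accuracy)
```

Only the third example is misclassified, so three of the four predictions count as correct.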


In [None]:
def prepare_sst2_data():
    """
    Load and prepare SST-2 dataset for sentiment classification

    Returns:
        train_dataset, eval_dataset: Dataset objects for training and evaluation
    """
    # Load SST-2 dataset
    dataset = load_dataset("glue", "sst2")
    train_dataset = dataset["train"].select(range(10_000)) # only 10K examples out of 67K
    eval_dataset = dataset["validation"]

    tokenizer = AutoTokenizer.from_pretrained("roberta-base")

    print("Tokenizer loaded")

    def tokenize_function(examples):
        return tokenizer(examples["sentence"], padding="max_length", truncation=True)

    # TASK 1: Take a look at the dataset.map function in the Hugging Face library
    # Explain what this function does
    # https://huggingface.co/docs/datasets/en/process#map
    # Then inspect the format of `train_dataset` and explain what data structure it is and what fields/attributes it contains
    train_dataset = train_dataset.map(tokenize_function, batched=True)
    eval_dataset = eval_dataset.map(tokenize_function, batched=True)

    # data type and format
    train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
    eval_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

    return train_dataset, eval_dataset


In [None]:
# TODO: write your answer for TASK 1 in your_answer_task1 below

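your_answer_task1 = """The map function applies a user-defined function to every example in the dataset (or to batches of examples when batched=True) and stores the returned fields as columns. Here it tokenizes each sentence.
train_dataset is a Hugging Face `Dataset` object backed by an Arrow table, with columns accessible as Python lists or NumPy/PyTorch tensors. It contains the fields sentence, label, and idx, plus input_ids and attention_mask after tokenization.
"""  # <<< YOUR ANSWER HERE >>>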

if your_answer_task1 == "":
  raise NotImplementedError()


In [None]:
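# Disable Weights & Biases logging; %env sets the variable in the kernel itself,
# whereas `!export` would only affect a short-lived subshell
%env WANDB_DISABLED=true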

In [None]:
import numpy as np
from transformers import (
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    AutoTokenizer
)
from datasets import load_dataset
from sklearn.metrics import accuracy_score

def compute_metrics(eval_pred):
    """
    Compute evaluation metrics

    Args:
        eval_pred: Tuple of predictions and labels

    Returns:
        Dictionary of metrics
    """
    # TASK 2: Explain this function in the next cell
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return {"accuracy": accuracy_score(labels, predictions)}

In [None]:
# TODO: Explain exactly what the `compute_metrics` function is doing (line by line)
# your answer should be 3 lines, explaining each of the lines in the function
# write your answer in your_answer_task2

your_answer_task2 = """The first line unpacks the tuple that the Hugging Face Trainer passes in, which has the format (raw model outputs, true labels).
The second line takes the argmax over the class dimension, reducing the logits to a single predicted label per example.
The third line returns a dictionary containing the accuracy of the predictions against the true labels."""   # <<< YOUR ANSWER HERE >>>

if your_answer_task2 == "":
  raise NotImplementedError()
elif len(your_answer_task2.split("\n")) != 3:
  raise ValueError("Your answer should be 3 lines")

In [None]:
# Read the following block of code carefully
# then answer the questions in the next cell
# corresponding to each of the TASKS

def train_and_evaluate(model, train_dataset, eval_dataset):
    """
    Train and evaluate the model

    Args:
        model: Model to train
        train_dataset: Dataset for training
        eval_dataset: Dataset for evaluation

    Returns:
        Trained model and evaluation metrics
    """
    # Define training arguments (this is needed by the Huggingface Trainer)
    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=2,
        per_device_train_batch_size=32,
        per_device_eval_batch_size=64,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir="./logs",
        logging_steps=10,
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        disable_tqdm=False,
        report_to=["none"],
    )

    # Initialize Trainer
    # TASK 3: Take a look at the implementation of the Trainer class
    # and answer the question for TASK 3 in the next cell
    # here is the documentation: https://huggingface.co/docs/transformers/en/main_classes/trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )

    # TASK 4: Describe what happens conceptually when trainer.train() is called.
    trainer.train()

    trainer.save_model("./best_checkpoint")

    # TASK 5: What happens when you call trainer.evaluate()?
    eval_results = trainer.evaluate()
    return model, eval_results


# 1. Load a base model
original_model = AutoModelForSequenceClassification.from_pretrained("roberta-base", num_labels=2)
print("Original model loaded.")

# 2. Apply LoRA modifications (using apply_lora_to_roberta from Part 2)
lora_model = apply_lora_to_roberta(original_model, lora_r=8, lora_alpha=16)
print("LoRA model created.")

# 3. Prepare the datasets (using prepare_sst2_data defined above)
train_dataset, eval_dataset = prepare_sst2_data()
print("Datasets prepared.")

# 4. Train and evaluate, saving the best checkpoint (takes around 25 minutes on a single T4 GPU)
trained_lora_model, eval_results = train_and_evaluate(lora_model, train_dataset, eval_dataset)
print("Eval Results:", eval_results)


In [None]:
# TODO:  please add your answers to the TASKS defined in the above cell

your_answer_task3 = """The Trainer encapsulates the entire training and evaluation loop. It logs training metrics every logging_steps, saves the model and optimizer state according to save_strategy,
and aggregates, logs, and returns all requested metrics without extra user code."""   # <<< YOUR ANSWER HERE >>>

your_answer_task4 = """It builds the optimizer and learning-rate scheduler from training_args. Then, for each epoch and each batch, it runs a forward pass to compute the logits and the loss, backpropagates the loss, and updates the trainable parameters.
Every logging_steps it logs the training loss, learning rate, etc., and at the end of each epoch it evaluates and saves a checkpoint, as set by eval_strategy and save_strategy."""   # <<< YOUR ANSWER HERE >>>

your_answer_task5 = """It switches the model to evaluation mode, disables gradient computation, and iterates over all eval batches, computing logits and collecting the true labels. It then passes them to compute_metrics and returns the resulting metrics."""   # <<< YOUR ANSWER HERE >>>

if your_answer_task3 == "":
  raise NotImplementedError()
if your_answer_task4 == "":
  raise NotImplementedError()
if your_answer_task5 == "":
  raise NotImplementedError()

In [None]:

# 5. Load the best checkpoint and re-evaluate
#    This demonstrates how you can load the checkpoint for inference or other tasks.

import os
from safetensors.torch import load_file
def load_and_evaluate(checkpoint_path, eval_dataset, lora_r=8, lora_alpha=16):
    """
    Load a LoRA-trained model checkpoint (saved as model.safetensors)
    and evaluate on eval_dataset.
    """
    # 1. Load the base model
    loaded_model = AutoModelForSequenceClassification.from_pretrained("roberta-base", num_labels=2)

    # 2. Re-apply LoRA modifications
    loaded_model = apply_lora_to_roberta(loaded_model, lora_r, lora_alpha)

    # 3. Load safetensors state dict (instead of pytorch_model.bin)
    state_dict_path = os.path.join(checkpoint_path, "model.safetensors")
    state_dict = load_file(state_dict_path)  # safetensors load
    loaded_model.load_state_dict(state_dict)

    # 4. Evaluate with Trainer
    eval_args = TrainingArguments(
        output_dir="./results",
        per_device_eval_batch_size=64,
        do_eval=True,
        report_to=["none"],
    )
    trainer = Trainer(
        model=loaded_model,
        args=eval_args,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    eval_results = trainer.evaluate()
    print("Evaluation results on loaded LoRA-trained model:", eval_results)
    return eval_results

loaded_eval_results = load_and_evaluate("./best_checkpoint", eval_dataset)

In [None]:
# TODO: add a few lines of code to check that the evaluation results obtained when loading the checkpoint from disk
# are equal to the evaluation results right after training

# YOUR CODE HERE
# Compare only the model-quality metrics; timing fields such as eval_runtime
# vary between runs and would always be reported as a mismatch.
for key in ("eval_loss", "eval_accuracy"):
    orig = eval_results[key]
    loaded = loaded_eval_results[key]

    # float metrics: compare with a tolerance
    if not np.isclose(orig, loaded, rtol=1e-6, atol=1e-8):
        raise ValueError(f"Mismatch in metric '{key}': {orig} vs {loaded}")

## Part 4: Analysis and Discussion (12 points)

Please answer the following questions in a brief report below.


1- What is the optimal rank value for your implementation, and how does it affect the trade-off between parameter count and performance?

2- Which layers benefit most from LoRA adaptation? Why do you think this is the case?

3- What are the computational advantages of using LoRA compared to full fine-tuning?

4- What are potential limitations or drawbacks of LoRA?
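
For question 3, a back-of-the-envelope estimate can help. The sketch below counts the LoRA parameters added by the layer choices in Part 2 and estimates the memory for gradients plus optimizer state. It assumes `roberta-base` dimensions (hidden size 768, feed-forward size 3072, 12 layers), roughly 125 million parameters for full fine-tuning, fp32 storage, and an Adam-style optimizer with two moment buffers per trainable parameter. Activations and the frozen weights themselves are ignored, so the figures are rough, and `lora_params` is a hypothetical helper.

```python
def lora_params(in_dim: int, out_dim: int, rank: int) -> int:
    """Parameters in A (in_dim x rank) and B (rank x out_dim)."""
    return rank * (in_dim + out_dim)


hidden, ffn, layers, num_labels, rank = 768, 3072, 12, 2, 8  # assumed sizes

per_layer = (
    4 * lora_params(hidden, hidden, rank)  # query, key, value, attention output
    + lora_params(hidden, ffn, rank)       # intermediate dense
    + lora_params(ffn, hidden, rank)       # output dense
)
classifier = lora_params(hidden, hidden, rank) + lora_params(hidden, num_labels, rank)
lora_total = layers * per_layer + classifier

full_total = 125_000_000     # approximate size of roberta-base (assumption)
bytes_per_trainable = 4 + 8  # fp32 gradient + two fp32 Adam moments

for label, count in [("full fine-tuning", full_total), ("LoRA", lora_total)]:
    megabytes = count * bytes_per_trainable / 1e6
    print(f"{label}: {count:,} trainable, ~{megabytes:,.0f} MB for grads + optimizer state")
```

The LoRA count here should agree with the trainable-parameter figure printed by `apply_lora_to_roberta` when it is run with `lora_r=8`.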

In [None]:
# TODO Your answers to above questions

your_answer_1 = """1"""  # <<< YOUR ANSWER HERE >>>

your_answer_2 = """1"""  # <<< YOUR ANSWER HERE >>>

your_answer_3 = """LoRA trains far fewer parameters, so gradients and optimizer state are needed only for the small A and B matrices. This lowers the GPU memory footprint and allows faster iteration."""  # <<< YOUR ANSWER HERE >>>

your_answer_4 = """A low-rank adapter cannot represent a full-rank weight update, so it may underfit on highly specialized tasks where the required update is not low-rank.
It is also sensitive to the choice of rank and scaling factor α, which need tuning per task and model size."""  # <<< YOUR ANSWER HERE >>>

if not your_answer_1:
  raise NotImplementedError()
if not your_answer_2:
  raise NotImplementedError()
if not your_answer_3:
  raise NotImplementedError()
if not your_answer_4:
  raise NotImplementedError()