# Parameter Efficient Fine Tuning with LoRA

In this lab we will implement LoRA to fine-tune DistilBERT so that it performs well at question answering over a given context.

## Implementing LoRA on DistillBERT


Remember that LoRA is a technique where two low-rank matrices are trained to adapt the output of a given layer, while the layer's original weights stay frozen. This is illustrated in the following image:


<img src='https://www.dropbox.com/scl/fi/dfhuc42h5ohcbfny14gg8/lora.png?rlkey=7ku1ocyzibdgmnkup7kmsd8gb&raw=1'  />



In [None]:
!pip install datasets
!pip install 'numpy<=1.24'

In [None]:
from transformers import TFDistilBertModel, DistilBertConfig, DistilBertTokenizer
from transformers.models.distilbert.modeling_tf_distilbert import TFDistilBertMainLayer
import tensorflow as tf
import keras
import math
import numpy as np
import logging
import warnings
logging.disable(logging.WARNING)
warnings.filterwarnings('ignore')

First we implement the LoRA layer, as we did before.

In [None]:
class LoraLayer(keras.layers.Layer):
    """Wrap a frozen layer and add a low-rank (LoRA) update to its output.

    The wrapped layer is always evaluated. While this wrapper is trainable,
    the output of two stacked bias-free ``Dense`` layers, ``B(A(inputs))``,
    is added to the wrapped layer's output.

    Args:
        original_layer: Layer to adapt. It is frozen, and the name found in
            its config is reused as the name of this wrapper.
        rank: Number of units of the inner projection ``A``.
        num_heads: Accepted for interface compatibility; not used by this
            implementation.
        dim: Number of units of the outer projection ``B``. The result is
            added to the wrapped layer's output, so the two must be
            broadcast-compatible (normally ``dim`` is that output's size).
        trainable: Whether the LoRA branch is trainable and therefore applied
            in ``call``.
        **kwargs: Forwarded to ``keras.layers.Layer``; a ``name`` entry is
            discarded.
    """

    def __init__(
        self,
        original_layer,
        rank=8,
        num_heads=1,
        dim=1,
        trainable=False,
        **kwargs,
    ):
        # We want to keep the name of this layer the same as the original
        # dense layer, so any caller-supplied name is dropped.
        name = original_layer.get_config()["name"]
        kwargs.pop("name", None)

        super().__init__(name=name, trainable=trainable, **kwargs)

        self.rank = rank

        # Original dense layer. No matter whether we are training the model
        # or are in inference mode, this layer should be frozen.
        self.original_layer = original_layer
        self.original_layer.trainable = False

        # LoRA dense layers.
        self.A = keras.layers.Dense(
            units=rank,
            use_bias=False,
            trainable=trainable,
            name="lora_A",
        )

        # B starts at zero so that B(A(x)) == 0 before the first update: the
        # adapted model then begins exactly at the wrapped layer's behaviour
        # instead of at a randomly perturbed version of it.
        self.B = keras.layers.Dense(
            units=dim,
            use_bias=False,
            kernel_initializer="zeros",
            trainable=trainable,
            name="lora_B",
        )

    def call(self, inputs):
        """Return the wrapped layer's output, plus the LoRA term if trainable."""
        original_output = self.original_layer(inputs)
        if not self.trainable:
            # NOTE: the LoRA branch is skipped here, not merged. Nothing in
            # this class folds A and B into the wrapped layer's weights, so a
            # non-trainable wrapper behaves exactly like the original layer.
            return original_output

        # Fine-tuning path: add the low-rank update to the frozen output.
        return original_output + self.B(self.A(inputs))

Then we iterate over the transformer blocks and replace the query and value projections of each multi-head attention layer with `LoraLayer` instances.

In [None]:
# Load DistilBert model
config = DistilBertConfig()
lora_model = TFDistilBertModel(config)

# Iterate through the transformer blocks and wrap the query and value
# projections of each self-attention layer; the remaining projections are
# left as they are.
for layer in lora_model.distilbert.transformer.layer:
    attention = layer.attention  # Self-attention sub-layer of this block
    dim = attention.dim  # The dimension of the model
    n_heads = attention.n_heads  # The number of heads in the model
    # Each LoraLayer keeps the wrapped projection (frozen) and adds a
    # trainable low-rank branch whose output size matches the model dimension.
    attention.q_lin = LoraLayer(
        attention.q_lin, rank=8, num_heads=n_heads, dim=dim, trainable=True
    )
    attention.v_lin = LoraLayer(
        attention.v_lin, rank=8, num_heads=n_heads, dim=dim, trainable=True
    )

In [None]:
config = DistilBertConfig()
standard_model = TFDistilBertModel(config)

We test that both implementations work.

In [None]:
test_text = ["This is a test sentence for DistilBert models."]

# Load tokenizer
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

# Tokenize the input
inputs = tokenizer(test_text, return_tensors='tf', padding=True, truncation=True)

# Run the input through the standard model
standard_output = standard_model(inputs)
# Run the input through the LoRA model
lora_output = lora_model(inputs)



Now we mark every layer that is not a LoRA layer as non-trainable and print the number of trainable weights per layer.

In [None]:
print('Before change: \n')

# Report every layer first and only change the flags afterwards. Changing a
# layer's `trainable` flag can also affect its sublayers, so mutating while
# printing would show already-modified counts under the "before" heading.
for layer in lora_model._flatten_layers():
    print(layer, layer.__class__.__module__, layer.name, np.sum([np.prod(w.shape) for w in layer.trainable_weights]))

for layer in lora_model._flatten_layers():
    if layer.name.startswith("lora"):
        # The LoRA adapters are the only layers that should be trained.
        layer.trainable = True
    elif layer.__class__.__module__.startswith('keras') or layer.name == 'embeddings':
        # Stock Keras layers and the embeddings block are frozen.
        layer.trainable = False

print('After change: \n\n')

for layer in lora_model._flatten_layers():
    print(layer, layer.__class__.__module__, layer.name, np.sum([np.prod(w.shape) for w in layer.trainable_weights]))

In [None]:

def calculate_trainable_params(model):
    """Return the total number of scalar values in ``model.trainable_weights``.

    Args:
        model: Any object exposing ``trainable_weights``, an iterable of
            weights that each have a ``shape``.

    Returns:
        The sum over all trainable weights of the product of their shape
        dimensions, as a Python ``int`` (``0`` when there are none).
    """
    # The product of an empty shape is 1, so scalar weights count as one.
    return sum(int(np.prod(w.shape)) for w in model.trainable_weights)

print(f"Trainable parameters in standard DistilBert: {calculate_trainable_params(standard_model)}")
print(f"Trainable parameters in LoRA-adapted DistilBert: {calculate_trainable_params(lora_model)}")


Notice the difference!!

In [None]:
standard_model.summary()

In [None]:
lora_model.summary()

We successfully LoRA-adapted DistilBERT.

## Q&A

Load the usual components for Q&A, then define helpers for tokenizing the input and decoding the answer.

In [None]:
from transformers import DistilBertTokenizer, TFDistilBertForQuestionAnswering
from datasets import load_dataset


tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

def prepare_qa_input(question, context):
    """Encode a question/context pair with the module-level ``tokenizer``.

    The pair is encoded with special tokens, truncated and padded to 512
    tokens, and returned as TensorFlow tensors.
    """
    encoding_options = dict(
        add_special_tokens=True,
        return_tensors="tf",
        truncation=True,
        padding="max_length",
        max_length=512,  # Adjust based on your needs
    )
    return tokenizer.encode_plus(question, context, **encoding_options)

In [None]:
model = TFDistilBertForQuestionAnswering.from_pretrained('distilbert-base-uncased')


In [None]:
question = "What is the capital of France?"
context = "France is a country in Europe. Its capital is Paris."

inputs = prepare_qa_input(question, context)

outputs = model(**inputs)

start_logits, end_logits = outputs.start_logits, outputs.end_logits

In [None]:
import tensorflow as tf

def decode_answer(start_logits, end_logits, inputs):
    """Turn start/end logits into the answer text for the first batch entry.

    Args:
        start_logits: Start scores; the arg-max is taken along axis 1.
        end_logits: End scores; the arg-max is taken along axis 1.
        inputs: Mapping with an ``"input_ids"`` entry; only its first row is
            decoded, using the module-level ``tokenizer``.

    Returns:
        The decoded answer span (end token inclusive), or
        ``"Answer not found"`` when the predicted end precedes the predicted
        start or when the span starts at ``[CLS]``.
    """
    # Plain Python ints keep the comparison and the list slice unambiguous.
    start_index = int(tf.argmax(start_logits, axis=1)[0])
    end_index = int(tf.argmax(end_logits, axis=1)[0])

    # An end before the start selects no tokens; report it explicitly rather
    # than returning an empty string.
    if end_index < start_index:
        return "Answer not found"

    # Convert token indices to the original context text
    tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
    answer = tokenizer.convert_tokens_to_string(tokens[start_index:end_index + 1])

    # Check for [CLS] prediction
    if answer.startswith("[CLS]"):
        return "Answer not found"
    return answer

decoded_answer = decode_answer(start_logits, end_logits, inputs)
print("Answer:", decoded_answer)

LoRA-adapt this new model.

In [None]:
# Load TFDistilBertForQuestionAnswering model
lora_model = TFDistilBertForQuestionAnswering.from_pretrained('distilbert-base-uncased')


# Iterate through the transformer blocks and wrap the query and value
# projections of each self-attention layer, keeping the loaded projection as
# the frozen inner layer of the wrapper.
for layer in lora_model.distilbert.transformer.layer:
    attention = layer.attention  # Self-attention sub-layer of this block
    dim = attention.dim  # The dimension of the model
    n_heads = attention.n_heads  # The number of heads in the model
    attention.q_lin = LoraLayer(
        attention.q_lin, rank=8, num_heads=n_heads, dim=dim, trainable=True
    )
    attention.v_lin = LoraLayer(
        attention.v_lin, rank=8, num_heads=n_heads, dim=dim, trainable=True
    )

In [None]:
outputs = lora_model(**inputs)

start_logits, end_logits = outputs.start_logits, outputs.end_logits
decode_answer(start_logits, end_logits, inputs)

In [None]:
print('Before change: \n')

# Report every layer first and only change the flags afterwards. Changing a
# layer's `trainable` flag can also affect its sublayers, so mutating while
# printing would show already-modified counts under the "before" heading.
for layer in lora_model._flatten_layers():
    print(layer, layer.__class__.__module__, layer.name, np.sum([np.prod(w.shape) for w in layer.trainable_weights]))

for layer in lora_model._flatten_layers():
    if layer.name.startswith("lora"):
        # The LoRA adapters are the only layers that should be trained.
        layer.trainable = True
    elif layer.__class__.__module__.startswith('keras') or layer.name == 'embeddings':
        # Stock Keras layers and the embeddings block are frozen.
        layer.trainable = False

print('After change: \n\n')

for layer in lora_model._flatten_layers():
    print(layer, layer.__class__.__module__, layer.name, np.sum([np.prod(w.shape) for w in layer.trainable_weights]))

In [None]:
lora_model.summary()

Now comes the tricky part: we will use the SQuAD dataset, which contains questions and answers, but we need to add the `start_positions` and `end_positions` token labels ourselves.

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer. A fast tokenizer is requested because the label
# computation below relies on `char_to_token`.
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased", use_fast=True)

# Load the dataset (replace with your specific dataset)
dataset = load_dataset("squad", split="validation")  # SQuAD validation split

def add_token_positions(encodings, answers, context_sequence_index=1):
    """Add ``start_positions`` and ``end_positions`` token labels to ``encodings``.

    For every example the first answer's character span is mapped to token
    indices via ``encodings.char_to_token``.

    Args:
        encodings: Batch encoding offering ``char_to_token`` and ``update``.
            It is modified in place.
        answers: One mapping per example with ``'answer_start'`` (list of
            character offsets) and ``'text'`` (list of answer strings).
        context_sequence_index: Which sequence of an encoded pair the
            character offsets refer to. Defaults to ``1`` because the batch
            helper in this file encodes ``(question, context)``, so the
            context the answer offsets point into is the second sequence.

    Returns:
        None. The two label lists are stored on ``encodings``.
    """
    start_positions = []
    end_positions = []
    for i, answer in enumerate(answers):
        # Examples without any answer get the default index 0 for both labels
        # instead of failing on the `[0]` lookups below.
        if not answer['answer_start'] or not answer['text']:
            start_positions.append(0)
            end_positions.append(0)
            continue

        start_char = answer['answer_start'][0]
        # End is exclusive here; the -1 below makes the lookup inclusive.
        end_char = start_char + len(answer['text'][0])

        # The sequence index must be passed explicitly: the default looks the
        # offset up in the first sequence of the pair.
        start_idx = encodings.char_to_token(
            i, start_char, sequence_index=context_sequence_index
        )
        end_idx = encodings.char_to_token(
            i, end_char - 1, sequence_index=context_sequence_index
        )

        # Set to 0 (default index) if answer is not found within the context
        if start_idx is None:
            start_idx = 0
        if end_idx is None:
            end_idx = start_idx  # Default to start index if end index is not found

        start_positions.append(start_idx)
        end_positions.append(end_idx)

    encodings.update({'start_positions': start_positions, 'end_positions': end_positions})




def add_token_positions_batch(examples):
    """Tokenize a batch of question/context pairs and attach answer labels.

    Uses the module-level ``tokenizer`` (truncating and padding to 512 tokens,
    with offset mappings requested) and then lets ``add_token_positions`` add
    the label entries to the encoding, which is returned.
    """
    encoded_batch = tokenizer(
        examples['question'],
        examples['context'],
        truncation=True,
        padding='max_length',
        return_offsets_mapping=True,
        max_length=512,
    )
    add_token_positions(encoded_batch, examples['answers'])
    return encoded_batch

# Apply the function in a batched manner
tokenized_datasets = dataset.map(add_token_positions_batch, batched=True, remove_columns=dataset.column_names)

In [None]:
train_input_ids = tokenized_datasets['input_ids']
train_attention_mask = tokenized_datasets['attention_mask']
train_start_positions = tokenized_datasets['start_positions']
train_end_positions = tokenized_datasets['end_positions']


In [None]:
# Convert the dataset columns extracted in the previous cell to integer
# tensors so they can be sliced and fed to the model.
train_input_ids = tf.convert_to_tensor(train_input_ids, dtype=tf.int32)
train_attention_mask = tf.convert_to_tensor(train_attention_mask, dtype=tf.int32)
train_start_positions = tf.convert_to_tensor(train_start_positions, dtype=tf.int32)
train_end_positions = tf.convert_to_tensor(train_end_positions, dtype=tf.int32)


Testing if everything is OK

In [None]:
# Example: Checking model weights for NaN values
for weight in lora_model.weights:
    if tf.reduce_any(tf.math.is_nan(weight)):
        print("NaN weight:", weight.name)


In [None]:
test_output = lora_model(
    input_ids=train_input_ids[:1],
    attention_mask=train_attention_mask[:1]
)
if tf.reduce_any(tf.math.is_nan(test_output.start_logits)) or tf.reduce_any(tf.math.is_nan(test_output.end_logits)):
    print("NaN found in model outputs")


Fine-tuning the LoRA weights to do Q&A on SQuAD.

In [None]:
# Compile the model
lora_model.compile(
    optimizer= tf.keras.optimizers.Adam(learning_rate=5e-5, clipnorm=1.0),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
)


In [None]:
# Create an instance of the callback
early_stopping = keras.callbacks.EarlyStopping(monitor='loss', patience=3)
# Train the model for 5 epochs with a batch size of 32.
# The labels are keyed by the names of the model outputs they belong to, so
# that each logits tensor is scored against its own target positions.
# NOTE(review): label/output matching is done by the transformers Keras
# training step - confirm against the installed transformers version.
lora_model.fit(
    x={'input_ids': train_input_ids, 'attention_mask': train_attention_mask},
    y={'start_logits': train_start_positions, 'end_logits': train_end_positions},
    epochs=5,
    batch_size=32,
    callbacks=[early_stopping],
)

In [None]:
lora_model.summary()

In [None]:
# Your existing code for model prediction
question = "What is the capital of France?"
context = "France is a country in Europe. Its capital is Paris."
inputs = prepare_qa_input(question, context)
outputs = lora_model(inputs)
start_logits, end_logits = outputs.start_logits, outputs.end_logits

# Decode the answer
decoded_answer = decode_answer(start_logits, end_logits, inputs)
print("Answer:", decoded_answer)

In case you want to write your own custom training loop:

In [None]:
# import tqdm

# optimizer = tf.keras.optimizers.Adam(learning_rate=1e-6, epsilon=1e-08, clipnorm=1.0)
# loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# # Optionally define metrics, e.g., accuracy
# train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy()

# epochs = 3  # Set the number of epochs

# for epoch in range(epochs):
#     print("\nStart of epoch %d" % (epoch,))
#     for step, batch in enumerate(tqdm.tqdm(tf_dataset)):

#         # Open a GradientTape
#         with tf.GradientTape() as tape:

#             # Forward pass
#             outputs = lora_model(
#                 input_ids=batch['input_ids'],
#                 attention_mask=batch['attention_mask']
#             )


#             # Extract start_logits and end_logits from the model's output
#             start_logits = outputs.start_logits
#             end_logits = outputs.end_logits

#             # Compute the loss value
#             start_loss = loss_function(batch['start_positions'], start_logits)

#             end_loss = loss_function(batch['end_positions'], end_logits)

#             # Compute the total loss
#             total_loss = (start_loss + end_loss) / 2

#         # Compute gradients
#         gradients = tape.gradient(total_loss, lora_model.trainable_variables)
#         gradients = [tf.clip_by_value(grad, -1.0, 1.0) for grad in gradients]

#         # Update weights
#         optimizer.apply_gradients(zip(gradients, lora_model.trainable_variables))

#         # Update training metric.
#         train_acc_metric.update_state(batch['start_positions'], start_logits)
#         train_acc_metric.update_state(batch['end_positions'], end_logits)

#         # Log every 200 batches.
#         if step % 200 == 0:
#             print("Training loss (for one batch) at step %d: %.4f" % (step, float(total_loss)))
#             print("Seen so far: %s samples" % ((step + 1) * 16))

#     # Display metrics at the end of each epoch.
#     train_acc = train_acc_metric.result()
#     print("Training acc over epoch: %.4f" % (float(train_acc),))

#     # Reset training metrics at the end of each epoch
#     train_acc_metric.reset_states()
