T5-LoRA Implementation (GLUE, SST2)

In [17]:
# Loading Pre Trained Model
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

model_name = "t5-small"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

In [18]:
# Loading Dataset
from datasets import load_dataset

dataset = load_dataset("glue", "sst2")

In [19]:
# Data Preprocessing

def preproc(record):
    inputs = ["sst2 sentence"+ sentence for sentence in record["sentence"]]
    labels =  ["positive " if label == 1 else "negative" for label in record["label"]]
    return {"input_text": inputs, "target_text": labels}

dataset = dataset.map(preproc, batched= True)

In [20]:
# Applying LoRA to T5
from peft import LoraConfig, get_peft_model

lora_confg = LoraConfig(
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules= ["q", "v"],)

model = get_peft_model(model, lora_confg)
model.print_trainable_parameters()

trainable params: 294,912 || all params: 60,801,536 || trainable%: 0.4850


In [21]:
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

def tokenize_function(record):
    model_inputs = tokenizer(record["input_text"], padding="max_length", truncation=True, max_length=128)
    labels = tokenizer(record["target_text"], padding="max_length", truncation=True, max_length=5)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

toke_dataset = dataset.map(tokenize_function, batched=True)
toke_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])

training_args = Seq2SeqTrainingArguments(
    output_dir="./t5_lora_sst2",
    eval_strategy="epoch",
    save_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",
    report_to="none",
)

# Creating Trainer
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=toke_dataset["train"],
    eval_dataset=toke_dataset["validation"],
    tokenizer=tokenizer,
)

# Start training
trainer.train()

Map: 100%|██████████| 67349/67349 [00:06<00:00, 10417.67 examples/s]
Map: 100%|██████████| 872/872 [00:00<00:00, 8740.91 examples/s]
Map: 100%|██████████| 1821/1821 [00:00<00:00, 10875.98 examples/s]
  trainer = Seq2SeqTrainer(
No label_names provided for model class `PeftModel`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.


Epoch,Training Loss,Validation Loss
1,0.0567,No log
2,0.0592,No log
3,0.0555,No log


TrainOutput(global_step=25257, training_loss=0.12646112741443982, metrics={'train_runtime': 2490.9708, 'train_samples_per_second': 81.112, 'train_steps_per_second': 10.139, 'total_flos': 6882113353678848.0, 'train_loss': 0.12646112741443982, 'epoch': 3.0})

In [22]:
# Evaluating the model on the validation set
eval_results = trainer.evaluate()

# Print evaluation results
print(eval_results)

{'eval_runtime': 3.5538, 'eval_samples_per_second': 245.374, 'eval_steps_per_second': 30.672, 'epoch': 3.0}


In [23]:
eval_results = trainer.evaluate()
print(f"Eval Loss: {eval_results.get('eval_loss', 'Loss not computed')}")


Eval Loss: Loss not computed


In [24]:
import torch

total_loss = 0
num_batches = 0

for batch in trainer.get_eval_dataloader():
    with torch.no_grad():
        outputs = model(**{k: v.to(model.device) for k, v in batch.items()})
        total_loss += outputs.loss.item()
        num_batches += 1

eval_loss = total_loss / num_batches 
print(f"Manually Computed Eval Loss: {eval_loss}")

Manually Computed Eval Loss: 0.05340239079989527


 Manual KronA Implementation

In [25]:
# Custom KronA layer
import torch
import torch.nn as nn

class KronA(nn.Module):
    def __init__(self, in_features, out_features, rank):
        super(KronA, self).__init__()
        self.rank = rank
        self.in_feat = in_features
        self.out_feat = out_features

        self.A_matr = nn.parameter(torch.randn(rank, in_features))
        self.B_matr = nn.parameter(torch.randn(rank, out_features))

    def forward(self, x):
        Prod_Kron = torch.kron(self.A_matr, self.B_matr)

        return torch.matmul(x, Prod_Kron)



In [26]:
# Modifying T5 layers to use KronA

from transformers import T5ForConditionalGeneration, T5Config
import torch
import torch.nn as nn
from transformers.models.t5.modeling_t5 import T5Attention, T5DenseActDense

class KronA_T5Attention(nn.Module):
    def __init__(self, config, rank):
        super(KronA_T5Attention, self).__init__()
        self.self_attention = T5Attention(config)
        self.rank = rank

        self.query = KronA(config.d_model, config.d_model, rank)
        self.key = KronA(config.d_model, config.d_model, rank)
        self.value = KronA(config.d_model, config.d_model, rank)
        self.o = KronA(config.d_model, config.d_model, rank)

    def forward(self, hidden_states, attention_mask = None):
        query = self.query(hidden_states)
        key = self.query(hidden_states)
        value = self.query(hidden_states)

        attention_output = self.self_attention(query, key, value, attention_mask)
        output = self.o(attention_output)
        return output 

class KronA_T5DenseReluDense(nn.Module):
    def __init__(self, config, rank):
        super().__init__(KronA_T5DenseReluDense, self).__init__()
        self.dense_1 = KronA(config.d_model, config.d_ff, rank)
        self.dense_2 = KronA(config.d_ff, config.d_model, rank)
        self.dropout = nn.Dropout(config.dropout_rate)
        self.activation = nn.GELU()

    def forward(self, hidden_states):
        hidden_states = self.dense_1(hidden_states)
        hidden_states = self.activation(hidden_states)
        hidden_states = self.dense_2(hidden_states)
        hidden_states = self.dropout(hidden_states)
        return hidden_states
    
class KronA_T5ForConditionalGeneration(T5ForConditionalGeneration):
    def __init__(self, config, rank=16):
        super(KronA_T5ForConditionalGeneration, self).__init__(config)

        self.encoder.block = nn.ModuleList([KronA_T5Attention(config, rank) if isinstance(layer, T5Attention) else layer for layer in self.encoder.block])    
        self.decoder.block = nn.ModuleList([KronA_T5Attention(config, rank) if isinstance(layer, T5Attention) else layer for layer in self.decoder.block])
        self.encoder.block = nn.ModuleList([KronA_T5DenseReluDense(config, rank) if isinstance(layer, T5DenseActDense) else layer for layer in self.encoder.block ])
        self.decoder.block = nn.ModuleList([KronA_T5DenseReluDense(config, rank) if isinstance(layer, T5DenseActDense) else layer for layer in self.decoder.block])

        

In [30]:
# Load Dataset SST-2
from datasets import load_dataset
from transformers import T5Tokenizer

# Load dataset
dataset = load_dataset("glue", "sst2")

# Load tokenizer
tokenizer = T5Tokenizer.from_pretrained("t5-small")

# Define label mapping for SST-2
label_map = {0: "negative", 1: "positive"}

# Debug: Check label distribution
print("Unique labels in dataset:", set(dataset["train"]["label"]))

def preprocess_function(examples):
    inputs = [f"sst2 sentence: {sentence}" for sentence in examples["sentence"]]
    model_inputs = tokenizer(inputs, padding="max_length", truncation=True, max_length=128)

    # Handle unknown labels safely
    labels = []
    for label in examples["label"]:
        if label in label_map:
            labels.append(label_map[label])
        else:
            print(f"Warning: Unexpected label {label} encountered. Assigning 'unknown'.")
            labels.append("unknown")  # Change this if you want to handle it differently.

    # Tokenize labels as targets
    with tokenizer.as_target_tokenizer():
        labels = tokenizer(labels, padding="max_length", truncation=True, max_length=2)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

# Apply preprocessing
encoded_dataset = dataset.map(preprocess_function, batched=True)

# Convert to PyTorch format
encoded_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])

# Split into train and validation sets
train_dataset = encoded_dataset["train"]
val_dataset = encoded_dataset["validation"]

# Load KronA-T5 Model
from transformers import Trainer, TrainingArguments
from sklearn.metrics import accuracy_score

config = T5Config.from_pretrained("t5-small")
krona_t5 = KronA_T5ForConditionalGeneration(config, rank=16)

# Training Arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",   
)

# Trainer
trainer = Trainer(
    model=krona_t5,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer
)

trainer.train()


Unique labels in dataset: {0, 1}


  trainer = Trainer(


Epoch,Training Loss,Validation Loss
1,0.2132,0.260885
2,0.1714,0.333046
3,0.1546,0.406478


TrainOutput(global_step=25257, training_loss=0.20243810226332287, metrics={'train_runtime': 2406.8538, 'train_samples_per_second': 83.947, 'train_steps_per_second': 10.494, 'total_flos': 6836351240503296.0, 'train_loss': 0.20243810226332287, 'epoch': 3.0})

In [31]:
# Evaluate the fine-tuned model
results1 = trainer.evaluate()
print("Validation Results:", results1)


Validation Results: {'eval_loss': 0.40647807717323303, 'eval_runtime': 2.7182, 'eval_samples_per_second': 320.8, 'eval_steps_per_second': 40.1, 'epoch': 3.0}


In [32]:
# Save the fine-tuned model
krona_t5.save_pretrained("./krona_t5_sst2")
