In [None]:
# The initial notebook that I worked from: https://kaitchup.substack.com/p/mistral-7b-recipes-for-fine-tuning
import torch
from datasets import load_dataset
from peft import LoraConfig, PeftModel, prepare_model_for_kbit_training
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BitsAndBytesConfig,
    AutoTokenizer,
    TrainingArguments,
)

from trl import SFTTrainer

import os

# ---------------------------------------------------------------------------
# Notebook configuration: everything a reader might want to tune lives here.
# ---------------------------------------------------------------------------

# Optimisation hyperparameters
num_training_epochs = 1
num_eval_steps = 250  # also reused below as the save interval and the warmup length
sequence_length = 256  # every example is padded/truncated to this many tokens
batch_size = 16
learning_rate = 1e-4
gradient_accumulation_steps = 4

# Hardware / task
gpu_id = 0
num_output_labels = 4  # SNLI's three classes plus the extra "no valid label" class (see concatenate_and_label)

# Hugging Face API token. Read it from the HF_TOKEN environment variable so that a real
# credential never has to be written into the notebook; the original placeholder is kept
# as the fallback so the value is unchanged when the variable is not set.
hugging_face_token = os.environ.get("HF_TOKEN", "TODO: Add here your hugging face token")

# Model / tokenizer / output locations
model_name = "mistralai/Mistral-7B-v0.1"
tokenizer_padding_side = "right"
models_output_dir = "./v2_mistral7b_results"
start_from_checkpoint_with_name = None  # A checkpoint name (string) to start from, or None to start from scratch.

# Behaviour switches
print_dataset_statistics_flag = False  # If True, the statistics of the dataset will be printed.
skip_training = True  # If True, training is skipped. Useful when only evaluating an existing model.

DEBUG_MODE = True
# Subset sizes are only meaningful in debug mode; initialise them to None so the names
# always exist, even when DEBUG_MODE is switched off.
num_training_examples = None
num_validation_examples = None
num_test_examples = None
if DEBUG_MODE:
    num_training_examples = 1000  # Number of training examples to use [0: num_training_examples]
    num_validation_examples = 100  # Number of validation examples to use [0: num_validation_examples]
    num_test_examples = 100  # Number of test examples to use [0: num_test_examples]

# When starting from a checkpoint, the model (and tokenizer) are loaded from the
# checkpoint directory instead of the Hub model id.
if start_from_checkpoint_with_name is not None:
    model_name = models_output_dir + "/" + start_from_checkpoint_with_name
    print("Starting from checkpoint with name: ", start_from_checkpoint_with_name)

print("Model name: ", model_name)

In [None]:
import random
import numpy as np

# One seed shared by every random number generator the notebook touches.
seed_value = 42

# Seed PyTorch (CPU, then all CUDA devices), Python's `random`, and NumPy, in that order.
for _seed_rng in (torch.manual_seed, torch.cuda.manual_seed_all, random.seed, np.random.seed):
    _seed_rng(seed_value)

In [None]:
def concatenate_and_label(example):
    """Build the model input text and the integer class label for one example.

    Args:
        example: mapping with the keys "premise", "hypothesis" and "label".
            The "label" key is removed from this mapping as a side effect.

    Returns:
        A dict with "text" (premise and hypothesis joined into one prompt)
        and "labels" (the class id, where the dataset's -1 is remapped to 3).
    """
    label = example["label"]
    if label == -1:
        # -1 is the dataset's marker for "no valid gold label"; give it its own class id.
        label = 3
    text = 'Prem: {} Hypo: {}'.format(example["premise"], example["hypothesis"])
    # Drop the original column from the input mapping, exactly as before.
    example.pop("label")
    return {"text": text, "labels": label}

def create_tokenize_function(tokenizer):
    """Return a function that tokenizes the "text" field with the given tokenizer.

    The returned function truncates and pads its input to the notebook-level
    `sequence_length` (looked up when the function is called) and returns
    whatever the tokenizer returns.
    """
    def tokenize_function(example):
        # Fixed-length encoding: truncate long inputs, pad short ones to `sequence_length`.
        return tokenizer(
            example["text"],
            max_length=sequence_length,
            padding="max_length",
            truncation=True,
        )

    return tokenize_function

In [None]:
from trl.trainer import ConstantLengthDataset
from huggingface_hub import login
# Authenticate against the Hugging Face Hub (token created on huggingface.co/settings/tokens).
login(token=hugging_face_token)

# Load the tokenizer; padding uses the configured side and reuses the EOS token as the pad token.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.padding_side = tokenizer_padding_side
tokenizer.pad_token = tokenizer.eos_token

# Dataset source: https://huggingface.co/datasets/stanfordnlp/snli
dataset = load_dataset("stanfordnlp/snli")

if DEBUG_MODE:
    # Keep only the first N examples of every split so a debug run stays fast.
    for split_name, split_size in (
        ("train", num_training_examples),
        ("validation", num_validation_examples),
        ("test", num_test_examples),
    ):
        dataset[split_name] = dataset[split_name].select(range(split_size))

# First build the "text"/"labels" columns, then tokenize the text in batches.
tokenize_function = create_tokenize_function(tokenizer)
dataset = dataset.map(concatenate_and_label)
tokenized_dataset = dataset.map(tokenize_function, batched=True)
print("data loaded successfully")

In [None]:
from collections import Counter
# Count how many training examples carry each label.
# Background: the dataset uses -1 for examples whose gold label is missing ("-" in the
# original data), see https://github.com/huggingface/datasets/issues/296
train_label_counts = Counter(tokenized_dataset["train"]["labels"])
print("The number of examples for each label in the training set: ", train_label_counts)

# Sanity check: take the first tokenized training example and decode its token ids
# back to text, once without and once with the special tokens.
first_example = tokenized_dataset['train'][0]
first_example_input_ids = first_example['input_ids']
decoded_text = tokenizer.decode(first_example_input_ids, skip_special_tokens=True)
decoded_text_with_special_tokens = tokenizer.decode(first_example_input_ids, skip_special_tokens=False)

# Show both decodings together with the label of that example.
print("Decoded Text:", decoded_text)
print("Decoded Text with Special Tokens:", decoded_text_with_special_tokens)
print("Label:", first_example['labels'])

from torch.utils.data import DataLoader

from torch.nn.utils.rnn import pad_sequence
import torch    

def collate_fn(batch):
    """Collate a list of tokenized examples into one batch of tensors.

    Args:
        batch: list of examples, each providing "input_ids", "attention_mask"
            (equal-length integer sequences) and "labels".

    Returns:
        A dict with the keys "input_ids", "attention_mask" and "labels", each a
        tensor stacked along a new leading batch dimension. The tensors live on
        `cuda:{gpu_id}` when CUDA is available and on the CPU otherwise.

    Raises:
        RuntimeError: if `batch` is empty or the per-example tensors of a field
            have different shapes (raised by `torch.stack`).
    """
    # Resolve the target device once per batch. Falling back to the CPU keeps the
    # function usable on machines without CUDA instead of crashing.
    if torch.cuda.is_available():
        device = torch.device(f'cuda:{gpu_id}')
    else:
        device = torch.device("cpu")

    def stack_field(field):
        # Stack on the host first and move the result in a single transfer,
        # rather than issuing one host-to-device copy per example.
        return torch.stack([torch.tensor(example[field]) for example in batch]).to(device)

    return {
        "input_ids": stack_field("input_ids"),
        "attention_mask": stack_field("attention_mask"),
        "labels": stack_field("labels"),
    }

train_dataloader = DataLoader(tokenized_dataset["train"], batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# Use the same collate function as the training loader so validation batches have the
# same layout (stacked "input_ids" / "attention_mask" / "labels" tensors). Without it,
# PyTorch's default collation is applied to the raw dataset rows instead.
validation_dataloader = DataLoader(tokenized_dataset["validation"], batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Pull one batch from the training dataloader and inspect it.
first_batch = next(iter(train_dataloader))

print("First batch input ids: ", first_batch['input_ids'].shape)
print("First batch labels: ", first_batch['labels'])
# Decode every sequence of the batch back to text to verify the collation visually.
decoded_text_for_batch = [tokenizer.decode(input_ids, skip_special_tokens=True) for input_ids in first_batch['input_ids']]
print("First batch decoded text: ", decoded_text_for_batch)
print("Finished debugging the batch.")

In [None]:
def print_dataset_statistics(set_name):
    """Print size and token-length statistics for one split of `dataset`.

    Args:
        set_name: key of the split inside the notebook-level `dataset`
            (called below with "train", "validation" and "test").

    Every premise and every hypothesis is tokenized individually with the
    notebook-level `tokenizer` to find the longest one of each kind.
    """
    split = dataset[set_name]

    def longest_tokenized(field):
        # Largest number of token ids produced for `field` over the whole split.
        return max(len(tokenizer(example[field])["input_ids"]) for example in split)

    max_length_premise = longest_tokenized("premise")
    max_length_hypothesis = longest_tokenized("hypothesis")
    combined_length = max_length_premise + max_length_hypothesis

    print(f"Statistics for the {set_name} dataset:")
    print(f"\tNumber of examples: {len(split)}")
    print(f"\tMaximum length of the premise: {max_length_premise}, Maximum length of the hypothesis: {max_length_hypothesis}. Therefore, the maximum length of the concatenated text is {combined_length}")

# Optional report, controlled from the configuration cell: one statistics block per split.
if print_dataset_statistics_flag:
    for split_name in ("train", "validation", "test"):
        print_dataset_statistics(split_name)

In [None]:
# Quantization configuration: load the weights in 4-bit NF4 with double quantization
# and use float16 as the compute dtype.
compute_dtype = torch.float16
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=compute_dtype,
)

# Load the sequence-classification model (with `num_output_labels` outputs) onto the
# configured GPU and prepare it for k-bit training in one go.
model = prepare_model_for_kbit_training(
    AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=num_output_labels,
        quantization_config=bnb_config,
        device_map={"": gpu_id},
    )
)

# Tell the model which token id is used for padding (same id the tokenizer pads with).
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
# define the configuration of LoRA.
from peft.utils import TaskType
# LoRA is applied to the attention projections and to the MLP projections.
lora_attention_modules = ['k_proj', 'q_proj', 'v_proj', 'o_proj']
lora_mlp_modules = ["gate_proj", "down_proj", "up_proj"]

peft_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    r=16,
    lora_alpha=16,
    lora_dropout=0.05,
    bias="none",
    target_modules=lora_attention_modules + lora_mlp_modules,
)

# Training hyperparameters, grouped by concern. All values come from the configuration cell.
training_arguments = TrainingArguments(
    # Where checkpoints are written, and how often.
    output_dir=models_output_dir,
    save_steps=num_eval_steps,
    # Evaluation / prediction.
    do_eval=True,
    do_predict=True,
    eval_strategy="steps",
    eval_steps=num_eval_steps,
    # Batching.
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    # Optimizer and learning-rate schedule.
    optim="paged_adamw_8bit",
    learning_rate=learning_rate,
    lr_scheduler_type="linear",
    warmup_steps=num_eval_steps,
    num_train_epochs=num_training_epochs,
    # Logging.
    log_level="debug",
)

# print the trainable parameters
def print_trainable_parameters(model):
    """
    Print and return the number of trainable parameters in the model.

    Args:
        model: any object exposing `parameters()` that yields tensors with
            `numel()` and `requires_grad` (e.g. a `torch.nn.Module`).

    Returns:
        The total number of elements over all parameters with
        `requires_grad=True`.
    """
    trainable_params = 0
    total_params = 0
    for parameter in model.parameters():
        element_count = parameter.numel()
        total_params += element_count
        if parameter.requires_grad:
            trainable_params += element_count

    # Guard the percentage against a model without any parameters.
    trainable_share = 100 * trainable_params / total_params if total_params else 0.0
    print(
        f"trainable params: {trainable_params:,} || "
        f"all params: {total_params:,} || "
        f"trainable%: {trainable_share:.4f}"
    )
    return trainable_params

# Report the number of trainable parameters of `model`. This call is the last expression
# of the cell, so the value it returns is shown as the cell output.
# NOTE(review): `peft_config` is only handed to the trainer in a later cell, so at this
# point `model` presumably has no LoRA adapters attached yet — confirm the reported count
# is the one you want to look at.
print_trainable_parameters(model) 

In [None]:
from transformers import Trainer
from torch.utils.data import DataLoader

class CustomSFTTrainer(SFTTrainer):
    """SFTTrainer whose dataloaders batch examples with the notebook's `collate_fn`.

    The three dataloader hooks are overridden to return plain
    `torch.utils.data.DataLoader` objects built from the given dataset, the
    per-device batch size from the training arguments, and `collate_fn`.
    Only the training loader shuffles.
    """

    def _collated_loader(self, data, batch_size, **loader_kwargs):
        # Single construction point shared by the train / eval / test loaders.
        return DataLoader(data, batch_size=batch_size, collate_fn=collate_fn, **loader_kwargs)

    def get_train_dataloader(self):
        """Shuffled loader over `self.train_dataset` with the training batch size."""
        return self._collated_loader(
            self.train_dataset,
            self.args.per_device_train_batch_size,
            shuffle=True,
        )

    def get_eval_dataloader(self, eval_dataset=None):
        """Loader over `eval_dataset`, or over `self.eval_dataset` when none is given."""
        if eval_dataset is None:
            eval_dataset = self.eval_dataset
        return self._collated_loader(eval_dataset, self.args.per_device_eval_batch_size)

    def get_test_dataloader(self, test_dataset):
        """Loader over `test_dataset` with the evaluation batch size."""
        return self._collated_loader(test_dataset, self.args.per_device_eval_batch_size)

# Build the trainer from the quantized model, the LoRA configuration and the tokenized splits.
trainer = CustomSFTTrainer(
    model=model,
    args=training_arguments,
    tokenizer=tokenizer,
    peft_config=peft_config,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['validation'],
    dataset_text_field="text",
    max_seq_length=sequence_length,
)

# Training can be skipped from the configuration cell (evaluation-only runs).
if not skip_training:
    if start_from_checkpoint_with_name is None:
        print("Starting the training from original Mistral 7B model weights.")
        trainer.train()
    else:
        # `model_name` was pointed at the checkpoint directory in the configuration cell.
        print("Starting the training from the checkpoint with name: ", start_from_checkpoint_with_name)
        trainer.train(resume_from_checkpoint=model_name)


In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
import torch.nn.functional as F

# Switch the model configuration's `use_cache` flag on for inference.
model.config.use_cache = True

# Directory of the adapter checkpoint to evaluate. It is derived from `models_output_dir`
# (the trainer's output directory) instead of repeating that path as a literal, so the two
# cannot drift apart. Adjust the step number to the checkpoint you want to evaluate.
checkpoint_dir_for_evaluation = f"{models_output_dir}/checkpoint-2250/"

# Author's note: the model loaded here affects this cell and the next cell, which
# calculates the accuracy on the test dataset.
model = PeftModel.from_pretrained(model, checkpoint_dir_for_evaluation)

# Move the model to the configured GPU and put it in evaluation mode.
model.cuda(gpu_id)
model.eval()

def classify(prem, hypo):
    """Classify one premise/hypothesis pair with the loaded model and print the result.

    The prompt is built in the same "Prem: ... Hypo: ..." format used for the
    training data and tokenized to `sequence_length` tokens. The function
    prints the decoded input, the per-class probabilities (in percent) and the
    predicted class name.

    Args:
        prem: premise text.
        hypo: hypothesis text.

    Returns:
        None. All results are printed.
    """
    # Create the prompt as it was used during training
    prompt = f'Prem: {prem} Hypo: {hypo}'
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, padding="max_length", max_length=sequence_length)

    # Show exactly what the model receives, including padding/special tokens.
    input_ids_list = inputs["input_ids"].tolist()[0]
    decoded_text_with_special_tokens = tokenizer.decode(input_ids_list, skip_special_tokens=False)
    print("Decoded Text with Special Tokens:", decoded_text_with_special_tokens)

    input_ids = inputs["input_ids"].cuda(gpu_id)
    # The input is padded to `sequence_length`, so pass the attention mask as well
    # (training batches include it too); otherwise the result would depend on the
    # padding side, because padded positions would be attended to.
    attention_mask = inputs["attention_mask"].cuda(gpu_id)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits

    probabilities = F.softmax(logits, dim=-1)
    predicted_class = torch.argmax(probabilities, dim=-1).item()

    print("For the prompt: ", prompt)

    # Report probabilities as percentages.
    probabilities = (probabilities * 100).tolist()
    print("Probabilities: ", probabilities)

    # Class ids as used for the labels; 3 is the extra class for examples without a gold label.
    classes_dict = {0: "entailment", 1: "neutral", 2: "contradiction", 3: "no valid label"}
    print(f"Predicted class: {classes_dict[predicted_class]}")

# Smoke-test `classify` on a few hand-written (premise, hypothesis) pairs.
# The trailing comment on each pair is the label the author expects.
example_pairs = [
    # NOTE(review): the expected label of the first pair looks more like "neutral" — confirm.
    ("The moon's gravity affects the Earth.", "The Earth revolves around the sun."),  # entailment
    ("The Earth revolves around the Sun in a circular orbit.", "The Earth revolves around the Sun in an elliptical orbit."),  # contradiction
    ("The football player scored a goal.", "The football player is a striker."),  # neutral
    ("The football player scored a goal.", "The football player is a goalkeeper."),  # contradiction
    ("The football player scored a goal.", "The football player is a defender."),  # contradiction
    ("A person is outside on a beautiful day.", "The weather is nice."),  # entailment
]
for premise_text, hypothesis_text in example_pairs:
    classify(premise_text, hypothesis_text)


In [None]:
# Run the trainer's prediction loop over the tokenized test split.
predictions_dict = trainer.predict(test_dataset=tokenized_dataset['test'])

# The predicted class of each example is the index of the largest value along the last axis.
predictions_tensor = torch.tensor(predictions_dict.predictions)
predictions = predictions_tensor.argmax(dim=-1).cpu().numpy()
print("Prediction metrics: ", predictions_dict.metrics)

# Accuracy = fraction of examples whose predicted class equals the reference label id.
accuracy = (predictions == predictions_dict.label_ids).mean()
print("Accuracy: {:.2f}%".format(accuracy * 100))