In [None]:
# install required packages
%pip install \
    transformers \
    datasets \
    evaluate \
    rouge_score\
    loralib \
    bitsandbytes  \
    scikit-learn \
    peft --quiet

In [None]:
# import required libraries
import torch
import pandas as pd
from datasets import Dataset, load_dataset, DatasetDict
from peft import LoraConfig, PeftModel, prepare_model_for_kbit_training, get_peft_model
from transformers import (
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
    DataCollatorWithPadding)

import bitsandbytes as bnb
from getpass import getpass
import evaluate
import numpy as np

import random

In [None]:
# Logging In to Hugging Face
hf_token = getpass("Hugging Face: ")
!huggingface-cli login --token $hf_token

In [None]:
# Load the IMDB dataset
dataset_imdb = load_dataset("imdb")

In [None]:
# Reduce the dataset size for quicker experimentation

reduction_rate    = 0.1
num_train_to_keep = int(reduction_rate * dataset_imdb["train"].num_rows)
num_test_to_keep  = int(reduction_rate * dataset_imdb["test"].num_rows)

def select_random_indices(dataset, num_to_keep):
    indices = list(range(dataset.num_rows))
    random.shuffle(indices)
    return indices[:num_to_keep]

train_indices = select_random_indices(dataset_imdb["train"], num_train_to_keep)
test_indices  = select_random_indices(dataset_imdb["test"], num_test_to_keep)

dataset_imdb  = DatasetDict({
    "train": dataset_imdb["train"].select(train_indices),
    "test": dataset_imdb["test"].select(test_indices),
})

dataset_imdb

In [None]:
# Load the tokenizer and check the vocab size of the model
model_id  = "google/gemma-2b-it"
tokenizer = AutoTokenizer.from_pretrained(model_id)
print(f' Vocab size of the model {model_id}: {len(tokenizer.get_vocab())}')


In [None]:
# Preprocess the dataset
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True,  max_length=512)

tokenized_imdb = dataset_imdb.map(preprocess_function, batched=True)

In [None]:
# Check the length of tokenized input ids
for i in range(10):
    print(len(tokenized_imdb['train'][i]['input_ids']))

In [None]:
# Define label mappings
id2label = {0: "NEGATIVE", 1: "POSITIVE"}
label2id = {"NEGATIVE": 0, "POSITIVE": 1}

In [None]:
# Create Data Collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Define compute_metrics function
metric = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)  # Convert probabilities to predicted labels
    return metric.compute(predictions=predictions, references=labels)

In [None]:
# Define BitsAndBytesConfig for 4-bit quantization
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,  # Enables 4-bit quantization
    bnb_4bit_use_double_quant=True,  # Use double quantization for potentially higher accuracy (optional)
    bnb_4bit_quant_type="nf4",  # Quantization type (specifics depend on hardware and library)
    bnb_4bit_compute_dtype=torch.bfloat16  # Compute dtype for improved efficiency (optional)
)

In [None]:
# Load the pre-trained model with quantization and label mappings
model = AutoModelForSequenceClassification.from_pretrained(
    model_id,  # "google/gemma-2b-it"
    num_labels=2,  # Number of output labels (2 for binary sentiment classification)
    id2label=id2label,  # {0: "NEGATIVE", 1: "POSITIVE"}
    label2id=label2id,  # {"NEGATIVE": 0, "POSITIVE": 1}
    quantization_config=bnb_config  # configuration for quantization
)

In [None]:
# Prepare the model for k-bit training
model = prepare_model_for_kbit_training(model)

In [None]:
def find_linear_names(model):
    """
    This function identifies all linear layer names within a model that use 4-bit quantization.
    Args:
        model (torch.nn.Module): The PyTorch model to inspect.
    Returns:
        list: A list containing the names of all identified linear layers with 4-bit quantization.
    """
    cls = bnb.nn.Linear4bit

    # Set to store identified layer names
    lora_module_names = set()

    # Iterate through named modules in the model
    for name, module in model.named_modules():
        # Check if the current module is an instance of the 4-bit linear layer class
        if isinstance(module, cls):
            names = name.split('.')
            lora_module_names.add(names[0] if len(names) == 1 else names[-1])

        # Special case: remove 'lm_head' if present
        if 'lm_head' in lora_module_names:
            lora_module_names.remove('lm_head')
    return list(lora_module_names)

# Example usage:
modules = find_linear_names(model)
print(modules)

In [None]:
# Define LoRA configuration and apply it to the model
lora_config = LoraConfig(
    r=64,  # Reduction factor (lower r means more parameters in the adapter)
    lora_alpha=32,  # Dimensionality of the adapter projection
    target_modules=modules,  # List of modules to apply the LoRA adapter
    lora_dropout=0.05,  # Dropout rate for the adapter
    bias="none",  # Bias configuration for the adapter
    task_type="SEQ_CLS"  # Task type (sequence classification in this case)
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

In [None]:
# Define training arguments
training_args = TrainingArguments(
    output_dir="epoch_weights",  # Output directory for checkpoints
    learning_rate=2e-5,  # Learning rate for the optimizer
    per_device_train_batch_size=1,  # Batch size per device
    per_device_eval_batch_size=1,  # Batch size per device for evaluation
    num_train_epochs=1,  # Number of training epochs
    weight_decay=0.01,  # Weight decay for regularization
    eval_strategy='epoch',  # Evaluate after each epoch
    save_strategy="epoch",  # Save model checkpoints after each epoch
    load_best_model_at_end=True,  # Load the best model based on the chosen metric
    push_to_hub=False,  # Disable pushing the model to the Hugging Face Hub
    report_to="none",  # Disable logging to Weight&Bias
    metric_for_best_model='eval_loss'  # Metric for selecting the best model
)

In [None]:
# Define early stopping callback
early_stop = EarlyStoppingCallback(early_stopping_patience=1, early_stopping_threshold=.0)

In [None]:
# Start training
trainer = Trainer(
    model=model,  # The LoRA-adapted model
    args=training_args,  # Training arguments
    train_dataset=tokenized_imdb["train"],  # Training dataset
    eval_dataset=tokenized_imdb["test"],  # Evaluation dataset
    tokenizer=tokenizer,  # Tokenizer for processing text
    data_collator=data_collator,  # Data collator for preparing batches
    compute_metrics=compute_metrics,  # Function to calculate evaluation metrics
    callbacks=[early_stop]  # Optional early stopping callback
)

trainer.train()

In [None]:
def predict(input_text):
    """
    Predicts the sentiment label for a given text input.

    Args:
        input_text (str): The text to predict the sentiment for.

    Returns:
        float: The predicted probability of the text being positive sentiment.
    """
    inputs = tokenizer(input_text, return_tensors="pt").to("cuda")  # Convert to PyTorch tensors and move to GPU (if available)
    with torch.no_grad():
        outputs = model(**inputs).logits  # Get the model's output logits
    y_prob = torch.sigmoid(outputs).tolist()[0]  # Apply sigmoid activation and convert to list
    return np.round(y_prob, 5)  # Round the predicted probability to 5 decimal places

In [None]:
predict("The movie was the best movie I have ever seen!!!")

In [None]:
predict("The movie was boring")

In [None]:
# Evaluate the model on a subset of the test dataset
df_test = pd.DataFrame(dataset_imdb['test']).head(10)

df_test['prediction'] = df_test['text'].map(predict)
df_test['y_pred'] = df_test['prediction'].apply(lambda x: np.argmax(x, axis=0))
accuracy = (df_test['y_pred'] == df_test['label']).mean()
print(f"Model Accuracy on Test Data: {accuracy:.4f}")
df_test.head()