In [4]:
!pip install transformers datasets evaluate rouge_score loralib bitsandbytes scikit-learn peft



## Loading Libraries

In [5]:
import torch
import pandas as pd
from datasets import Dataset, load_dataset
from peft import LoraConfig, PeftModel, prepare_model_for_kbit_training, get_peft_model
from transformers import (
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    AutoModelForSequenceClassification, 
    TrainingArguments, 
    Trainer,
    EarlyStoppingCallback,
    DataCollatorWithPadding)

import bitsandbytes as bnb

import evaluate
import numpy as np

import random

## Logging In to Hugging Face

**It is necessary for local fine-tuning of Gemma**

    Create an Account: Visit https://huggingface.co/ and sign up for a free account.
    Generate an Access Token: Go to your profile settings (top right corner) -> Access Tokens -> Create a new token. This token grants access to Hugging Face features like uploading fine-tuned models.

In [None]:
#copy toj=ken from https://huggingface.co/settings/tokens
import huggingface_hub
hf_token = '...' # put your User Access Tokens here
# ابتدا login کنید
huggingface_hub.login(token=hf_token)

In [7]:
!hf auth whoami

[1muser: [0m AM-Nateghi


## load the imdb dataset

In [8]:
dataset_imdb = load_dataset("imdb")

### reduce the dataset (optional)

In [9]:
from datasets import DatasetDict

reduction_rate    = 0.1
num_train_to_keep = int(reduction_rate * dataset_imdb["train"].num_rows)
num_test_to_keep  = int(reduction_rate * dataset_imdb["test"].num_rows)

def select_random_indices(dataset, num_to_keep):
    indices = list(range(dataset.num_rows))
    random.shuffle(indices)
    return indices[:num_to_keep]

train_indices = select_random_indices(dataset_imdb["train"], num_train_to_keep)
test_indices  = select_random_indices(dataset_imdb["test"], num_test_to_keep)

dataset_imdb  = DatasetDict({
    "train": dataset_imdb["train"].select(train_indices),
    "test": dataset_imdb["test"].select(test_indices),
})

dataset_imdb

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 2500
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 2500
    })
})

## Tokenization and Quantization

In [10]:
model_id  = "google/gemma-2b-it"
tokenizer = AutoTokenizer.from_pretrained(model_id)
print(f'Vocab size of the model {model_id}: {len(tokenizer.get_vocab())}')


Vocab size of the model google/gemma-2b-it: 256000


In [11]:
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True,  max_length=512)

In [12]:
tokenized_imdb = dataset_imdb.map(preprocess_function, batched=True)

Map: 100%|██████████| 2500/2500 [00:00<00:00, 5748.41 examples/s]
Map: 100%|██████████| 2500/2500 [00:00<00:00, 5207.09 examples/s]


This creates a new dataset named tokenized_imdb with additional columns:

    input_ids: Numerical representation of the text using tokenizer vocabulary.
    attention_mask: Mask to indicate valid elements in padded sequences.

In [13]:
tokenized_imdb

DatasetDict({
    train: Dataset({
        features: ['text', 'label', 'input_ids', 'attention_mask'],
        num_rows: 2500
    })
    test: Dataset({
        features: ['text', 'label', 'input_ids', 'attention_mask'],
        num_rows: 2500
    })
})

In [14]:
for i in range(10):
    print(len(tokenized_imdb['train'][i]['input_ids']))

512
322
163
512
198
277
463
162
512
415


## Label Preparation

In [15]:
# طبق گفته جناب تصدیقی از آنجایی که برنامه نویسان همواره عاشق هستند مگر اینکه خلافش ثابت بشه باید اینجا دومی رو بر حسب اولی بسازیم! هه
id2label = {0: "NEGATIVE", 1: "POSITIVE"}
label2id = {v: k for k, v in id2label.items()}
print(label2id)

{'NEGATIVE': 0, 'POSITIVE': 1}


In [16]:
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

## Defining Evaluation Metrics

In [17]:
metric = evaluate.combine(["accuracy", "f1", "precision", "recall"])

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)  # Convert probabilities to predicted labels
    return metric.compute(predictions=predictions, references=labels)

## Quantization Configuration

In [18]:
# QLoRa Configuration
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,  # Enables 4-bit quantization
    bnb_4bit_use_double_quant=True,  # Use double quantization for potentially higher accuracy (optional)
    bnb_4bit_quant_type="nf4",  # Quantization type (specifics depend on hardware and library, now, our library is QLoRa)
    bnb_4bit_compute_dtype=torch.bfloat16  # Compute dtype for improved efficiency (optional)
)

## Loading GEMMA-2b in 4-bit

In [19]:
model = AutoModelForSequenceClassification.from_pretrained(
    model_id,  # "google/gemma-2b-it"
    num_labels=2,  # Number of output labels (2 for binary sentiment classification)
    id2label=id2label,  # {0: "NEGATIVE", 1: "POSITIVE"} 
    label2id=label2id,  # {"NEGATIVE": 0, "POSITIVE": 1}
    quantization_config=bnb_config  # configuration for quantization 
)

Loading checkpoint shards: 100%|██████████| 2/2 [00:12<00:00,  6.25s/it]
Some weights of GemmaForSequenceClassification were not initialized from the model checkpoint at google/gemma-2b-it and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Fine-Tuning with LoRA Adapter

زمانی از prepare_model_for_kbit_training استفاده کنید که:

    حافظه GPU محدود دارید
    می‌خواهید سرعت آموزش را افزایش دهید
    با مدل‌های خیلی بزرگ کار می‌کنید
    از کوانتیزاسیون 8-bit استفاده می‌کنید


In [20]:
model = prepare_model_for_kbit_training(model)

In [21]:
def find_linear_names(model):
    """
    This function identifies all linear layer names within a model that use 4-bit quantization.
    Args:
        model (torch.nn.Module): The PyTorch model to inspect.
    Returns:
        list: A list containing the names of all identified linear layers with 4-bit quantization.
    """
    cls = bnb.nn.Linear4bit  

    # Set to store identified layer names
    lora_module_names = set()

    # Iterate through named modules in the model
    for name, module in model.named_modules():
        # Check if the current module is an instance of the 4-bit linear layer class
        if isinstance(module, cls):
            names = name.split('.')
            lora_module_names.add(names[0] if len(names) == 1 else names[-1])

        # Special case: remove 'lm_head' if present
        if 'lm_head' in lora_module_names: 
            lora_module_names.remove('lm_head')
    return list(lora_module_names)

# Example usage:
modules = find_linear_names(model)
print(modules)

['o_proj', 'down_proj', 'v_proj', 'q_proj', 'k_proj', 'up_proj', 'gate_proj']


In [22]:
lora_config = LoraConfig(
    r=64,  # Reduction factor (lower r means more parameters in the adapter)
    lora_alpha=32,  # Dimensionality of the adapter projection
    target_modules=modules,  # List of modules to apply the LoRA adapter
    lora_dropout=0.05,  # Dropout rate for the adapter
    bias="none",  # Bias configuration for the adapter
    task_type="SEQ_CLS"  # Task type (sequence classification in this case)
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

trainable params: 78,450,688 || all params: 2,584,627,200 || trainable%: 3.0353


## Training Arguments

In [23]:
training_args = TrainingArguments(
    output_dir="epoch_weights",  # Output directory for checkpoints
    learning_rate=2e-5,  # Learning rate for the optimizer
    per_device_train_batch_size=1,  # Batch size per device
    per_device_eval_batch_size=1,  # Batch size per device for evaluation 
    num_train_epochs=5,  # Number of training epochs
    weight_decay=0.01,  # Weight decay for regularization
    eval_strategy='epoch',  # Evaluate after each epoch
    save_strategy="epoch",  # Save model checkpoints after each epoch
    load_best_model_at_end=True,  # Load the best model based on the chosen metric
    push_to_hub=False,  # Disable pushing the model to the Hugging Face Hub 
    report_to="none",  # Disable logging to Weight&Bias
    metric_for_best_model='eval_loss'  # Metric for selecting the best model 
)

## Early Stopping (Optional)

In [24]:
early_stop = EarlyStoppingCallback(early_stopping_patience=1, early_stopping_threshold=.0)

## Starting the Training

In [25]:
trainer = Trainer(
    model=model,  # The LoRA-adapted model
    args=training_args,  # Training arguments
    train_dataset=tokenized_imdb["train"],  # Training dataset
    eval_dataset=tokenized_imdb["test"],  # Evaluation dataset
    tokenizer=tokenizer,  # Tokenizer for processing text
    data_collator=data_collator,  # Data collator for preparing batches
    compute_metrics=compute_metrics,  # Function to calculate evaluation metrics
    callbacks=[early_stop]  # Optional early stopping callback
)

trainer.train()

  trainer = Trainer(
`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`.
  return fn(*args, **kwargs)


Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 

## save model

In [None]:
peft_model_path="./peft-gemma-imdb"

trainer.model.save_pretrained(peft_model_path)


In [None]:
tokenizer.save_pretrained(peft_model_path)

## load the saved model

In [None]:
model = AutoModelForSequenceClassification.from_pretrained(
   peft_model_path, num_labels=2, 
    id2label=id2label, 
    label2id=label2id,
    quantization_config=bnb_config,
)

## Making Predictions

In [None]:
def predict(input_text):
    """
    Predicts the sentiment label for a given text input.

    Args:
        input_text (str): The text to predict the sentiment for.

    Returns:
        float: The predicted probability of the text being positive sentiment.
    """
    inputs = tokenizer(input_text, return_tensors="pt").to("cuda")  # Convert to PyTorch tensors and move to GPU (if available)
    with torch.no_grad():
        outputs = model(**inputs).logits  # Get the model's output logits
    y_prob = torch.sigmoid(outputs).tolist()[0]  # Apply sigmoid activation and convert to list
    return np.round(y_prob, 5)  # Round the predicted probability to 5 decimal places

In [None]:
predict("The movie was the best movie I have ever seen!!!")

In [None]:
predict("The movie was perfect")

In [None]:
predict("The movie was boring")

In [None]:
predict("The movie was not bad, it was good")

In [None]:
predict("The movie was not good, it was bad")

In [None]:
df_test = pd.DataFrame(dataset_imdb['test']).head(10) 

df_test['prediction'] = df_test['text'].map(predict)
df_test['y_pred'] = df_test['prediction'].apply(lambda x: np.argmax(x, axis=0))
accuracy = (df_test['y_pred'] == df_test['label']).mean()
print(f"Model Accuracy on Test Data: {accuracy:.4f}")
df_test.head()