In [1]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSequenceClassification, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
import torch
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import torch

In [2]:
data = pd.read_csv(r"C:\Users\ASUS\Desktop\github\fine_tunning\persian-twitter-dataset-sentiment-analysis\versions\4\cleaned_comment.csv")

In [3]:
X_train, X_test, y_train, y_test = train_test_split(data['comment_cleaned'], data['label'], test_size=0.2, random_state=42, stratify=data['label'], shuffle=True)

In [4]:
X_train = X_train.dropna()
y_train = y_train.loc[X_train.index]  

X_test = X_test.dropna()
y_test = y_test.loc[X_test.index]

In [5]:
X_train = X_train.astype(str)
X_test = X_test.astype(str)

In [6]:
from datasets import Dataset
train_dataset = Dataset.from_pandas(train_df := pd.DataFrame({'text': X_train, 'label': y_train}))
test_dataset = Dataset.from_pandas(test_df := pd.DataFrame({'text': X_test, 'label': y_test}))

tokenizer = AutoTokenizer.from_pretrained("PartAI/TookaBERT-Base") 
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)

train_dataset = train_dataset.map(tokenize_function, batched=True, remove_columns=["text"])
test_dataset = test_dataset.map(tokenize_function, batched=True, remove_columns=["text"])

train_dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"])
test_dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"])

Map:   0%|          | 0/52579 [00:00<?, ? examples/s]

Map:   0%|          | 0/13151 [00:00<?, ? examples/s]

In [7]:
import torch.nn as nn
from transformers import AutoModel

# Compute class weights (make sure y_train is a numpy array)
class_weights = compute_class_weight("balanced", classes=np.unique(y_train), y=y_train)
class_weights = torch.tensor(class_weights, dtype=torch.float).to("cuda")


class CustomTookaModel(nn.Module):
    def __init__(self, pretrained_model_name, num_labels):
        super(CustomTookaModel, self).__init__()
        self.bert = AutoModel.from_pretrained(pretrained_model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)
        self.loss_fn = nn.CrossEntropyLoss(weight=class_weights)
    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output  
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        if labels is not None:
            # loss_fn = nn.CrossEntropyLoss()
            loss = self.loss_fn(logits, labels)
            return {"loss": loss, "logits": logits}
        
        return {"logits": logits}
    
custom_model = CustomTookaModel(pretrained_model_name="PartAI/TookaBERT-Base", num_labels=2)

# freezing if needed

for param in custom_model.bert.parameters():
    param.requires_grad = True

# # Unfreeze the last 4 encoder layers
# for layer in custom_model.bert.encoder.layer[-4:]:
#     for param in layer.parameters():
#         param.requires_grad = True

# # Also unfreeze the pooler (optional) and classifier
# for param in custom_model.bert.pooler.parameters():
#     param.requires_grad = True

# for param in custom_model.classifier.parameters():
#     param.requires_grad = True

Some weights of BertModel were not initialized from the model checkpoint at PartAI/TookaBERT-Base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    save_strategy="epoch", 
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=2,
    weight_decay=0.01,
    fp16=False,
    metric_for_best_model="accuracy"
)

In [9]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)
    acc = accuracy_score(labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average="weighted")
    return {
        "accuracy": acc,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

In [10]:
import torch
from transformers import Trainer
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Assume y_train is your training labels as a numpy array or list
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels").to(device)
        
        outputs = model(**inputs)
        logits = outputs.get("logits")

        loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights.to("cuda"))
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss


In [11]:
from transformers import DataCollatorWithPadding
from transformers import EarlyStoppingCallback
tokenizer = AutoTokenizer.from_pretrained("PartAI/TookaBERT-Base") 
trainer = CustomTrainer(
    model=custom_model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer, 
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)] 
)

  trainer = CustomTrainer(


In [12]:
import torch
torch.cuda.empty_cache()
custom_model.to("cuda")
trainer.train()

Using EarlyStoppingCallback without load_best_model_at_end=True. Once training is finished, the best model will not be loaded automatically.


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.3109,0.296421,0.880846,0.884575,0.880846,0.880684
2,0.2568,0.305496,0.879249,0.880784,0.879249,0.87921


TrainOutput(global_step=6574, training_loss=0.29962170591896925, metrics={'train_runtime': 2313.0104, 'train_samples_per_second': 45.464, 'train_steps_per_second': 2.842, 'total_flos': 0.0, 'train_loss': 0.29962170591896925, 'epoch': 2.0})

In [None]:
# optimizer = trainer.optimizer
# new_lr = 2e-6  # or any value you want
# for param_group in optimizer.param_groups:
#     param_group['lr'] = new_lr

# trainer.train()