In [None]:
!pip install evaluate

In [None]:
import os
import shutil

import evaluate
import numpy as np
import pandas as pd
import torch
from datasets import Dataset
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer

In [None]:
from torch import nn
from transformers.models.roberta.modeling_roberta import RobertaForSequenceClassification
from typing import Optional, Tuple, Union

# Turn off the Weights & Biases integration through its environment switch.
os.environ.update(WANDB_DISABLED="true")

class WeightedRobertaForSequenceClassification(RobertaForSequenceClassification):
    """RoBERTa sequence classifier whose classification loss uses per-class weights.

    The parent model is only used to produce logits; the loss is computed here
    so that an optional class-weight tensor can be applied.

    Args:
        config: Model configuration forwarded to the parent constructor.
        **kwargs: Forwarded to the parent constructor, except for ``weights``
            (optional tensor of per-class weights), which is consumed here and
            handed to ``nn.CrossEntropyLoss``.
    """

    def __init__(self, config, **kwargs):
        # `weights` is specific to this subclass; strip it before delegating so
        # the parent constructor never sees an unknown keyword.
        weights = kwargs.pop('weights', None)
        super().__init__(config, **kwargs)
        self.loss_fct = nn.CrossEntropyLoss(weight=weights)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], dict]:
        """Run the parent model and compute the (weighted) loss when labels are given.

        Returns:
            When ``return_dict`` is falsy (including the default ``None``): a
            tuple ``(loss, logits, *extras)``, or ``(logits, *extras)`` when no
            labels were passed. Otherwise a dict with the keys ``loss``,
            ``logits``, ``hidden_states`` and ``attentions``.
        """
        # Labels are deliberately NOT forwarded: the parent would otherwise
        # compute its own unweighted loss.
        outputs = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
        )

        logits = outputs.logits
        loss = None

        if labels is not None:
            if self.config.num_labels == 1:
                # Single-output head = regression. Cross-entropy over the
                # flattened batch is meaningless here (and class weights do
                # not apply), so use mean squared error instead.
                loss = nn.functional.mse_loss(logits.view(-1), labels.float().view(-1))
            else:
                loss = self.loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1))

        if not return_dict:
            # The parent was called without labels, so its output carries no
            # loss entry and its tuple view starts at the logits. Everything
            # after index 0 (hidden states / attentions, when requested) must
            # be kept; slicing from 2 would silently drop the first of them.
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return {"loss": loss, "logits": logits, "hidden_states": outputs.hidden_states, "attentions": outputs.attentions}

In [None]:
# Read the raw dataset with pandas' pure-Python parser and report its size.
df = pd.read_csv(filepath_or_buffer="/Mental_health.csv", engine="python")
print(df.shape)

(53043, 3)


In [None]:
# Cleaning pipeline: drop rows missing text or label, normalise the text to a
# stripped string, discard rows whose text is empty, then keep only the first
# occurrence of each distinct statement.
df = (
    df.dropna(subset=['statement', 'status'])
    .assign(statement=lambda frame: frame['statement'].astype(str).str.strip())
    .loc[lambda frame: frame['statement'].str.len() > 0]
    .drop_duplicates(subset='statement')
)

# Parallel lists of model inputs (X) and their class names (y).
X = df['statement'].tolist()
y = df['status'].tolist()

In [None]:
# Class names in sorted order; a class's integer id is its position in this list.
labels = sorted(df['status'].unique())
id_to_label = dict(enumerate(labels))
label_to_id = {name: idx for idx, name in id_to_label.items()}
y_ids = [label_to_id[name] for name in y]

print(
    "\n--- Model Class ID Mappings ---",
    "Label (Class Name) to ID:",
    label_to_id,
    "\nID to Label (Class Name):",
    id_to_label,
    sep="\n",
)

# Fraction of the rows held out for evaluation; the remainder is the training set.
EVAL_SAMPLE_SIZE = 0.1
# Stratified split so both parts keep the class proportions of the full data.
X_train, X_eval, y_train, y_eval = train_test_split(
    X,
    y_ids,
    test_size=EVAL_SAMPLE_SIZE,
    stratify=y_ids,
    random_state=42,
)

In [None]:
# Checkpoint name, token length used when tokenizing, and number of target classes.
MODEL_NAME, MAX_LEN = 'roberta-base', 128
NUM_CLASSES = len(labels)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# Derive the 'balanced' class weights from the TRAINING labels only, so the
# label frequencies of the held-out evaluation rows do not influence the
# training loss.
unique_labels = np.unique(y_train)
class_weights_array = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=unique_labels,
    y=y_train
)

# float32 tensor placed on the selected device; it is passed to the model as
# the per-class loss weight.
class_weights_tensor = torch.tensor(class_weights_array, dtype=torch.float32).to(device)
print(f"\nCalculated Class Weights: {class_weights_array}")


print(f"\nTotal training samples being used: {len(X_train)}")
print(f"Evaluation samples (Reduced): {len(X_eval)}")
print(f"Number of classes: {len(labels)}")

In [None]:
def tokenize_function(examples):
    """Tokenize the ``"text"`` entry of a batch with the module-level tokenizer.

    Sequences are truncated and padded to exactly ``MAX_LEN`` tokens.

    Args:
        examples: Mapping that holds the texts to encode under ``"text"``.

    Returns:
        Whatever the module-level ``tokenizer`` returns for these texts.
    """
    encode_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": MAX_LEN,
    }
    return tokenizer(examples["text"], **encode_options)

In [None]:
# tokenize_function looks up the module-level `tokenizer` when `map` runs, so
# the tokenizer has to exist before the datasets are tokenized. Creating it
# here keeps the notebook runnable top-to-bottom on a fresh kernel.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

train_data_dict = {'text': X_train, 'label': y_train}
eval_data_dict = {'text': X_eval, 'label': y_eval}

raw_train_dataset = Dataset.from_dict(train_data_dict)
raw_eval_dataset = Dataset.from_dict(eval_data_dict)

# Tokenize in batches and drop the raw text column once it has been encoded.
tokenized_train_dataset = raw_train_dataset.map(tokenize_function, batched=True, remove_columns=['text'])
tokenized_eval_dataset = raw_eval_dataset.map(tokenize_function, batched=True, remove_columns=['text'])

# Keep only the columns handed to the model.
columns_to_keep = ["input_ids", "attention_mask", "label"]
tokenized_train_dataset = tokenized_train_dataset.select_columns(columns_to_keep)
tokenized_eval_dataset = tokenized_eval_dataset.select_columns(columns_to_keep)


In [None]:
MODEL_NAME = 'roberta-base'
MAX_LEN = 128
BATCH_SIZE = 16
NUM_CLASSES = len(labels)

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Store the class-name mapping in the model config so that the saved assets
# carry the real label names instead of only integer ids. `weights` is the
# subclass-specific keyword consumed by WeightedRobertaForSequenceClassification.
model = WeightedRobertaForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=NUM_CLASSES,
    id2label=id_to_label,
    label2id=label_to_id,
    weights=class_weights_tensor
)
model.to(device)

In [None]:
accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

def compute_metrics(p):
    """Compute accuracy and macro-averaged F1 for one evaluation pass.

    Args:
        p: Object exposing ``predictions`` (class scores, or a tuple whose
            first element is the class scores) and ``label_ids`` (reference
            class ids).

    Returns:
        Dict with the keys ``'accuracy'`` and ``'macro_f1'``.
    """
    # When the model returns extra outputs, the predictions arrive as a tuple
    # with the logits first; argmax over the whole tuple would fail.
    logits = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    predictions = np.argmax(logits, axis=1)
    macro_f1 = f1_metric.compute(predictions=predictions, references=p.label_ids, average='macro')['f1']

    return {
        'accuracy': accuracy_metric.compute(predictions=predictions, references=p.label_ids)['accuracy'],
        'macro_f1': macro_f1,
    }

In [None]:
training_args = TrainingArguments(
    output_dir='./results_roberta',
    num_train_epochs=3,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=64,
    learning_rate=2e-5,
    warmup_steps=150,
    weight_decay=0.05,
    logging_dir='./logs_roberta',
    logging_steps=100,
    eval_strategy="epoch",
    # Checkpoint on the same schedule as evaluation: this is required for
    # load_best_model_at_end, and it guarantees the end-of-training state is
    # actually written (a fixed step interval may not land on the last step).
    save_strategy="epoch",
    # Keep disk usage bounded; older checkpoints are pruned.
    save_total_limit=2,
    # Make metric_for_best_model effective: after training, the trainer holds
    # the checkpoint with the highest macro_f1 rather than the last one.
    load_best_model_at_end=True,
    metric_for_best_model='macro_f1',
    greater_is_better=True,
    report_to="none"
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_eval_dataset,
    compute_metrics=compute_metrics
)


print("\nStarting RoBERTa fine-tuning with Trainer API and optimized parameters...")
trainer.train()
print("Fine-tuning complete. Best checkpoint (by macro_f1) loaded into the trainer.")



In [None]:
# Score the trained model on the evaluation split and show the metric dict.
results = trainer.evaluate()
print("\nFinal Evaluation Results", results, sep="\n")

In [None]:
# Persist the fine-tuned model and its tokenizer side by side so the folder is
# self-contained.
SAVE_DIRECTORY = "./mental_health_status_roberta_model_assets"
trainer.model.save_pretrained(SAVE_DIRECTORY)
tokenizer.save_pretrained(SAVE_DIRECTORY)
# The model trained in this notebook is RoBERTa; name it correctly in the log.
print(f"RoBERTa Model and Tokenizer saved to the folder: {SAVE_DIRECTORY}")

In [None]:
FOLDER_NAME = "mental_health_status_roberta_model_assets"
ZIP_NAME = "distilbert_mental_health_roberta_model.zip"
# Build the archive with the standard library instead of the shell `zip`
# command: it does not depend on an external binary and raises if the folder
# is missing, so the success message below is only printed on success.
# make_archive appends ".zip" itself, hence the extension is stripped first;
# entries are stored under FOLDER_NAME/, as `zip -r` would store them.
shutil.make_archive(os.path.splitext(ZIP_NAME)[0], "zip", root_dir=".", base_dir=FOLDER_NAME)
print(f"Folder successfully zipped as {ZIP_NAME}")

In [None]:
# Colab-specific step: `google.colab` is imported here, at the point of use,
# and its `files.download` helper is called with the archive name set in the
# previous cell.
from google.colab import files
files.download(ZIP_NAME)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>