<a href="https://colab.research.google.com/github/billzhao1030/Mamba-MBTI-Text-Classification/blob/main/Mamba_500.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): in the visible part of this notebook no active cell reads from the mount
# (only a commented-out read_csv path points at Drive) — confirm this cell is still needed.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import kagglehub

# Download the latest version of the MBTI 500 dataset through kagglehub.
# `path` holds the value returned by dataset_download(); it is printed below so the
# download location is visible in the cell output.
path = kagglehub.dataset_download("zeyadkhalid/mbti-personality-types-500-dataset")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/zeyadkhalid/mbti-personality-types-500-dataset?dataset_version_number=1...


100%|██████████| 123M/123M [00:01<00:00, 122MB/s]


Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/zeyadkhalid/mbti-personality-types-500-dataset/versions/1


In [None]:
import pandas as pd
import zipfile
import os


# Build the CSV location from the directory kagglehub returned (`path`, set in the download
# cell) instead of hard-coding the cache location, so the cell keeps working when the cache
# root or the dataset version changes.
df = pd.read_csv(os.path.join(path, "MBTI 500.csv"))



In [None]:
import json
from dataclasses import dataclass , field , asdict

@dataclass
class MambaConfig:
    """Plain container for the hyper-parameters used to build a Mamba model.

    Every field has a default, so ``MambaConfig()`` is valid; mutable fields
    (``ssm_cfg``, ``attn_layer_idx``, ``attn_cfg``) use ``default_factory`` so that
    instances never share the same dict/list object.
    """

    d_model: int = 2560
    d_intermediate: int = 0
    n_layer: int = 64
    vocab_size: int = 50277
    ssm_cfg: dict = field(default_factory=dict)
    attn_layer_idx: list = field(default_factory=list)
    attn_cfg: dict = field(default_factory=dict)
    rms_norm: bool = True
    residual_in_fp32: bool = True
    fused_add_norm: bool = True
    pad_vocab_size_multiple: int = 8
    tie_embeddings: bool = True

    def to_dict(self):
        """Return the configuration as a (deep-copied) plain dictionary."""
        return asdict(self)

    def to_json_string(self):
        """Return the configuration serialised as a JSON string."""
        return json.dumps(self.to_dict())

In [None]:
import torch
import torch.nn as nn

class MambaClassificationHead(nn.Module):
    """Single linear layer that maps a ``d_model``-sized feature vector to class logits."""

    def __init__(self, d_model, num_classes=16, **kwargs):
        """Create the projection.

        Args:
            d_model: size of the last dimension of the incoming features.
            num_classes: number of output logits (defaults to 16).
            **kwargs: forwarded unchanged to ``nn.Linear`` (e.g. ``bias``, ``device``, ``dtype``).
        """
        super().__init__()

        # The attribute name is part of the state-dict key, so it must stay `classification_head`.
        self.classification_head = nn.Linear(d_model, num_classes, **kwargs)

    def forward(self, hidden_states):
        """Project ``hidden_states`` to logits with the linear layer."""
        logits = self.classification_head(hidden_states)
        return logits

In [None]:
# !pip install torch==2.4.1 torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# torch.__version__
!pip install mamba_ssm evaluate

Collecting mamba_ssm
  Downloading mamba_ssm-2.2.2.tar.gz (85 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/85.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.4/85.4 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting ninja (from mamba_ssm)
  Downloading ninja-1.11.1.1-py2.py3-none-manylinux1_x86_64.manylinux_2_5_x86_64.whl.metadata (5.3 kB)
Collecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-3.0.2-py3-none-any.whl.metadata (20 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from evaluate)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.17-py310-none-

In [None]:
import numpy as np
from mamba_ssm.models.mixer_seq_simple import MambaLMHeadModel
from mamba_ssm.utils.hf import load_config_hf,load_state_dict_hf
from collections import namedtuple
import torch.nn as nn
import torch


class MambaTextClassification(MambaLMHeadModel):
    """Mamba backbone with a linear classification head on mean-pooled hidden states.

    The class reuses ``MambaLMHeadModel`` for construction of ``self.backbone`` and then
    replaces the language-modelling head with a ``MambaClassificationHead``.
    """

    def __init__(
        self,
        config: MambaConfig,
        initializer_cfg = None,
        device = None,
        dtype = None,
    ) -> None:
        """Build the backbone from ``config`` and attach a 16-way classification head.

        Args:
            config: model hyper-parameters; ``config.d_model`` sizes the head input.
            initializer_cfg, device, dtype: forwarded to ``MambaLMHeadModel.__init__``.
        """
        super().__init__(config, initializer_cfg, device, dtype)

        # Classification head with input size d_model and 16 output classes (one per MBTI
        # type). device/dtype are forwarded so the head is created consistently with the
        # backbone instead of always on the default device in the default dtype.
        self.classification_head = MambaClassificationHead(
            d_model=config.d_model, num_classes=16, device=device, dtype=dtype
        )

        # The language-modelling head is not used for classification.
        del self.lm_head

    def forward(self, input_ids, attention_mask = None, labels = None):
        """Compute class logits and, when ``labels`` is given, the cross-entropy loss.

        Args:
            input_ids: token ids passed to the backbone (``predict`` passes a batch of one
                sequence, i.e. a 2-D tensor).
            attention_mask: accepted for call-site compatibility but NOT used; every
                position, padding included, contributes to the mean below.
            labels: optional integer class targets for ``nn.CrossEntropyLoss``.

        Returns:
            A namedtuple ``ClassificationOutput`` with field ``logits`` when ``labels`` is
            ``None``, otherwise with fields ``loss`` and ``logits`` (in that order).
        """
        # Pass input_ids through the backbone model to receive hidden_states.
        hidden_states = self.backbone(input_ids)

        # Average over dim 1 (presumably the sequence dimension) to get one feature vector
        # per sample.
        mean_hidden_states = hidden_states.mean(dim = 1)

        # Pass mean_hidden_states through the classification head to get logits.
        logits = self.classification_head(mean_hidden_states)

        if labels is None:
            # The original bound this to a misspelled name, so the label-free path (and
            # therefore predict()) raised NameError.
            ClassificationOutput = namedtuple("ClassificationOutput", ["logits"])
            return ClassificationOutput(logits = logits)

        ClassificationOutput = namedtuple("ClassificationOutput", ["loss", "logits"])

        # Use CrossEntropyLoss loss function to compute the loss.
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(logits, labels)

        return ClassificationOutput(loss = loss, logits = logits)

    def predict(self, text, tokenizer, id2label = None):
        """Classify a single text.

        Args:
            text: input passed to ``tokenizer``.
            tokenizer: callable whose result exposes ``['input_ids']``.
            id2label: optional mapping from class index to label.

        Returns:
            ``id2label[index]`` when ``id2label`` is given, otherwise the class index
            returned by ``np.argmax``.
        """
        # Put the input on whatever device the model lives on rather than assuming CUDA.
        model_device = next(self.parameters()).device
        input_ids = torch.tensor(tokenizer(text)['input_ids'], device = model_device)[None]
        with torch.no_grad():
            logits = self.forward(input_ids).logits[0]
            label = np.argmax(logits.cpu().numpy())

        if id2label is not None:
            return id2label[label]
        else:
            return label

    @classmethod
    def from_pretrained(cls, pretrained_model_name, device = None, dtype = None, **kwargs):
        """Create a model from a pretrained checkpoint name.

        Args:
            pretrained_model_name: identifier passed to ``load_config_hf`` and
                ``load_state_dict_hf``.
            device, dtype: forwarded to the constructor and to ``load_state_dict_hf``.
            **kwargs: extra keyword arguments for the constructor.

        Returns:
            The model after ``.to(device)``.
        """
        # Load the configuration from the pre-trained model.
        config_data = load_config_hf(pretrained_model_name)
        config = MambaConfig(**config_data)

        # Initialize the model from the configuration on the desired device and data type.
        model = cls(config, device = device, dtype = dtype, **kwargs)

        # Load the pre-trained weights. strict=False tolerates keys that exist on only one
        # side (e.g. a checkpoint without a classification head).
        model_state_dict = load_state_dict_hf(pretrained_model_name, device = device, dtype = dtype)
        model.load_state_dict(model_state_dict , strict=False)

        # Print the parameters that were NOT present in the checkpoint and therefore keep
        # their fresh initialisation.
        print (" Newly initialized embedding :",
              set(model.state_dict().keys()) - set(model_state_dict.keys())
        )

        return model.to(device)

  def forward(ctx, xz, conv1d_weight, conv1d_bias, x_proj_weight, delta_proj_weight,
  def backward(ctx, dout):
  def forward(
  def backward(ctx, dout, *args):
  def forward(ctx, x, weight, bias, process_group=None, sequence_parallel=True):
  def backward(ctx, grad_output):
  def forward(ctx, zxbcdt, conv1d_weight, conv1d_bias, dt_bias, A, D, chunk_size, initial_states=None, seq_idx=None, dt_limit=(0.0, float("inf")), return_final_states=False, activation="silu",
  def backward(ctx, dout, *args):


In [None]:
import json
import os
from transformers import Trainer
import torch

# Trainer subclass adapting the Hugging Face training loop to MambaTextClassification.
class MambaTrainer(Trainer):
    """``Trainer`` with a custom loss computation and checkpoint format.

    * ``compute_loss`` feeds only ``input_ids`` and ``labels`` to the model.
    * ``save_model`` writes ``pytorch_model.bin`` (the raw state dict), the tokenizer files
      and ``config.json`` into the output directory.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model on a batch and return its loss.

        Args:
            model: the model being trained; called as ``model(input_ids=..., labels=...)``.
            inputs: batch mapping; ``"input_ids"`` and ``"labels"`` are popped from it.
            return_outputs: when true, return ``(loss, outputs)`` instead of ``loss``.
            num_items_in_batch: accepted (and ignored) so that ``Trainer`` versions which
                pass this keyword to ``compute_loss`` keep working.
        """
        # Get the input_ids and labels values from inputs.
        input_ids = inputs.pop("input_ids")
        labels = inputs.pop('labels')

        # Call the forward function of the model with input_ids and labels to get the results.
        outputs = model(input_ids=input_ids , labels=labels)

        # Get the loss value from the model's outputs.
        loss = outputs.loss

        # Return both loss and outputs if return_outputs is True, otherwise only return loss.
        return (loss, outputs) if return_outputs else loss

    def save_model(self, output_dir = None, _internal_call = False):
        """Save weights, tokenizer and configuration to ``output_dir``.

        Args:
            output_dir: target directory; defaults to ``self.args.output_dir``.
            _internal_call: accepted for signature compatibility with ``Trainer``; unused.
        """
        # Fall back to the directory configured in the training arguments.
        if output_dir is None:
            output_dir = self.args.output_dir

        # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
        os.makedirs(output_dir, exist_ok=True)

        # Save the PyTorch model's state to 'pytorch_model.bin' in the output directory.
        torch.save(self.model.state_dict(), os.path.join(output_dir, "pytorch_model.bin"))

        # Save the tokenizer, if one was given to the trainer. Newer transformers releases
        # expose it as `processing_class`; older ones only as `tokenizer`.
        tokenizer = getattr(self, "processing_class", None)
        if tokenizer is None:
            tokenizer = getattr(self, "tokenizer", None)
        if tokenizer is not None:
            tokenizer.save_pretrained(output_dir)

        # Save the model's configuration to 'config.json' in the output directory.
        with open(os.path.join(output_dir, "config.json"), 'w') as f:
            json.dump(self.model.config.to_dict(), f)

In [None]:
import numpy as np
import evaluate

# Load the "accuracy" module from the evaluate library.
# compute_metrics() looks this module-level name up every time it is called, so any later
# cell that rebinds `accuracy` to something else (e.g. a float) breaks evaluation.
accuracy = evaluate.load("accuracy")

# Tokenize the "text" field with truncation enabled and drop the attention mask.
def preprocess_function(tokenizer, examples):
    """Return ``tokenizer(examples["text"], truncation=True)`` without its ``attention_mask`` entry.

    Raises ``KeyError`` if the tokenizer output has no ``attention_mask``.

    NOTE(review): a later cell defines another ``preprocess_function`` that takes only
    ``examples``; once that cell has run, this two-argument version is shadowed.
    """
    encoded = tokenizer(examples["text"], truncation=True)
    del encoded['attention_mask']
    return encoded

def compute_metrics(eval_pred):
    """Compute accuracy for an evaluation pass.

    Args:
        eval_pred: a pair ``(predictions, labels)``; the arg-max over axis 1 of
            ``predictions`` is taken as the predicted class id.

    Returns:
        Whatever the module-level ``accuracy`` metric's ``compute`` returns.
    """
    scores, references = eval_pred

    # Predicted class = position of the largest score in each row.
    predicted_ids = np.argmax(scores, axis=1)

    return accuracy.compute(predictions=predicted_ids, references=references)

In [None]:
import os
import random
import numpy as np
import pandas as pd
from datasets import Dataset
from huggingface_hub import login
from datasets import load_dataset
from transformers import Trainer
from transformers import AutoTokenizer, TrainingArguments
from sklearn.model_selection import train_test_split



# Load data
# df = pd.read_csv('drive/MyDrive/Methodology/mbti_cleaned.csv')

# Map every distinct value of the 'type' column to an integer id. The values are sorted
# first, so the ids are assigned in alphabetical order and are reproducible.
label_to_id = {label: idx for idx, label in enumerate(sorted(df['type'].unique()))}

# 80/20 train-test split, stratified on 'type' so both parts keep the class proportions;
# random_state fixes the shuffle.
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['type'], random_state=42)

# Convert DataFrames to Hugging Face Datasets
train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)


# Initialize tokenizer. The EOS token id is reused as the padding token id
# (presumably because this tokenizer defines no pad token of its own — TODO confirm).
tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neox-20b")
tokenizer.pad_token_id = tokenizer.eos_token_id

def preprocess_function(examples, max_length=512):
    """Tokenize a batch of posts and attach integer labels.

    Reads the module-level ``tokenizer`` and ``label_to_id``. This definition replaces the
    earlier two-argument ``preprocess_function`` defined in a previous cell.

    Args:
        examples: batch mapping with a ``'posts'`` list (texts) and a ``'type'`` list
            (string labels that are keys of ``label_to_id``).
        max_length: every sequence is truncated and padded to exactly this many tokens.
            Defaults to 512, the value that used to be hard-coded.

    Returns:
        The tokenizer output with an added ``'labels'`` entry (list of ints).
    """
    # Tokenize the text, truncating/padding every sample to max_length tokens.
    tokenized_inputs = tokenizer(
        examples['posts'], truncation=True, padding='max_length', max_length=max_length
    )

    # Map labels from strings to integers using a dictionary
    tokenized_inputs['labels'] = [label_to_id[label] for label in examples['type']]

    return tokenized_inputs

# Apply the preprocessing function to both datasets (batched, so the function receives
# lists of posts/types rather than single rows).
train_dataset = train_dataset.map(preprocess_function, batched=True)
test_dataset = test_dataset.map(preprocess_function, batched=True)

# Expose only 'input_ids' and 'labels' as torch tensors; the other columns, including the
# attention mask produced by the tokenizer, are not part of the formatted output.
train_dataset.set_format(type='torch', columns=['input_ids', 'labels'])
test_dataset.set_format(type='torch', columns=['input_ids', 'labels'])





Map:   0%|          | 0/84853 [00:00<?, ? examples/s]

Map:   0%|          | 0/21214 [00:00<?, ? examples/s]

In [None]:
# Read the Hugging Face access token from the environment.
# SECURITY: never paste the token itself into the notebook (not even in a comment). A
# literal token was previously embedded on this line; treat it as leaked and revoke it.
token = os.getenv("HUGGINGFACE_TOKEN")
login(token=token, write_permission=True)

# Load the Mamba model from a pretrained model.
model = MambaTextClassification.from_pretrained("BuduBuduBudu/mamba_mbti_500") # alternative noted by the author: state-spaces/mamba-130m (presumably the base checkpoint — TODO confirm)

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

  return torch.load(resolved_archive_file, map_location=mapped_device)


 Newly initialized embedding : set()


In [None]:
# Define training arguments in the TrainingArguments class.
# More details about supported parameters can be found at: https://huggingface.co/docs/transformers/main_classes/trainer
training_args = TrainingArguments(
    output_dir="mamba_mbti_500",  # Output folder name
    learning_rate=5e-5,  # Learning rate for this run (4e-7 was noted as an alternative)
    per_device_train_batch_size=4,  # Number of training samples per device
    per_device_eval_batch_size=16,  # Number of evaluation samples per device
    num_train_epochs=2,  # Number of training epochs
    warmup_ratio=0.01,  # Fraction of the training steps used for LR warmup
    lr_scheduler_type="cosine",  # Type of scheduler to decrease LR
    report_to="wandb",  # Log to Weights & Biases (the recorded run prompted for an API key)
    evaluation_strategy="steps",  # Run evaluation every `eval_steps`
    eval_steps=0.1,  # Float < 1 is a ratio: evaluate every 10% of the total training steps
    save_strategy="steps",  # Save checkpoints every `save_steps`
    save_steps=0.1,  # Float < 1 is a ratio: checkpoint every 10% of the total training steps
    logging_strategy="steps",  # Determine when to log information
    logging_steps=1,  # Number of steps between logging
    push_to_hub=True,  # Push the results to the Hub
    load_best_model_at_end=True,  # Load the model with the best evaluation result during training
)

# Initialize the MambaTrainer class to perform the model training process.
trainer = MambaTrainer(
    model=model,  # Model to train
    train_dataset=train_dataset,  # Training data
    eval_dataset=test_dataset,  # Evaluation data (the held-out test split is used for validation)
    tokenizer=tokenizer,  # Tokenizer used to encode data
    args=training_args,  # Pre-defined training parameters
    compute_metrics=compute_metrics  # Function to calculate performance metrics for evaluation
)

# Start the training process by calling the train() function on the trainer class.
trainer.train()

# SECURITY: do not keep API keys (e.g. the wandb key) in the notebook. Supply them through
# the environment or the interactive prompt; a key that was pasted here must be rotated.



<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

 ··········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Step,Training Loss,Validation Loss,Accuracy
4243,0.248,0.650957,0.796267
8486,0.8708,0.638729,0.812529
12729,0.0084,0.582085,0.831149
16972,0.0596,0.51781,0.848213
21215,0.0207,0.48745,0.853682
25458,0.0177,0.60013,0.852503
29701,0.0,0.639197,0.852079
33944,0.1022,0.65052,0.855614
38187,0.3585,0.634669,0.856604


TrainOutput(global_step=42428, training_loss=0.47469002701040913, metrics={'train_runtime': 6398.6102, 'train_samples_per_second': 26.522, 'train_steps_per_second': 6.631, 'total_flos': 0.0, 'train_loss': 0.47469002701040913, 'epoch': 2.0})

In [None]:
from transformers import Trainer, TrainingArguments
from sklearn.metrics import classification_report
import torch

# Define training arguments - you don't need to set many parameters for evaluation only.
# NOTE: this rebinds `training_args` and `trainer` from the training cell.
training_args = TrainingArguments(
    output_dir="./results",
    per_device_eval_batch_size=8,
    do_eval=True,
    logging_dir='./logs',
    report_to="none"  # Avoids unnecessary logging
)

# Define the Trainer for evaluation. The stock Trainer (not MambaTrainer) is used here;
# the datasets expose only 'input_ids' and 'labels', which model.forward() accepts.
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
)

# Run predictions on the test set
predictions = trainer.predict(test_dataset)


In [None]:
# Predicted class id per test sample: index of the largest value along axis 1
# (assumes predictions.predictions is a 2-D array of per-class scores — TODO confirm).
preds = np.argmax(predictions.predictions, axis=1)

# Generate a classification report
true_labels = test_dataset['labels']

# target_names follows label_to_id's insertion order; the dict was built from the sorted
# type names, so position k is the name of class id k.
print(classification_report(true_labels, preds, target_names=list(label_to_id.keys())))

              precision    recall  f1-score   support

        ENFJ       0.81      0.74      0.77       307
        ENFP       0.85      0.83      0.84      1233
        ENTJ       0.90      0.81      0.85       591
        ENTP       0.86      0.85      0.85      2345
        ESFJ       0.71      0.61      0.66        36
        ESFP       0.71      0.49      0.58        72
        ESTJ       0.88      0.76      0.82        96
        ESTP       0.95      0.93      0.94       397
        INFJ       0.86      0.87      0.87      2993
        INFP       0.83      0.86      0.84      2427
        INTJ       0.87      0.88      0.87      4486
        INTP       0.86      0.88      0.87      4992
        ISFJ       0.76      0.67      0.71       130
        ISFP       0.71      0.65      0.67       175
        ISTJ       0.75      0.69      0.72       249
        ISTP       0.88      0.83      0.85       685

    accuracy                           0.86     21214
   macro avg       0.82   

In [None]:
from sklearn.metrics import accuracy_score

# Create reverse mapping from integers to MBTI types
id_to_label = {idx: label for label, idx in label_to_id.items()}

# Convert integer labels back to MBTI string labels.
# NOTE: `true_labels` is rebound here from the integer labels used for the classification
# report to a list of 4-letter strings; the cells below rely on the string form.
true_labels = [id_to_label[label.item()] for label in test_dataset['labels']]
pred_labels = [id_to_label[pred] for pred in preds]

# Helper: accuracy restricted to one character position of the MBTI code.
def dimension_accuracy(true_mbti, pred_mbti, index):
    """Return ``accuracy_score`` computed on the ``index``-th character of every label.

    Args:
        true_mbti: iterable of reference labels (indexable, e.g. 4-letter strings).
        pred_mbti: iterable of predicted labels, aligned with ``true_mbti``.
        index: position of the character to compare.
    """
    def letters_at_index(codes):
        # Keep only the character of interest from each label.
        return [code[index] for code in codes]

    reference_letters = letters_at_index(true_mbti)
    predicted_letters = letters_at_index(pred_mbti)
    return accuracy_score(reference_letters, predicted_letters)

# Calculate accuracies for each dimension (one per character position of the MBTI code).
dimension_labels = ['I/E', 'S/N', 'T/F', 'P/J']

# Build the mapping in a comprehension so that no module-level name `accuracy` is rebound:
# compute_metrics() needs `accuracy` to remain the evaluate metric, and the previous loop
# variable of that name replaced it with a float, breaking any later trainer evaluation.
dimension_accuracies = {
    dim_label: dimension_accuracy(true_labels, pred_labels, position)
    for position, dim_label in enumerate(dimension_labels)
}

print("Dimension Accuracies:", dimension_accuracies)

Dimension Accuracies: {'I/E': 0.9422079758649948, 'S/N': 0.9741680022626568, 'T/F': 0.9531912887715659, 'P/J': 0.91788441595173}


In [None]:
import numpy as np

# Assuming `pred_labels` and `true_labels` are arrays of predicted and true labels (e.g., MBTI types)
# and are already available after using `trainer.predict()`.

def compute_partial_accuracy(pred_labels, true_labels):
    """Average, over all samples, of the fraction of MBTI dimensions predicted correctly.

    Each label is converted with ``mbti_to_vector``; a sample scores the number of matching
    vector entries divided by 4, and the mean of those scores (``np.mean``) is returned.
    Samples are paired with ``zip``, so extra entries in the longer input are ignored.
    """
    # Encode both label sequences up front.
    encoded_preds = list(map(mbti_to_vector, pred_labels))
    encoded_truth = list(map(mbti_to_vector, true_labels))

    # One score per sample: matching dimensions / 4 (each dimension is worth 0.25).
    per_sample_scores = [
        sum(pred_bit == true_bit for pred_bit, true_bit in zip(pred_vec, true_vec)) / 4
        for pred_vec, true_vec in zip(encoded_preds, encoded_truth)
    ]

    return np.mean(per_sample_scores)

# Convert MBTI label to binary vector (e.g., "INTJ" -> [1, 1, 1, 1], "ESFP" -> [0, 0, 0, 0])
def mbti_to_vector(label):
    """Encode a 4-character MBTI label as four 0/1 flags.

    Position k of the result is 1 when ``label[k]`` equals the k-th letter of ``"INTJ"``
    (I for I/E, N for N/S, T for T/F, J for J/P) and 0 otherwise. Characters beyond the
    fourth are ignored; a label shorter than four characters raises ``IndexError``.
    """
    positive_letters = "INTJ"
    return [
        1 if label[position] == positive_letters[position] else 0
        for position in range(4)
    ]

# Calculate and print the average partial accuracy.
# `pred_labels` and `true_labels` are the lists of 4-letter string labels built in the
# per-dimension accuracy cell above.
avg_partial_acc = compute_partial_accuracy(pred_labels, true_labels)
print(f"Average Partial Accuracy: {avg_partial_acc:.4f}")

Average Partial Accuracy: 0.9469


In [None]:
# Second training pass: continues training the same `model` object for one more epoch with
# a much smaller learning rate and a separate output folder.
# More details about supported parameters can be found at: https://huggingface.co/docs/transformers/main_classes/trainer
training_args = TrainingArguments(
    output_dir="mamba_mbti_500_fine_tune",  # Output folder name
    learning_rate=4e-7,  # Learning rate for this pass
    per_device_train_batch_size=4,  # Number of training samples per device
    per_device_eval_batch_size=16,  # Number of evaluation samples per device
    num_train_epochs=1,  # Number of training epochs
    warmup_ratio=0.01,  # Fraction of the training steps used for LR warmup
    lr_scheduler_type="cosine",  # Type of scheduler to decrease LR
    report_to="wandb",  # Log to Weights & Biases
    evaluation_strategy="steps",  # Run evaluation every `eval_steps`
    eval_steps=0.1,  # Float < 1 is a ratio: evaluate every 10% of the total training steps
    save_strategy="steps",  # Save checkpoints every `save_steps`
    save_steps=0.1,  # Float < 1 is a ratio: checkpoint every 10% of the total training steps
    logging_strategy="steps",  # Determine when to log information
    logging_steps=1,  # Number of steps between logging
    push_to_hub=True,  # Push the results to the Hub
    load_best_model_at_end=True,  # Load the model with the best evaluation result during training
)

# Initialize the MambaTrainer class to perform the model training process.
# NOTE: compute_metrics reads the module-level `accuracy` metric, which must still be the
# object returned by evaluate.load("accuracy") when this cell runs.
trainer = MambaTrainer(
    model=model,  # Model to train
    train_dataset=train_dataset,  # Training data
    eval_dataset=test_dataset,  # Evaluation data (the held-out test split is used for validation)
    tokenizer=tokenizer,  # Tokenizer used to encode data
    args=training_args,  # Pre-defined training parameters
    compute_metrics=compute_metrics  # Function to calculate performance metrics for evaluation
)

# Start the training process by calling the train() function on the trainer class.
trainer.train()



Step,Training Loss,Validation Loss,Accuracy
2122,0.0,0.654673,0.85651
4244,0.0377,0.668801,0.855426
6366,0.0,0.683089,0.854624
8488,0.0015,0.697219,0.854106
10610,0.0009,0.705398,0.853446
