In [None]:
import sys, os
# Only on Google Colab: mount Google Drive and make the project folder the working
# directory, so the relative CSV paths used later resolve.
if 'google.colab' in sys.modules:
    from google.colab import drive
    drive.mount('/content/gdrive')
    path_to_file = '/content/gdrive/My Drive/BT5153/Project/'
    print(path_to_file)
    os.chdir(path_to_file)
    # Pure-Python replacement for the `!pwd` shell escape: confirms the chdir took effect
    # and keeps the cell valid Python when the notebook is exported or linted.
    print(os.getcwd())

Mounted at /content/gdrive
/content/gdrive/My Drive/BT5153/Project/
/content/gdrive/My Drive/BT5153/Project


In [None]:
import pandas as pd
import re
import numpy as np
from imblearn.under_sampling import RandomUnderSampler
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizerFast
import matplotlib.pyplot as plt

# Data Preprocessing

In [None]:
# Twitter MBTI dump: discard the saved index column, then give the two remaining
# columns the names used throughout the notebook.
kaggle_ds = pd.read_csv("twitter_MBTI.csv").drop(columns=["Unnamed: 0"])
kaggle_ds.columns = ["post", "type"]
kaggle_ds.head()

Unnamed: 0,post,type
0,@Pericles216 @HierBeforeTheAC @Sachinettiyil T...,intj
1,@Hispanthicckk Being you makes you look cute||...,intj
2,@Alshymi Les balles sont réelles et sont tirée...,intj
3,"I'm like entp but idiotic|||Hey boy, do you wa...",intj
4,@kaeshurr1 Give it to @ZargarShanif ... He has...,intj


In [None]:
# Second corpus: its columns arrive in (type, post) order, so only the names are normalised.
tianchi_ds = pd.read_csv("mbti_1.csv")
tianchi_ds.columns = ["type", "post"]
tianchi_ds.head()

Unnamed: 0,type,post
0,INFJ,'http://www.youtube.com/watch?v=qsXHcwe3krw|||...
1,ENTP,'I'm finding the lack of me in these posts ver...
2,INTP,'Good one _____ https://www.youtube.com/wat...
3,INTJ,"'Dear INTP, I enjoyed our conversation the o..."
4,ENTJ,'You're fired.|||That's another silly misconce...


## Split Post

In [None]:
def explode_column(df, col_to_explode, sep = r"\|\|\|"):
    """
    Turn one delimited text column into one row per piece.

    Every value of `col_to_explode` is split on `sep`; each resulting piece gets
    its own row and the values of all other columns are repeated alongside it.
    The input DataFrame itself is left untouched.

    param
    ----
    df : DataFrame to expand
    col_to_explode : str, name of the column holding the delimited text
    sep : str, separator pattern handed to Series.str.split
          (default: a regular expression matching the literal "|||")

    return
    ----
    New DataFrame with a fresh 0..n-1 index.
    """
    exploded = df.copy()
    # Each cell becomes a list of pieces ...
    exploded[col_to_explode] = exploded[col_to_explode].str.split(sep)
    # ... and each list element becomes its own row, renumbered from zero.
    return exploded.explode(col_to_explode).reset_index(drop=True)

In [None]:
# One row per individual post for each corpus, then stack both into a single frame.
df1, df2 = (explode_column(raw_ds, "post") for raw_ds in (kaggle_ds, tianchi_ds))
df = pd.concat([df1, df2], ignore_index=True)

In [None]:
def clean_text(text):
    """
    Normalise one raw string into plain lower-case text.

    Applied in order: lower-case; drop http(s) links, @mentions and #hashtags;
    drop the HTML entities for ampersand, less-than and greater-than; drop every
    character that is not an ASCII letter, a digit, whitespace or basic
    punctuation; collapse whitespace runs to one space and trim both ends.

    input: str
    output: str
    """
    # (pattern, replacement) pairs, applied top to bottom. Order matters: links,
    # mentions and hashtags are removed while their marker characters are still
    # present, i.e. before the character filter runs.
    substitutions = (
        (r"https?://\S+", ""),                      # links
        (r"@\w+", ""),                              # @nickname mentions
        (r"#\w+", ""),                              # hashtags / topics
        (r"&amp;|&lt;|&gt;", ""),                   # HTML entities left in the post
        (r"[^0-9a-zA-Z\s.,!?;:'\"()\[\]{}]", ""),   # keep only digits, letters, punctuation
        (r"\s+", " "),                              # several spaces -> one
    )

    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

In [None]:
# Clean every post, and run the type labels through the same cleaner so both
# corpora end up with lower-case labels.
df["post"] = df["post"].map(clean_text)
df["type"] = df["type"].map(clean_text)

In [None]:
# Integer class id per MBTI type; `le` is kept so ids can be mapped back to type strings later.
le = LabelEncoder().fit(df["type"])
df["label_16"] = le.transform(df["type"])  # 0–15 when all 16 types are present

In [None]:
# Four-dimension encoding: one 0/1 column per MBTI axis, read from the matching
# letter position of the (lower-case) type string.
df["E"] = df["type"].str.get(0).map({"e": 1, "i": 0})
df["N"] = df["type"].str.get(1).map({"n": 1, "s": 0})
df["T"] = df["type"].str.get(2).map({"t": 1, "f": 0})
df["P"] = df["type"].str.get(3).map({"p": 1, "j": 0})


In [None]:
# Undersample to balance 16‑class
# Double brackets keep X two-dimensional (n_rows, 1) for the sampler.
X = df[["post"]].to_numpy()
y = df["label_16"].to_numpy()
rus = RandomUnderSampler(sampling_strategy="not minority", random_state=42)
X_res, y_res = rus.fit_resample(X, y)

# rebuild DataFrame after resampling; column 0 of X_res is the single feature (the post text)
df_resampled = pd.DataFrame({"posts": X_res[:, 0], "label_16": y_res})

In [None]:
# rebuild DataFrame after resampling
# NOTE(review): this repeats the rebuild at the end of the undersampling cell above;
# re-running it is harmless, but the cell is a candidate for removal.
df_resampled = pd.DataFrame({"posts": X_res.ravel(), "label_16": y_res})

In [None]:
# regenerate the four binary dims
# The resampled frame only carries posts + class id, so first recover the type string
# from the encoder, then read each axis off its letter position.
df_resampled["type"] = le.inverse_transform(df_resampled["label_16"])
df_resampled["E"] = df_resampled["type"].str.get(0).map({"e": 1, "i": 0})
df_resampled["N"] = df_resampled["type"].str.get(1).map({"n": 1, "s": 0})
df_resampled["T"] = df_resampled["type"].str.get(2).map({"t": 1, "f": 0})
df_resampled["P"] = df_resampled["type"].str.get(3).map({"p": 1, "j": 0})

In [None]:
# Inspect the balanced frame (the notebook renders a truncated head/tail view)
df_resampled

Unnamed: 0,posts,label_16,type,E,N,T,P
0,moveee this was my jam,0,enfj,1,1,0,0
1,pls no,0,enfj,1,1,0,0
2,as a fan it is hard for me to say this but i l...,0,enfj,1,1,0,0
3,"bob saget and oprah, apparently. i've heard pe...",0,enfj,1,1,0,0
4,i know what all of you are thinking and none o...,0,enfj,1,1,0,0
...,...,...,...,...,...,...,...
225323,yall never believe me when i say i get bitches...,15,istp,0,0,1,1
225324,"can't draw, can't write a song, can't make som...",15,istp,0,0,1,1
225325,social media wanna look minimal so bad,15,istp,0,0,1,1
225326,i stopped caring when i wanted more bacon.,15,istp,0,0,1,1


# T5

In [None]:
! pip install datasets -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/491.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m481.3/491.2 kB[0m [31m20.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/116.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/183.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.9/183.9 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/143.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
from transformers import T5Tokenizer, T5ForConditionalGeneration, TrainingArguments, Trainer, T5Config
from datasets import DatasetDict, Dataset
from sklearn.model_selection import train_test_split
import torch
import random
import numpy as np

seed = 42
# Seed every random number generator in use, in this order: Python's `random`,
# NumPy's global generator, PyTorch on CPU, PyTorch on all CUDA devices.
for _seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seeder(seed)

In [None]:
# Work on a copy so the balanced frame `df_resampled` itself is never modified.
df = df_resampled.copy()
df['label_text'] = df['type'].str.upper()

# Letter targets for the four binary axes (1 -> first letter, anything else -> second).
# They are derived once, BEFORE splitting, so train/val/test simply inherit the columns:
# no loop variable can rebind `df`, and no column is written into a frame that was
# produced by slicing another one (which triggers pandas' SettingWithCopyWarning).
df['E_label'] = np.where(df['E'] == 1, 'E', 'I')
df['N_label'] = np.where(df['N'] == 1, 'N', 'S')
df['T_label'] = np.where(df['T'] == 1, 'T', 'F')
df['P_label'] = np.where(df['P'] == 1, 'P', 'J')

# 70 / 15 / 15 split, stratified on the 16-class id at both stages.
train_df, temp_df = train_test_split(df, test_size=0.30, stratify=df["label_16"], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.50, stratify=temp_df["label_16"], random_state=42)

## 16 types

In [None]:
# Build Hugging Face Datasets
def build_dataset(df):
    """
    Wrap one split as a Hugging Face Dataset for the 16-type task.

    Reads the "posts" and "label_text" columns of `df`. Each post is prefixed with
    the instruction prompt to form `input_text`; `target_text` is the label column
    as-is.
    """
    instruction = "Predict this person's MBTI type based on their posts. Only reply ISTJ/ISFJ/INFJ/INTJ/ISTP/ISFP/INFP/INTP/ESTP/ESFP/ENFP/ENTP/ESTJ/ESFJ/ENFJ/ENTJ.: "
    prompted_posts = [instruction + post for post in df["posts"]]
    return Dataset.from_dict({
        'input_text': prompted_posts,
        'target_text': df['label_text'],
    })

# One Dataset per split, keyed by the split names used when training and evaluating.
dataset_16 = DatasetDict({
    split_name: build_dataset(split_df)
    for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df))
})

In [None]:
# Load tokenizer and model
tokenizer = T5Tokenizer.from_pretrained("t5-base")
# Keyword arguments to from_pretrained override the matching attributes of the loaded config.
config = T5Config.from_pretrained("t5-base", dropout_rate=0.1)
model = T5ForConditionalGeneration.from_pretrained("t5-base", config=config)

In [None]:
# Tokenize dataset_16
def preprocess(example):
    """
    Tokenize prompt/target text for T5 fine-tuning.

    Uses the notebook-level `tokenizer`. Prompts are padded/truncated to 128 tokens
    and targets to 10 tokens. Works on a single example or on a batch (a dict of
    lists, as passed by `Dataset.map(..., batched=True)`).

    Returns the tokenized inputs with an added "labels" entry: the target token ids,
    with every padding id replaced by -100.
    """
    inputs = tokenizer(example["input_text"], max_length=128, padding="max_length", truncation=True)
    targets = tokenizer(example["target_text"], max_length=10, padding="max_length", truncation=True)

    # The loss ignores label positions equal to -100. Without this masking the padded
    # tail of each 10-token target is scored as if <pad> were a real token to predict,
    # which dominates the loss because the MBTI targets are only a few tokens long.
    pad_id = tokenizer.pad_token_id

    def _mask_padding(ids):
        return [tok if tok != pad_id else -100 for tok in ids]

    label_ids = targets["input_ids"]
    if label_ids and isinstance(label_ids[0], (list, tuple)):
        # batched call: one id sequence per example
        inputs["labels"] = [_mask_padding(seq) for seq in label_ids]
    else:
        # single example: a flat id sequence
        inputs["labels"] = _mask_padding(label_ids)
    return inputs

# Tokenize all three splits; batched=True hands preprocess many rows per call
tokenized_dataset = dataset_16.map(preprocess, batched=True)

In [None]:
# Train process
# Hyper-parameters for the 16-type run: 3 epochs, batch size 16 for both training and
# evaluation, one checkpoint per epoch, a log line every 500 steps, mixed precision
# only when a GPU is present, no external experiment tracker.
training_args = TrainingArguments(
    output_dir="./t5-mbti-output",
    logging_dir="./logs",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    save_strategy="epoch",
    logging_steps=500,
    fp16=torch.cuda.is_available(),
    report_to="none",
)

trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
)

# Despite its name, `trainer_16` holds the value returned by train() (the training
# summary), not the Trainer object; the Trainer itself stays in `trainer`.
trainer_16 = trainer.train()
print("TrainOutput Summary:\n", trainer_16)

## 4 dimensions

In [None]:
def build_dataset(df, label_name="label_text"):
    """
    Wrap one split as a Hugging Face Dataset for a single prediction task.

    Parameters
    ----------
    df : DataFrame with a "posts" column and a column named `label_name`.
    label_name : str
        Target column, which also selects the instruction prompt. One of
        "E_label", "N_label", "T_label", "P_label" (binary axes) or "label_text"
        (full 16-type label). It defaults to "label_text" so that the one-argument
        call `build_dataset(df)` used by the 16-type section keeps working after
        this definition replaces the earlier function of the same name.

    Returns
    -------
    Dataset with columns `input_text` (prompt + post) and `target_text`.

    Raises
    ------
    KeyError if `label_name` has no prompt or is not a column of `df`.
    """
    prompts = {
        "label_text": "Predict this person's MBTI type based on their posts. Only reply ISTJ/ISFJ/INFJ/INTJ/ISTP/ISFP/INFP/INTP/ESTP/ESFP/ENFP/ENTP/ESTJ/ESFJ/ENFJ/ENTJ.: ",
        "E_label": "Predict if the person is Extraverted or Introverted. Reply with only E or I: ",
        "N_label": "Predict if the person relies more on iNtuition or Sensing. Reply with only N or S: ",
        "T_label": "Predict if the person prefers Thinking or Feeling. Reply with only T or F: ",
        "P_label": "Predict if the person is more Perceiving or Judging. Reply with only P or J: ",
    }

    prompt = prompts[label_name]

    return Dataset.from_dict({
        'input_text': [prompt + text for text in df["posts"]],
        'target_text': df[label_name]
    })

def train_model_for_label(tokenized_dataset, label_name):
    """
    Fine-tune a separate T5 model for one binary MBTI axis.

    Parameters
    ----------
    tokenized_dataset : mapping with tokenized "train" and "validation" splits.
    label_name : str
        Tag for this run; used in the output directory, the logging directory and
        the printed summary.

    Returns
    -------
    The Trainer after training. Its model belongs to this run only.
    """
    # Start every axis from the pretrained checkpoint (same one and same config as the
    # 16-type model). Training the notebook-level `model` here would continue from
    # weights already fine-tuned on the previous task, and every returned Trainer would
    # share that single model object, i.e. all of them would end up holding the weights
    # of whichever task was trained last.
    # NOTE(review): each returned Trainer now keeps its own model alive; release
    # earlier trainers if GPU memory becomes tight.
    label_model = T5ForConditionalGeneration.from_pretrained("t5-base", config=config)

    training_args = TrainingArguments(
        output_dir=f"./t5-mbti-output-{label_name}",
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=3,
        save_strategy="epoch",
        logging_dir=f"./logs-{label_name}",
        logging_steps=500,
        fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is available
        report_to="none"
    )

    trainer = Trainer(
        model=label_model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        tokenizer=tokenizer
    )

    train_output = trainer.train()
    print(f"[{label_name}] TrainOutput Summary:\n{train_output}")

    return trainer

In [None]:
# Extraversion / Introversion: same three splits, E/I prompt and letter targets.
dataset_dict_E = DatasetDict({
    split_name: build_dataset(split_df, "E_label")
    for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df))
})

tokenized_dataset_E = dataset_dict_E.map(preprocess, batched=True)

# "E" tags the output/log directories and the printed summary of this run.
trainer_E = train_model_for_label(tokenized_dataset_E, "E")

In [None]:
# iNtuition / Sensing: same three splits, N/S prompt and letter targets.
dataset_dict_N = DatasetDict({
    split_name: build_dataset(split_df, "N_label")
    for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df))
})

tokenized_dataset_N = dataset_dict_N.map(preprocess, batched=True)

# "N" tags the output/log directories and the printed summary of this run.
trainer_N = train_model_for_label(tokenized_dataset_N, "N")

In [None]:
# Thinking / Feeling: same three splits, T/F prompt and letter targets.
dataset_dict_T = DatasetDict({
    split_name: build_dataset(split_df, "T_label")
    for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df))
})

tokenized_dataset_T = dataset_dict_T.map(preprocess, batched=True)

# "T" tags the output/log directories and the printed summary of this run.
trainer_T = train_model_for_label(tokenized_dataset_T, "T")

In [None]:
# Perceiving / Judging: same three splits, P/J prompt and letter targets.
dataset_dict_P = DatasetDict({
    split_name: build_dataset(split_df, "P_label")
    for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df))
})

tokenized_dataset_P = dataset_dict_P.map(preprocess, batched=True)

# "P" tags the output/log directories and the printed summary of this run.
trainer_P = train_model_for_label(tokenized_dataset_P, "P")