In [None]:
import matplotlib.pyplot as plt
import pandas as pd

df = pd.read_csv('/kaggle/input/mbti-personality-types-500-dataset/MBTI 500.csv')

# Frequency of every full four-letter type (value_counts skips NaN by default).
mbti_counts = df['type'].value_counts()

# Drop rows that have no MBTI label.
df = df.dropna(subset=['type'])

# Count how often each letter occurs at each of the four positions of the type
# string. Every category keeps an entry for all eight letters (letters that do
# not belong to that position stay at 0), so the resulting table is 8 x 4.
# The counting is vectorised per position instead of looping over rows.
categories = ['I/E', 'N/S', 'T/F', 'J/P']
letters = ['I', 'E', 'N', 'S', 'T', 'F', 'J', 'P']
counts = {}
for position, category in enumerate(categories):
    observed = df['type'].str[position].value_counts()
    counts[category] = {letter: int(observed.get(letter, 0)) for letter in letters}

# Tabular form of the counts, ready for plotting.
df_counts = pd.DataFrame(counts)

# Later cells read the label column as 'type_mbti'. df and df_origin refer to
# the same renamed frame.
df = df.rename(columns={'type': 'type_mbti'})
df_origin = df

# Series of MBTI labels, one entry per row.
mbti_type = df['type_mbti']

# Integer id (1-16) for each of the 16 MBTI types.
label_map_all = {
    'ISTJ': 1, 'ISFJ': 2, 'INFJ': 3, 'INTJ': 4,
    'ISTP': 5, 'ISFP': 6, 'INFP': 7, 'INTP': 8,
    'ESTP': 9, 'ESFP': 10, 'ENFP': 11, 'ENTP': 12,
    'ESTJ': 13, 'ESFJ': 14, 'ENFJ': 15, 'ENTJ': 16
}

# Binary label per axis, derived from the letter at that position so the four
# maps cannot drift out of sync with the type strings:
#   I -> 1 / E -> 0,  S -> 1 / N -> 0,  T -> 1 / F -> 0,  J -> 1 / P -> 0.
# (The hand-written S/N table previously held 2 for 'INFP', which is not a
# valid binary target.)
label_map_ie = {mbti: int(mbti[0] == 'I') for mbti in label_map_all}
label_map_sn = {mbti: int(mbti[1] == 'S') for mbti in label_map_all}
label_map_tf = {mbti: int(mbti[2] == 'T') for mbti in label_map_all}
label_map_jp = {mbti: int(mbti[3] == 'J') for mbti in label_map_all}

import pandas as pd

# Build a new frame from df: add the combined label and the four per-axis
# labels (each looked up from the MBTI type string), then drop the original
# 'type_mbti' column. df itself is left untouched.
new_df = df.assign(
    mbti_all=df['type_mbti'].map(label_map_all),
    mbti_ie=df['type_mbti'].map(label_map_ie),
    mbti_sn=df['type_mbti'].map(label_map_sn),
    mbti_tf=df['type_mbti'].map(label_map_tf),
    mbti_jp=df['type_mbti'].map(label_map_jp),
).drop(columns=['type_mbti'])

print(new_df)

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
from torch.optim import AdamW
import torch.nn as nn
import numpy as np
from tqdm import tqdm  # 导入 tqdm 用于进度条
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score

In [None]:
! pip install hf_xet

In [None]:
# Work on a reproducible random subsample of 10,000 labelled rows
# (drawn without replacement, fixed seed).
data = new_df.sample(n=10000, replace=False, random_state=42)

# Hold out 20% of the subsample for evaluation.
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

# Dataset wrapper around the labelled frame
class MBTIDataset(Dataset):
    """Serve tokenised posts together with their four binary MBTI axis labels.

    Each item is a dict with:
      * 'input_ids'      - flattened token ids from the tokenizer,
      * 'attention_mask' - flattened attention mask from the tokenizer,
      * 'labels'         - float tensor holding the row's
                           mbti_ie, mbti_sn, mbti_tf and mbti_jp values, in that order.
    """

    def __init__(self, data, tokenizer, max_len):
        # data: frame with a 'posts' column and the four 'mbti_*' label columns
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data.iloc[idx]
        targets = [record[f'mbti_{dim}'] for dim in ('ie', 'sn', 'tf', 'jp')]

        # Pad/truncate every post to exactly max_len tokens.
        encoded = self.tokenizer.encode_plus(
            record['posts'],
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        item = {}
        item['input_ids'] = encoded['input_ids'].flatten()
        item['attention_mask'] = encoded['attention_mask'].flatten()
        item['labels'] = torch.tensor(targets, dtype=torch.float)
        return item

# Tokenizer matching the pretrained 'bert-base-uncased' checkpoint
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Wrap both splits in datasets that pad/truncate to max_len tokens
max_len = 512
train_dataset, test_dataset = (
    MBTIDataset(split, tokenizer, max_len) for split in (train_data, test_data)
)

# Only the training loader shuffles; the test loader keeps row order
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32)

In [None]:
# Build the model
class MultiTaskBert(torch.nn.Module):
    """BERT encoder with four independent single-logit heads.

    forward() returns a [batch_size, 4] tensor of raw logits, one column per
    head, in the order the heads are stored in ``self.classifiers``.
    """

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.bert.gradient_checkpointing_enable()  # enable gradient checkpointing
        self.dropout = torch.nn.Dropout(p=0.1)
        heads = [torch.nn.Linear(768, 1) for _ in range(4)]
        self.classifiers = torch.nn.ModuleList(heads)

    def forward(self, input_ids, attention_mask, labels=None):
        # `labels` is accepted for call-site compatibility but is not used here.
        encoded = self.bert(input_ids, attention_mask=attention_mask)
        features = self.dropout(encoded[1])  # pooled output, [batch_size, 768]
        per_head = [head(features) for head in self.classifiers]
        return torch.cat(per_head, dim=-1)  # [batch_size, 4]

# Instantiate the network once here. NOTE(review): the device-setup cell further
# down builds a fresh MultiTaskBert() again, so this instance looks redundant.
model = MultiTaskBert()

In [None]:
import os

# Directory where model checkpoints are written; created if it does not exist yet
model_save_path = './model/models'
os.makedirs(model_save_path, exist_ok=True)

In [None]:
# Device selection and model initialisation
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = MultiTaskBert()
if torch.cuda.device_count() >= 2:
    print(f"使用 {torch.cuda.device_count()} 块 GPU")
    model = nn.DataParallel(model)  # enable data parallelism
model = model.to(device)

# All parameters (encoder and heads) share one learning rate
optimizer = AdamW(params=model.parameters(), lr=2e-5)

def train_epoch(model, data_loader, optimizer, device):
    """Run one training pass over ``data_loader`` and return the mean batch loss.

    Args:
        model: module called as ``model(input_ids, attention_mask, labels)`` and
            returning logits with the same shape as the batch labels.
        data_loader: iterable of batches; each batch is a mapping with
            'input_ids', 'attention_mask' and 'labels' tensors.
        optimizer: optimizer stepping the model's parameters.
        device: torch.device the batch tensors are moved to.

    Returns:
        Mean of the per-batch BCE-with-logits losses over the batches actually
        consumed from ``data_loader`` (0.0 if it yields no batches).
    """
    model.train()
    # The loss module is stateless, so build it once instead of once per batch.
    loss_fn = torch.nn.BCEWithLogitsLoss()
    total_loss = 0.0
    num_batches = 0
    # tqdm wraps the loader to show a progress bar.
    for batch in tqdm(data_loader, desc="Training", leave=False):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()

        outputs = model(input_ids, attention_mask, labels)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Average over the loader that was passed in, not a module-level loader,
    # and avoid dividing by zero when the loader is empty.
    if num_batches == 0:
        return 0.0
    return total_loss / num_batches

def eval_model(model, data_loader, device):
    """Evaluate ``model`` on ``data_loader`` and report per-dimension accuracy.

    Returns:
        (preds, final_labels, accuracies) where ``preds`` is a boolean array of
        shape [num_samples, 4] (sigmoid(logit) > 0.5), ``final_labels`` is the
        concatenated label array, and ``accuracies`` lists the accuracy of each
        of the four columns in order.
    """
    model.eval()
    logit_chunks, label_chunks = [], []
    # No gradients are needed for evaluation; tqdm shows a progress bar.
    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Evaluating", leave=False):
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            logit_chunks.append(model(ids, mask).cpu().numpy())
            label_chunks.append(targets.cpu().numpy())

    final_outputs = np.concatenate(logit_chunks, axis=0)
    final_labels = np.concatenate(label_chunks, axis=0)

    # Threshold the sigmoid probabilities into binary predictions.
    probabilities = torch.sigmoid(torch.tensor(final_outputs))
    preds = (probabilities > 0.5).numpy()  # [num_samples, 4]

    # Accuracy of each dimension, printed as it is computed.
    dimension_names = ['E/I', 'N/S', 'T/F', 'J/P']
    accuracies = []
    for dim in range(len(dimension_names)):
        acc = accuracy_score(final_labels[:, dim], preds[:, dim])
        accuracies.append(acc)
        print(f"{dimension_names[dim]} 准确率: {acc:.4f}")

    return preds, final_labels, accuracies

num_epochs = 3
best_metric = float('inf')  # lowest training loss seen so far
best_model_file = os.path.join(model_save_path, 'best_model.pth')
for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    train_loss = train_epoch(model, train_loader, optimizer, device)
    preds, final_labels, accuracies = eval_model(model, test_loader, device)
    avg_acc = sum(accuracies) / len(accuracies)
    print(f"第 {epoch + 1} 轮评估完成，平均准确率: {avg_acc:.4f}")
    # Keep the checkpoint with the lowest training loss.
    if train_loss < best_metric:
        best_metric = train_loss
        # When the model is wrapped in nn.DataParallel its state_dict keys are
        # prefixed with 'module.'; save the underlying module so the checkpoint
        # loads into a plain MultiTaskBert regardless of how many GPUs trained it.
        model_to_save = model.module if isinstance(model, nn.DataParallel) else model
        torch.save(model_to_save.state_dict(), best_model_file)
        print(f"最佳模型已保存至 {best_model_file}，训练损失: {best_metric:.4f}")

In [None]:
# Assemble four binary predictions into an MBTI type string
def predictions_to_mbti(preds):
    """Convert rows of four binary predictions into MBTI type strings.

    The decoding follows the training label encoding (label_map_ie / _sn /
    _tf / _jp), where 1 stands for I, S, T and J and 0 for E, N, F and P.

    Args:
        preds: iterable of rows; each row is indexable with at least four
            entries ordered (I/E, S/N, T/F, J/P), each equal to 1 or 0.

    Returns:
        List of four-letter strings, one per row (empty list for empty input).
    """
    # (letter for 1, letter for 0) for each position of the type string
    letter_pairs = (('I', 'E'), ('S', 'N'), ('T', 'F'), ('J', 'P'))
    mbti_types = []
    for pred in preds:
        mbti_types.append(''.join(
            positive if pred[axis] == 1 else negative
            for axis, (positive, negative) in enumerate(letter_pairs)
        ))
    return mbti_types

In [None]:
# def predict_with_probability_scores(model, data_loader, device, original_data=None):
#     model.eval()
#     final_outputs = []
#     final_labels = []
    
#     with torch.no_grad():
#         for batch in tqdm(data_loader, desc="Predicting", leave=True):
#             input_ids = batch['input_ids'].to(device)
#             attention_mask = batch['attention_mask'].to(device)
#             labels = batch.get('labels')  # Labels may not exist in prediction
#             if labels is not None:
#                 labels = labels.to(device)
#                 final_labels.append(labels.cpu())
            
#             logits = model(input_ids, attention_mask)
#             final_outputs.append(logits.cpu())
    
#     final_outputs = torch.cat(final_outputs, dim=0)
#     probs = torch.sigmoid(final_outputs).numpy()  # Probabilities [num_samples, 4]
#     preds = (probs > 0.5).astype(int)  # Binary predictions [num_samples, 4]
    
#     # Compute scores: probability of the predicted class
#     scores = np.zeros_like(probs)
#     for i in range(probs.shape[0]):
#         for j in range(probs.shape[1]):
#             scores[i, j] = probs[i, j] if preds[i, j] == 1 else 1 - probs[i, j]
    
#     # Create DataFrame for predictions and scores
#     pred_columns = ['pred_EI', 'pred_NS', 'pred_TF', 'pred_JP']
#     score_columns = ['score_EI', 'score_NS', 'score_TF', 'score_JP']
#     pred_df = pd.DataFrame(preds, columns=pred_columns)
#     score_df = pd.DataFrame(scores, columns=score_columns)
    
#     if original_data is not None:
#         results_df = original_data.reset_index(drop=True).copy()
#         results_df = pd.concat([results_df, pred_df, score_df], axis=1)
#     else:
#         results_df = pd.concat([pred_df, score_df], axis=1)
    
#     # If labels are available, compute accuracies
#     accuracies = None
#     if final_labels:
#         final_labels = torch.cat(final_labels, dim=0).numpy()
#         accuracies = []
#         dimension_names = ['E/I', 'N/S', 'T/F', 'J/P']
#         for dim in range(4):
#             acc = accuracy_score(final_labels[:, dim], preds[:, dim])
#             accuracies.append(acc)
#             print(f"{dimension_names[dim]} Accuracy: {acc:.4f}")
    
#     return results_df, accuracies
def predict_text(model, texts, tokenizer, device, max_len=128, batch_size=16, preprocess=True):
    """
    Predict MBTI dimensions for input text(s) with probability scores.

    Args:
        model: Trained MultiTaskBert model, called as model(input_ids, attention_mask)
            and returning one logit per dimension
        texts: Single string or an iterable of strings
        tokenizer: BERT tokenizer
        device: torch.device (cuda or cpu)
        max_len: Maximum sequence length (texts are padded/truncated to it)
        batch_size: Batch size for prediction
        preprocess: Whether to apply preprocess_text before tokenizing

    Returns:
        results_df: Pandas DataFrame with one row per text and the columns
            original_text, cleaned_text, pred_EI/pred_NS/pred_TF/pred_JP
            (1 when sigmoid(logit) > 0.5, else 0), score_EI/.../score_JP
            (probability of the predicted class) and mbti_type.
            Empty input yields an empty frame with the same columns.
    """
    pred_columns = ['pred_EI', 'pred_NS', 'pred_TF', 'pred_JP']
    score_columns = ['score_EI', 'score_NS', 'score_TF', 'score_JP']

    model.eval()

    # Normalise the input to a list of strings
    if isinstance(texts, str):
        texts = [texts]
    else:
        texts = list(texts)

    # Preprocess texts if required
    cleaned_texts = [preprocess_text(text) if preprocess else text for text in texts]
    input_df = pd.DataFrame({'original_text': texts, 'cleaned_text': cleaned_texts})

    # Nothing to predict: torch.cat would fail on an empty list, so return early
    if not texts:
        return pd.DataFrame(
            columns=list(input_df.columns) + pred_columns + score_columns + ['mbti_type']
        )

    # Tokenize cleaned texts; each encoding is a [1, max_len] tensor pair
    encodings = [tokenizer.encode_plus(
        cleaned_text,
        add_special_tokens=True,
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    ) for cleaned_text in cleaned_texts]

    input_ids = torch.cat([enc['input_ids'] for enc in encodings], dim=0)
    attention_mask = torch.cat([enc['attention_mask'] for enc in encodings], dim=0)

    # Batch the encoded texts in their original order
    dataset = torch.utils.data.TensorDataset(input_ids, attention_mask)
    data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    batch_logits = []
    with torch.no_grad():
        for batch_ids, batch_mask in tqdm(data_loader, desc="Predicting", leave=True):
            logits = model(batch_ids.to(device), batch_mask.to(device))
            batch_logits.append(logits.cpu())

    probs = torch.sigmoid(torch.cat(batch_logits, dim=0)).numpy()
    preds = (probs > 0.5).astype(int)

    # Score = probability of whichever class was predicted
    scores = np.where(preds == 1, probs, 1 - probs)

    pred_df = pd.DataFrame(preds, columns=pred_columns)
    score_df = pd.DataFrame(scores, columns=score_columns)
    # Assemble the four-letter MBTI type for every row
    mbti_df = pd.DataFrame({'mbti_type': predictions_to_mbti(preds)})

    results_df = pd.concat([input_df, pred_df, score_df, mbti_df], axis=1)

    return results_df

In [None]:
import re
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from transformers import BertTokenizer, BertForSequenceClassification
import torch

# Make sure the NLTK data used for stopword removal and lemmatisation has been downloaded
nltk.download('wordnet')
nltk.download('stopwords')

In [None]:
def preprocess_text(text):
    """Clean a raw text for the model.

    Removes every character that is neither a word character nor whitespace
    (punctuation, emoji, other symbols), splits on whitespace, lower-cases the
    tokens, drops English stopwords and lemmatises what remains.

    Returns:
        The surviving tokens joined by single spaces.
    """
    # Strip emoji and special characters
    stripped = re.sub(r'[^\w\s]', '', text)

    lemmatizer = WordNetLemmatizer()
    stop_words = set(stopwords.words('english'))

    # Lower-case, filter stopwords, then lemmatise each remaining token
    kept = []
    for token in stripped.split():
        lowered = token.lower()
        if lowered in stop_words:
            continue
        kept.append(lemmatizer.lemmatize(lowered))

    return ' '.join(kept)

# Sample text: a free-form, emoji-laden English passage kept as a demo input
sample_text = ("2024 starts with a bang 😅. Everyone's year-end summaries are so brilliant, compared to them it feels like I haven't lived at all. By contrast, I feel a year younger 😘.Today in class, I realized I lost my red pen. I remembered that my Python exam teacher borrowed it yesterday and didn't return it 😅.I only realized after the exam that there's a mode on the calculator that can calculate variance with just one click 😅. When I asked my classmate how he knew, he said that calculators are allowed in Shanghai's college entrance exams, and they learned it quite early.During the trial, it was clearly stated that Trump had never been involved with Epstein Island. I'm surprised he didn't fabricate millions of pages of documents to drag Trump down, I'm devastated.Mariah Carey, you really have a discerning eye. At that time, you shot a music video for this song with a low-budget 'nobody cares' special effect, and indeed, this song has remained popular.")
# cleaned_text = preprocess_text(sample_text)
# print("Cleaned text:", cleaned_text)

In [None]:
# Load the best checkpoint into a fresh, unwrapped model
model = MultiTaskBert()
model = model.to(device)

model_path = os.path.join(model_save_path, 'best_model.pth')
if not os.path.exists(model_path):
    raise FileNotFoundError(f"模型文件 {model_path} 不存在")

# map_location lets a checkpoint saved on GPU be restored on whatever device is
# available now (including CPU-only machines).
state_dict = torch.load(model_path, map_location=device)
# A checkpoint saved from an nn.DataParallel wrapper prefixes every key with
# 'module.'; strip it so the keys match the plain MultiTaskBert built above.
state_dict = {
    (key[len('module.'):] if key.startswith('module.') else key): value
    for key, value in state_dict.items()
}
model.load_state_dict(state_dict)
print(f"模型已从 {model_path} 加载")

In [None]:
# # 加载分词器和模型
# tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# # 对文本进行编码
# encoded_input = tokenizer.encode_plus(
#     cleaned_text,
#     add_special_tokens=True,
#     max_length=64,
#     padding='max_length',
#     return_attention_mask=True,
#     truncation=True,
#     return_tensors='pt'
# )
# input_ids = encoded_input['input_ids'].to(device)
# attention_mask = encoded_input['attention_mask'].to(device)
# # 示例：预测新输入文本
# input_texts = cleaned_text
# Run prediction on the sample text. predict_text cleans the text itself
# (preprocess=True by default), so the raw sample is passed in directly.
results_df = predict_text(model, sample_text, tokenizer, device)

# Persist the results so the message below is accurate.
results_df.to_csv('text_prediction_results.csv', index=False)
print("Prediction results saved to 'text_prediction_results.csv'")
print(results_df)