In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from tqdm import tqdm

# 数据预处理类
class MBTIDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len
        
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        
        # 文本序列化
        sequence = self.tokenizer.texts_to_sequences([text])
        padded_sequence = pad_sequences(sequence, maxlen=self.max_len, padding='post')[0]
        
        return {
            'text': torch.tensor(padded_sequence, dtype=torch.long),
            'label': torch.tensor(label, dtype=torch.long)
        }

# LSTM模型架构
class MBTILSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        self.lstm = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           dropout=dropout,
                           batch_first=True)
        
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        
        self.fc = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(lstm_output_dim, lstm_output_dim//2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(lstm_output_dim//2, output_dim)
        )
        
    def forward(self, text):
        embedded = self.embedding(text)
        
        # LSTM层
        output, (hidden, cell) = self.lstm(embedded)
        
        # 处理双向输出
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            hidden = hidden[-1]
        
        return self.fc(hidden)



In [17]:
# 超参数配置
MAX_WORDS = 10000
MAX_LEN = 500
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
OUTPUT_DIM = 16  # 16种MBTI类型
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.2
BATCH_SIZE = 64
EPOCHS = 15
LEARNING_RATE = 0.001



In [18]:
#可以换一个Tokenizer

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# 数据准备

df = pd.read_csv('MBTI 500.csv')  
label_encoder = LabelEncoder()
label_encoder.classes_ = [
    'INTP', 'ENTP', 'INFJ', 'ENFJ', 'INTJ', 'ENTJ', 'INFP', 'ENFP',
    'ISTJ', 'ESTJ', 'ISFJ', 'ESFJ', 'ISTP', 'ESTP', 'ISFP', 'ESFP'
]  # MBTI 类型
texts = df['posts'].values
labels = label_encoder.fit_transform(df['type'].values)

# 创建Tokenizer
tokenizer = Tokenizer(num_words=MAX_WORDS)
tokenizer.fit_on_texts(texts)

# 划分数据集
X_train, X_val, y_train, y_val = train_test_split(
    texts, labels, test_size=0.2, random_state=42
)

# 创建DataLoader
train_dataset = MBTIDataset(X_train, y_train, tokenizer, MAX_LEN)
val_dataset = MBTIDataset(X_val, y_val, tokenizer, MAX_LEN)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

#whole dataset
X_test,y_test =  texts, labels
test_dataset = MBTIDataset(X_test, y_test,  tokenizer, MAX_LEN)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)


In [19]:
# 初始化模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MBTILSTM(MAX_WORDS, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, 
                N_LAYERS, BIDIRECTIONAL, DROPOUT).to(device)

# 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

In [20]:
# 训练循环
best_val_acc = 0
for epoch in range(EPOCHS):
    # 训练阶段
    model.train()
    train_loss, train_acc = 0, 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}"):
        texts = batch['text'].to(device)
        labels = batch['label'].to(device)
        
        optimizer.zero_grad()
        
        outputs = model(texts)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        train_acc += (outputs.argmax(1) == labels).sum().item()
    
    # 验证阶段
    model.eval()
    val_loss, val_acc = 0, 0
    with torch.no_grad():
        for batch in val_loader:
            texts = batch['text'].to(device)
            labels = batch['label'].to(device)
            
            outputs = model(texts)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            val_acc += (outputs.argmax(1) == labels).sum().item()
    
    # 计算指标
    train_loss /= len(train_loader)
    train_acc /= len(train_dataset)
    val_loss /= len(val_loader)
    val_acc /= len(val_dataset)
    
    # 保存最佳模型
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')
    
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc*100:.2f}%")

# 测试模型
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
test_acc = 0
with torch.no_grad():
    for batch in val_loader:
        texts = batch['text'].to(device)
        labels = batch['label'].to(device)
        
        outputs = model(texts)
        test_acc += (outputs.argmax(1) == labels).sum().item()

test_acc /= len(val_dataset)
print(f'Final Test Accuracy: {test_acc*100:.2f}%')

Epoch 1/15: 100%|██████████| 1326/1326 [01:41<00:00, 13.06it/s]


Train Loss: 2.0893 | Train Acc: 24.67%
Val Loss: 2.0564 | Val Acc: 25.67%


Epoch 2/15: 100%|██████████| 1326/1326 [01:47<00:00, 12.37it/s]


Train Loss: 1.7098 | Train Acc: 42.34%
Val Loss: 1.3605 | Val Acc: 54.80%


Epoch 3/15: 100%|██████████| 1326/1326 [01:37<00:00, 13.66it/s]


Train Loss: 0.9938 | Train Acc: 69.86%
Val Loss: 0.8827 | Val Acc: 73.00%


Epoch 4/15: 100%|██████████| 1326/1326 [01:36<00:00, 13.75it/s]


Train Loss: 0.7075 | Train Acc: 78.68%
Val Loss: 0.6550 | Val Acc: 80.03%


Epoch 5/15: 100%|██████████| 1326/1326 [01:36<00:00, 13.67it/s]


Train Loss: 0.5514 | Train Acc: 83.49%
Val Loss: 0.6191 | Val Acc: 81.38%


Epoch 6/15: 100%|██████████| 1326/1326 [01:36<00:00, 13.72it/s]


Train Loss: 0.4455 | Train Acc: 86.79%
Val Loss: 0.6100 | Val Acc: 82.06%


Epoch 7/15: 100%|██████████| 1326/1326 [01:37<00:00, 13.54it/s]


Train Loss: 0.3539 | Train Acc: 89.54%
Val Loss: 0.6816 | Val Acc: 80.62%


Epoch 8/15: 100%|██████████| 1326/1326 [01:38<00:00, 13.51it/s]


Train Loss: 0.2702 | Train Acc: 92.08%
Val Loss: 0.7127 | Val Acc: 81.35%


Epoch 9/15: 100%|██████████| 1326/1326 [01:36<00:00, 13.77it/s]


Train Loss: 0.2097 | Train Acc: 93.83%
Val Loss: 0.7916 | Val Acc: 81.64%


Epoch 10/15: 100%|██████████| 1326/1326 [01:35<00:00, 13.84it/s]


Train Loss: 0.1653 | Train Acc: 95.28%
Val Loss: 0.8404 | Val Acc: 80.61%


Epoch 11/15: 100%|██████████| 1326/1326 [01:35<00:00, 13.86it/s]


Train Loss: 0.1421 | Train Acc: 95.88%
Val Loss: 0.9014 | Val Acc: 81.57%


Epoch 12/15: 100%|██████████| 1326/1326 [01:35<00:00, 13.86it/s]


Train Loss: 0.1199 | Train Acc: 96.50%
Val Loss: 0.9344 | Val Acc: 81.19%


Epoch 13/15: 100%|██████████| 1326/1326 [01:35<00:00, 13.85it/s]


Train Loss: 0.1015 | Train Acc: 96.95%
Val Loss: 1.1148 | Val Acc: 80.33%


Epoch 14/15: 100%|██████████| 1326/1326 [01:36<00:00, 13.78it/s]


Train Loss: 0.0893 | Train Acc: 97.35%
Val Loss: 1.0553 | Val Acc: 79.74%


Epoch 15/15: 100%|██████████| 1326/1326 [01:37<00:00, 13.65it/s]


Train Loss: 0.0819 | Train Acc: 97.52%
Val Loss: 1.0797 | Val Acc: 80.75%
Final Test Accuracy: 82.06%


In [21]:
# 测试整个数据集
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
test_acc = 0
with torch.no_grad():
    for batch in test_loader:
        texts = batch['text'].to(device)
        labels = batch['label'].to(device)
        
        outputs = model(texts)
        test_acc += (outputs.argmax(1) == labels).sum().item()

test_acc /= len(test_dataset)
print(f' Final Test Accuracy: {test_acc*100:.2f}%')

 Final Test Accuracy: 89.59%


In [None]:
import sys
import torch
import numpy as np
import ipywidgets as widgets
from IPython.display import display
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder

# 确保 `device` 设定正确
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载模型
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.to(device)
model.eval()

# 🔹 加载 tokenizer（确保已加载）
# tokenizer = ...  # 需要手动提供 tokenizer

# 🔹 定义 LabelEncoder 并加载 MBTI 标签
label_encoder = LabelEncoder()
label_encoder.classes_ = np.array([
    'INTP', 'ENTP', 'INFJ', 'ENFJ', 'INTJ', 'ENTJ', 'INFP', 'ENFP',
    'ISTJ', 'ESTJ', 'ISFJ', 'ESFJ', 'ISTP', 'ESTP', 'ISFP', 'ESFP'
])  # MBTI 类型

# 创建输入框
text_input = widgets.Text(
    value='',
    placeholder='请输入文本...',
    description='输入:',
    layout=widgets.Layout(width='400px')
)

# 创建预测按钮
predict_button = widgets.Button(description="预测")

# 创建退出按钮
exit_button = widgets.Button(description="退出", button_style='danger')

# 创建输出框
output = widgets.Output()

# 预测函数
def predict_mbti(b):
    with output:
        output.clear_output()  # 清除之前的输出
        user_input = text_input.value
        if not user_input.strip():
            print("⚠️ 请输入文本内容！")
            return
        
        # 处理输入文本
        sequence = tokenizer.texts_to_sequences([user_input])
        padded_sequence = pad_sequences(sequence, maxlen=MAX_LEN)
        texts = torch.tensor(padded_sequence, dtype=torch.long).to(device)

        # 进行预测
        with torch.no_grad():
            outputs = model(texts)
            predicted_label = outputs.argmax(1).item()
            predicted_mbti = label_encoder.inverse_transform(np.array([predicted_label]))[0]

        # 输出结果
        print(f"🧠 预测的 MBTI 类型: **{predicted_mbti}**")

# 退出函数
def exit_notebook(b):
    print("📌 退出程序...")
    sys.exit()  # 安全退出 Python 进程

# 绑定按钮点击事件
predict_button.on_click(predict_mbti)
exit_button.on_click(exit_notebook)  # 绑定退出功能

# 显示 GUI
display(text_input, predict_button, exit_button, output)


Text(value='', description='输入:', layout=Layout(width='400px'), placeholder='请输入文本...')

Button(description='预测', style=ButtonStyle())

Output()