# Using BERT + BiLSTM as Sentiment Analyzer
## Unified preprocessing:
1. 字母小寫
2. 刪除網址
3. 移除標點符號（所有標點符號都要刪除）
4. 移除非英文字母
5. 計算類別權重（class_weight）
6. 移除停用詞（stop_words）
7. 不替換用戶名（replace_username：False）
8. 不替換 COVID 相關詞彙（replace_covid：False）

### 2025/05/29 01:49 by sky
## Staged grid search over the following hyperparameters：
### 階段一
    batch sizes = [64, 128, 256]
    學習率=[0.01,0.001, 0.0001]
### 階段二
    hidden_dim=[128, 256, 512]
    num_layers=[2, 3, 4]
    dropout=[0.1, 0.2, 0.3]
### 階段三
    max lengths = [30, 50]
### 階段四
    pooling methods = ['hidden state', 'max pooling', 'mean pooling']
    activation function：[None, ReLU, tanh, softmax]

## 步驟1：載入套件

In [2]:
import torch
torch.cuda.is_available()

False

In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
import re
import string
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.optim.lr_scheduler import CosineAnnealingLR
import nltk
from nltk.corpus import stopwords
import matplotlib.font_manager as fm
from itertools import product
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score
import copy

KeyboardInterrupt: 

In [None]:
print(1)

In [None]:
# Configure a Chinese-capable font for matplotlib (the plots in this notebook
# use Chinese labels) and turn off the Unicode minus glyph.
plt.rcParams.update({
    'font.sans-serif': ['Microsoft JhengHei'],
    'axes.unicode_minus': False,
})
# Download the NLTK stop-word corpus read by the text-cleaning step.
nltk.download('stopwords')

## 步驟2：設置參數

In [None]:
# Global training configuration shared by every run in this notebook.
OUTPUT_DIM = 5        # five sentiment classes
EPOCHS = 15           # number of training epochs
PATIENCE = 5          # early-stopping patience
CLIP_GRAD_NORM = 1.0  # gradient-clipping norm
# Run on the GPU when PyTorch reports one, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Baseline hyperparameters; the staged search starts from a copy of these
# and overrides a subset of the keys in each stage.
default_params = dict(
    batch_size=64,
    learning_rate=0.0001,
    hidden_dim=256,
    num_layers=4,
    dropout=0.2,
    max_length=50,
    pooling_method='max pooling',  # corrected to 'max pooling'
    activation_function='softmax',
)

In [None]:
# Per-stage search grids: each stage lists the candidate values for the
# hyperparameters it explores.
stage_params = dict(
    stage1=dict(
        batch_size=[64, 128, 256],
        learning_rate=[0.01, 0.001, 0.0001],
    ),
    stage2=dict(
        hidden_dim=[128, 256, 512],
        num_layers=[2, 3, 4],
        dropout=[0.1, 0.2, 0.3],
    ),
    stage3=dict(
        max_length=[30, 50],
    ),
    stage4=dict(
        pooling_method=['hidden state', 'max pooling', 'mean pooling'],
        activation_function=[None, 'ReLU', 'tanh', 'softmax'],
    ),
)

## 步驟3：資料清理函數

In [None]:
# 步驟3：資料清理函數
# Lazily-built stop-word set, shared by every clean_text() call.
_STOP_WORDS_CACHE = None


def _get_custom_stop_words():
    """Return the English stop words minus the sentiment-bearing ones (cached)."""
    global _STOP_WORDS_CACHE
    if _STOP_WORDS_CACHE is None:
        # Keep 'not', 'very' and 'really': they carry sentiment information.
        _STOP_WORDS_CACHE = (
            frozenset(stopwords.words('english')) - {'not', 'very', 'really'}
        )
    return _STOP_WORDS_CACHE


def clean_text(text):
    """Normalise a raw tweet into lower-case, space-separated English words.

    Steps, in order: lower-case, strip URLs, strip punctuation, drop every
    character that is neither a-z nor whitespace, remove stop words, and
    collapse runs of whitespace into single spaces.

    Args:
        text: The raw tweet. Non-string values (e.g. NaN from a missing CSV
            cell) are treated as empty text.

    Returns:
        The cleaned text; an empty string when nothing survives cleaning.
    """
    if not isinstance(text, str):
        return ''
    text = text.lower()
    # Single backslashes are required inside a raw string: r'\\S' would match
    # a literal backslash followed by 'S', so URLs would never be removed.
    text = re.sub(r'http\S+|www\S+', '', text)
    text = text.translate(str.maketrans('', '', string.punctuation))
    # r'[^a-z\s]' keeps whitespace; the former r'[^a-z\\s]' deleted it and
    # fused the whole tweet into one token.
    text = re.sub(r'[^a-z\s]', '', text)
    stop_words = _get_custom_stop_words()
    # split()/join also collapses any remaining runs of whitespace.
    return ' '.join(word for word in text.split() if word not in stop_words)

## 步驟4：讀取資料

In [None]:
# Load the train and test CSV files (both read with latin1 encoding).
train_df, test_df = (
    pd.read_csv(csv_path, encoding='latin1')
    for csv_path in ('Corona_NLP_train.csv', 'Corona_NLP_test.csv')
)

# Add a cleaned copy of every tweet next to the original text.
train_df['clean_text'] = train_df['OriginalTweet'].apply(clean_text)
test_df['clean_text'] = test_df['OriginalTweet'].apply(clean_text)

# Word-count statistics of the cleaned training text.
text_lengths = train_df['clean_text'].str.split().str.len()
print(f"平均長度: {text_lengths.mean():.2f}, 最大長度: {text_lengths.max()}")

# Preview the first five rows of each split, before and after cleaning.
print("清理後的訓練資料前5筆：")
print(train_df[['OriginalTweet', 'clean_text']].head())
print("\n清理後的測試資料前5筆：")
print(test_df[['OriginalTweet', 'clean_text']].head())

## 步驟5：標籤編碼 & class weight

In [None]:
# Ordinal encoding of the five sentiment classes (0 = most negative).
sentiment_mapping = {
    name: index
    for index, name in enumerate(
        ['Extremely Negative', 'Negative', 'Neutral', 'Positive', 'Extremely Positive']
    )
}
train_df['label'] = train_df['Sentiment'].map(sentiment_mapping)
test_df['label'] = test_df['Sentiment'].map(sentiment_mapping)

# Show the per-class counts of both splits.
print("\n訓練資料類別分佈：")
print(train_df['label'].value_counts())
print("\n測試資料類別分佈：")
print(test_df['label'].value_counts())

# 'balanced' class weights computed from the training labels, stored as a
# float tensor on the training device.
class_weights = torch.tensor(
    compute_class_weight(
        'balanced',
        classes=np.unique(train_df['label']),
        y=train_df['label'],
    ),
    dtype=torch.float,
).to(device)

## 步驟 6：創建自定義資料集

In [None]:
class TweetDataset(Dataset):
    """Dataset that tokenizes one text per item and pairs it with its label.

    Args:
        texts: Indexable collection of texts.
        labels: Indexable collection of integer labels, aligned with `texts`.
        tokenizer: Object exposing `encode_plus` (a BERT tokenizer in this file).
        max_len: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded text and label at `idx` as a dict of tensors."""
        raw_text = str(self.texts[idx])
        target = self.labels[idx]
        encoded = self.tokenizer.encode_plus(
            raw_text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        # The tokenizer output is flattened to 1-D per sample.
        sample = {
            key: encoded[key].flatten()
            for key in ('input_ids', 'attention_mask')
        }
        sample['labels'] = torch.tensor(target, dtype=torch.long)
        return sample

## 步驟7：定義 BERT-BiLSTM 模型

In [None]:
class BertBiLSTM(nn.Module):
    """BERT encoder followed by a bidirectional LSTM and a linear classifier.

    Args:
        hidden_dim: Hidden size of each LSTM direction.
        output_dim: Number of output classes.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the pooled features.
        pooling_method: 'hidden state', 'max pooling' or 'mean pooling'.
        activation_function: 'ReLU', 'tanh', 'softmax', or any other value
            (e.g. None) for no activation on the classifier output.

    Raises:
        ValueError: If `pooling_method` is not one of the supported names.
    """

    def __init__(self, hidden_dim, output_dim, num_layers, dropout, pooling_method, activation_function):
        super(BertBiLSTM, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        # Freeze the first 10 encoder layers; the remaining layers stay trainable.
        for param in self.bert.encoder.layer[:10].parameters():
            param.requires_grad = False
        self.lstm = nn.LSTM(
            input_size=768,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
        self.pooling_method = pooling_method
        # Size the classifier from the pooling method actually used; sizing
        # 'mean pooling' like 'max pooling' made forward() fail on shape.
        self.fc = nn.Linear(self._pooled_feature_dim(hidden_dim, pooling_method), output_dim)
        self.activation_function = activation_function
        # NOTE(review): the training code in this file feeds the output to
        # nn.CrossEntropyLoss, which expects unnormalised logits; any
        # activation here (softmax in particular) changes what the loss sees.
        if activation_function == 'ReLU':
            self.activation = nn.ReLU()
        elif activation_function == 'tanh':
            self.activation = nn.Tanh()
        elif activation_function == 'softmax':
            self.activation = nn.Softmax(dim=1)
        else:
            self.activation = None

    @staticmethod
    def _pooled_feature_dim(hidden_dim, pooling_method):
        """Return the feature width forward() produces for `pooling_method`."""
        if pooling_method in ('hidden state', 'mean pooling'):
            # One vector of forward + backward features.
            return hidden_dim * 2
        if pooling_method == 'max pooling':
            # Max-pooled and mean-pooled vectors are concatenated.
            return hidden_dim * 4
        raise ValueError(f"Invalid pooling_method: {pooling_method}. Expected 'hidden state', 'max pooling', or 'mean pooling'.")

    def forward(self, input_ids, attention_mask):
        """Return one row of class scores per input sequence.

        Args:
            input_ids: Token ids passed to BERT.
            attention_mask: Attention mask passed to BERT.

        Returns:
            The classifier output, after the optional activation.
        """
        bert_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        embedded = bert_outputs[0]
        lstm_out, (hidden, _) = self.lstm(embedded)
        # NOTE(review): the pooling below runs over every time step, padded
        # positions included; attention_mask is only used by BERT.
        if self.pooling_method == 'hidden state':
            # Last layer's final forward and backward hidden states.
            pooled = torch.cat((hidden[-2], hidden[-1]), dim=1)
        elif self.pooling_method == 'max pooling':
            max_pool, _ = torch.max(lstm_out, dim=1)
            avg_pool = torch.mean(lstm_out, dim=1)
            pooled = torch.cat((max_pool, avg_pool), dim=1)
        elif self.pooling_method == 'mean pooling':
            pooled = torch.mean(lstm_out, dim=1)
        else:
            raise ValueError(f"Invalid pooling_method: {self.pooling_method}. Expected 'hidden state', 'max pooling', or 'mean pooling'.")
        pooled = self.dropout(pooled)
        output = self.fc(pooled)
        if self.activation is not None:
            output = self.activation(output)
        return output

## 步驟8：訓練與驗證函數

In [None]:
def train_epoch(model, data_loader, criterion, optimizer, device):
    """Train `model` for one pass over `data_loader`.

    Args:
        model: Module called as `model(input_ids, attention_mask)`.
        data_loader: Yields dict batches with 'input_ids', 'attention_mask'
            and 'labels' tensors.
        criterion: Loss called as `criterion(outputs, labels)`.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple of (loss averaged over batches, accuracy over all samples).
    """
    model.train()
    running_loss, n_correct, n_seen = 0, 0, 0
    for batch in tqdm(data_loader, desc="訓練"):
        input_ids, attention_mask, labels = (
            batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        )

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        batch_loss = criterion(outputs, labels)
        batch_loss.backward()
        # Clip the global gradient norm before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP_GRAD_NORM)
        optimizer.step()

        running_loss += batch_loss.item()
        predicted = outputs.max(dim=1).indices
        n_correct += (predicted == labels).sum().item()
        n_seen += labels.size(0)
    return running_loss / len(data_loader), n_correct / n_seen

def evaluate(model, data_loader, criterion, device):
    """Evaluate `model` on `data_loader` without computing gradients.

    Args:
        model: Module called as `model(input_ids, attention_mask)`.
        data_loader: Yields dict batches with 'input_ids', 'attention_mask'
            and 'labels' tensors.
        criterion: Loss called as `criterion(outputs, labels)`.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple of (loss averaged over batches, accuracy, weighted F1,
        list of predictions, list of true labels).
    """
    model.eval()
    running_loss, n_correct, n_seen = 0, 0, 0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch in tqdm(data_loader, desc="驗證"):
            input_ids, attention_mask, labels = (
                batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            )
            outputs = model(input_ids, attention_mask)
            running_loss += criterion(outputs, labels).item()

            predicted = outputs.max(dim=1).indices
            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)
            # Collect on the CPU so the metrics can be computed afterwards.
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    weighted_f1 = f1_score(all_labels, all_preds, average='weighted')
    mean_loss = running_loss / len(data_loader)
    accuracy = n_correct / n_seen
    return mean_loss, accuracy, weighted_f1, all_preds, all_labels

## 步驟 9：階段性參數實驗

In [None]:
# (stage number, stage label, header text, hyperparameters searched jointly)
_SEARCH_STAGES = (
    (1, '階段一', '測試 batch_size 和 learning_rate', ('batch_size', 'learning_rate')),
    (2, '階段二', '測試 hidden_dim, num_layers, dropout', ('hidden_dim', 'num_layers', 'dropout')),
    (3, '階段三', '測試 max_length', ('max_length',)),
    (4, '階段四', '測試 pooling_method 和 activation_function', ('pooling_method', 'activation_function')),
)


def _make_loaders(tokenizer, max_length, batch_size):
    """Build the (train, validation) DataLoaders for one configuration."""
    loaders = []
    # Only the training split is shuffled.
    for frame, shuffle in ((train_df, True), (test_df, False)):
        dataset = TweetDataset(
            texts=frame['clean_text'].to_numpy(),
            labels=frame['label'].to_numpy(),
            tokenizer=tokenizer,
            max_len=max_length,
        )
        loaders.append(DataLoader(dataset, batch_size=batch_size, shuffle=shuffle))
    return loaders[0], loaders[1]


def _fit_with_early_stopping(params, tokenizer, final_run=False):
    """Train one model with early stopping and summarise its best epoch.

    Args:
        params: Full hyperparameter dict (same keys as `default_params`).
        tokenizer: Tokenizer handed to the datasets.
        final_run: When True, keep a copy of the best weights, restore them
            after training, and print the two-line epoch header.

    Returns:
        Dict with per-epoch 'train_losses', 'val_losses', 'val_accuracies',
        plus 'val_acc', 'val_f1', 'preds' and 'labels' taken from the epoch
        with the lowest validation loss (the last epoch if the loss never
        improved), and 'model' (the restored model when `final_run`, else
        None).
    """
    train_loader, val_loader = _make_loaders(tokenizer, params['max_length'], params['batch_size'])
    model = BertBiLSTM(
        hidden_dim=params['hidden_dim'],
        output_dim=OUTPUT_DIM,
        num_layers=params['num_layers'],
        dropout=params['dropout'],
        pooling_method=params['pooling_method'],
        activation_function=params['activation_function'],
    ).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)
    learning_rate = params['learning_rate']
    optimizer = optim.Adam([
        {'params': model.bert.parameters(), 'lr': learning_rate, 'weight_decay': 1e-4},
        {'params': list(model.lstm.parameters()) + list(model.fc.parameters()), 'lr': learning_rate, 'weight_decay': 1e-4},
    ])
    # Anneal over the whole run: T_max=10 with 15 epochs let the learning
    # rate reach zero and then rise again.
    scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS)

    run = {
        'train_losses': [], 'val_losses': [], 'val_accuracies': [],
        'val_acc': None, 'val_f1': None, 'preds': None, 'labels': None,
        'model': None,
    }
    best_val_loss = float('inf')
    best_state = None
    stale_epochs = 0
    for epoch in range(EPOCHS):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc, val_f1, preds, labels = evaluate(model, val_loader, criterion, device)
        scheduler.step()
        run['train_losses'].append(train_loss)
        run['val_losses'].append(val_loss)
        run['val_accuracies'].append(val_acc)
        if final_run:
            print(f'輪次 {epoch+1}/{EPOCHS}')
            print(f'訓練損失: {train_loss:.4f}, 訓練準確率: {train_acc:.4f}')
        else:
            print(f'輪次 {epoch+1}/{EPOCHS}, 訓練損失: {train_loss:.4f}, 訓練準確率: {train_acc:.4f}')
        print(f'驗證損失: {val_loss:.4f}, 驗證準確率: {val_acc:.4f}, F1: {val_f1:.4f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            stale_epochs = 0
            # Remember the metrics and predictions of the best epoch so the
            # reported numbers match the model early stopping selects.
            run.update(val_acc=val_acc, val_f1=val_f1, preds=preds, labels=labels)
            if final_run:
                best_state = copy.deepcopy(model.state_dict())
        else:
            stale_epochs += 1
            if stale_epochs >= PATIENCE:
                print(f'早停於輪次 {epoch+1}')
                break

    if run['val_acc'] is None:
        # Validation loss never improved (e.g. it was NaN): report the last epoch.
        run.update(val_acc=val_acc, val_f1=val_f1, preds=preds, labels=labels)
    if final_run:
        if best_state is not None:
            model.load_state_dict(best_state)
        run['model'] = model
    return run


def _run_stage(stage_no, label, header, names, best_params, best_val_acc, results, tokenizer):
    """Grid-search one stage; update `best_params`/`results` in place.

    Returns:
        The best validation accuracy seen so far (across all stages).
    """
    print(f"\n{label}：{header}")
    grids = [stage_params[f'stage{stage_no}'][name] for name in names]
    for values in product(*grids):
        candidate = dict(zip(names, values))
        print("\n測試參數：" + ", ".join(f"{key}={value}" for key, value in candidate.items()))
        # Parameters not searched in this stage come from the best so far.
        run = _fit_with_early_stopping({**best_params, **candidate}, tokenizer)
        results.append({
            'stage': stage_no,
            **candidate,
            'val_acc': run['val_acc'],
            'val_f1': run['val_f1'],
        })
        if run['val_acc'] > best_val_acc:
            best_val_acc = run['val_acc']
            best_params.update(candidate)
    chosen = ", ".join(f"{name}={best_params[name]}" for name in names)
    print(f"\n{label}最佳參數：{chosen}, 驗證準確率={best_val_acc:.4f}")
    return best_val_acc


def staged_search():
    """Run the four-stage hyperparameter search, then train and plot the winner.

    Each stage grid-searches its own hyperparameters while every other one
    stays at the best value found so far (starting from `default_params`).
    All trial results are written to 'staged_search_results.csv'. A final
    model is then trained with the best parameters, and its confusion matrix
    and training curves are saved as PNG files and shown.

    NOTE(review): the test split is used as the validation set for both
    model selection and early stopping, so the reported scores are not an
    unbiased test estimate.
    """
    best_params = default_params.copy()
    best_val_acc = 0
    results = []
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    for stage_no, label, header, names in _SEARCH_STAGES:
        best_val_acc = _run_stage(
            stage_no, label, header, names,
            best_params, best_val_acc, results, tokenizer,
        )

    # Persist every trial result.
    results_df = pd.DataFrame(results)
    results_df.to_csv('staged_search_results.csv', index=False)
    print(f"\n最終最佳參數：{best_params}")
    print(f"最終最佳驗證準確率: {best_val_acc:.4f}")

    # Final training run with the best parameters.
    print("\n使用最佳參數進行最終訓練...")
    final = _fit_with_early_stopping(best_params, tokenizer, final_run=True)
    train_losses = final['train_losses']
    val_losses = final['val_losses']
    val_accuracies = final['val_accuracies']

    # Confusion matrix from the best epoch's predictions, i.e. the weights
    # that were restored, rather than from whichever epoch ran last.
    sentiment_labels = ['Extremely Negative', 'Negative', 'Neutral', 'Positive', 'Extremely Positive']
    cm = confusion_matrix(final['labels'], final['preds'])
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=sentiment_labels, yticklabels=sentiment_labels)
    plt.xlabel('預測標籤')
    plt.ylabel('實際標籤')
    plt.title('混淆矩陣 (BERT + BiLSTM)')
    plt.tight_layout()
    plt.savefig('confusion_matrix_bert_bilstm.png')
    plt.show()

    # Training / validation curves.
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(1, len(train_losses)+1), train_losses, label='訓練損失')
    plt.plot(range(1, len(val_losses)+1), val_losses, label='驗證損失')
    plt.xlabel('輪次')
    plt.ylabel('損失')
    plt.title('訓練/驗證損失曲線')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(val_accuracies)+1), val_accuracies, label='驗證準確率', color='green')
    plt.xlabel('輪次')
    plt.ylabel('準確率')
    plt.title('驗證準確率曲線')
    plt.legend()
    plt.tight_layout()
    plt.savefig('final_training_curves.png')
    plt.show()

## 步驟10：最終訓練與視覺化

In [None]:
# Run the staged hyperparameter search, the final training run and the plots.
staged_search()