In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, BertModel

# Fix the random seeds and pick the compute device.
torch.manual_seed(42)
np.random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

df = pd.read_csv("SICK_filtered.tsv", sep="\t")
print("原始資料形狀:", df.shape)

# Stratified split on entailment_label: 80% train, 20% test.
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42, stratify=df["entailment_label"])
# Take explicit copies: the frames returned by the split may be flagged as
# derived from `df`, and adding a column to them below could otherwise raise
# pandas' SettingWithCopyWarning.
df_train = df_train.copy()
df_test = df_test.copy()
print("訓練集形狀:", df_train.shape, "測試集形狀:", df_test.shape)

# Label encoding: map entailment_label strings to integer class ids.
# The encoder is fitted on the training labels only and reused for the test set.
le = LabelEncoder()
df_train["label_enc"] = le.fit_transform(df_train["entailment_label"])
df_test["label_enc"] = le.transform(df_test["entailment_label"])
num_classes = len(le.classes_)
print("標籤類別:", le.classes_)

pretrained_model = "bert-base-uncased"  # for Chinese data, 'bert-base-chinese' can be used instead
tokenizer = BertTokenizer.from_pretrained(pretrained_model)

# Maximum token sequence length (sentences are truncated/padded to this).
max_length = 128


class SICKBERTDataset(Dataset):
    """Dataset of sentence pairs that tokenizes each sentence separately.

    The frame passed in must provide the columns ``sentence_A``,
    ``sentence_B`` and ``label_enc``. Tokenization happens lazily, one item
    at a time, inside ``__getitem__``.
    """

    def __init__(self, df, tokenizer, max_length):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.df = df.reset_index(drop=True)
        # Plain Python lists give cheap positional access in __getitem__.
        self.sentences_A = df["sentence_A"].tolist()
        self.sentences_B = df["sentence_B"].tolist()
        self.labels = df["label_enc"].tolist()  # already integer-encoded

    def __len__(self):
        return len(self.labels)

    def _encode(self, text):
        """Tokenize one sentence and return (input_ids, attention_mask) without the batch axis."""
        encoded = self.tokenizer(
            text,
            add_special_tokens=True,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        # The tokenizer output carries a leading batch axis of size 1; drop it.
        return encoded["input_ids"].squeeze(0), encoded["attention_mask"].squeeze(0)

    def __getitem__(self, idx):
        ids_a, mask_a = self._encode(self.sentences_A[idx])
        ids_b, mask_b = self._encode(self.sentences_B[idx])
        sample = {
            "input_ids_A": ids_a,
            "attention_mask_A": mask_a,
            "input_ids_B": ids_b,
            "attention_mask_B": mask_b,
        }
        sample["label"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

def create_dataloader(df, tokenizer, max_length, batch_size, shuffle=True):
    """Wrap ``df`` in a SICKBERTDataset and return a DataLoader over it."""
    return DataLoader(
        SICKBERTDataset(df, tokenizer, max_length),
        batch_size=batch_size,
        shuffle=shuffle,
    )


class BertDNNClassifier(nn.Module):
    """Two-tower BERT classifier for sentence pairs.

    Each sentence is encoded by its own BERT encoder; the two pooled outputs
    are concatenated and fed to a small feed-forward classification head.

    Args:
        pretrained_model_name: Name or path passed to ``BertModel.from_pretrained``.
        hidden_dim: Width of the hidden layer in the classification head.
        num_classes: Number of output classes (size of the logits vector).
        dropout: Dropout probability used inside the classification head.
    """

    def __init__(self, pretrained_model_name, hidden_dim, num_classes, dropout=0.5):
        super().__init__()
        # Independent (non-shared) BERT encoders for sentence_A and sentence_B.
        self.bert_A = BertModel.from_pretrained(pretrained_model_name)
        self.bert_B = BertModel.from_pretrained(pretrained_model_name)
        # NOTE: this layer is not applied in forward(); it is kept so existing
        # code that accesses `model.dropout` keeps working.
        self.dropout = nn.Dropout(dropout)
        # Size the head from the encoders' configs instead of hard-coding
        # 768 * 2, so checkpoints with a different hidden size also work.
        pair_dim = self.bert_A.config.hidden_size + self.bert_B.config.hidden_size
        self.fc = nn.Sequential(
            nn.Linear(pair_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, num_classes)
        )

    def forward(self, input_ids_A, attention_mask_A, input_ids_B, attention_mask_B):
        """Return classification logits of shape (batch, num_classes)."""
        outputs_A = self.bert_A(input_ids=input_ids_A, attention_mask=attention_mask_A)
        pooled_A = outputs_A.pooler_output  # (batch, hidden_size)

        outputs_B = self.bert_B(input_ids=input_ids_B, attention_mask=attention_mask_B)
        pooled_B = outputs_B.pooler_output  # (batch, hidden_size)

        # Concatenate (not sum) the two sentence vectors.
        features = torch.cat([pooled_A, pooled_B], dim=1)  # (batch, 2 * hidden_size)
        logits = self.fc(features)
        return logits


def _model_device(model):
    """Return the device of the model's first parameter (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    return first_param.device if first_param is not None else torch.device("cpu")


def _run_epoch(model, loader, criterion, run_device, optimizer=None):
    """Run one pass over ``loader`` and return (mean loss, accuracy).

    When ``optimizer`` is given the model is put in training mode and updated
    after every batch; otherwise the model is put in eval mode and gradients
    are disabled. An empty loader yields ``(nan, nan)`` instead of dividing
    by zero.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.set_grad_enabled(training):
        for batch in loader:
            input_ids_A = batch["input_ids_A"].to(run_device)
            attention_mask_A = batch["attention_mask_A"].to(run_device)
            input_ids_B = batch["input_ids_B"].to(run_device)
            attention_mask_B = batch["attention_mask_B"].to(run_device)
            labels = batch["label"].to(run_device)

            if training:
                optimizer.zero_grad()
            outputs = model(input_ids_A, attention_mask_A, input_ids_B, attention_mask_B)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # Weight the batch-mean loss by batch size so the epoch average
            # is exact even when the last batch is smaller.
            batch_n = labels.size(0)
            loss_sum += loss.item() * batch_n
            _, preds = torch.max(outputs, 1)
            n_correct += (preds == labels).sum().item()
            n_seen += batch_n

    if n_seen == 0:
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def train_and_evaluate(model, train_loader, test_loader, num_epochs, lr):
    """Train ``model`` with Adam + cross-entropy and evaluate after every epoch.

    Batches are moved to the device the model's parameters live on, so the
    caller only has to place the model (``model.to(...)``) beforehand.

    Args:
        model: Module called as ``model(input_ids_A, attention_mask_A,
            input_ids_B, attention_mask_B)`` and returning class logits.
        train_loader: Iterable of batch dicts with keys ``input_ids_A``,
            ``attention_mask_A``, ``input_ids_B``, ``attention_mask_B``, ``label``.
        test_loader: Iterable of batch dicts with the same keys, used for evaluation.
        num_epochs: Number of passes over ``train_loader``.
        lr: Learning rate for the Adam optimizer.

    Returns:
        Tuple ``(epoch_list, train_losses, test_losses, train_accuracies,
        test_accuracies)``; each element is a list with one entry per epoch.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    run_device = _model_device(model)

    train_losses, test_losses = [], []
    train_accuracies, test_accuracies = [], []
    epoch_list = []

    for epoch in range(1, num_epochs + 1):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, run_device, optimizer)
        test_loss, test_acc = _run_epoch(model, test_loader, criterion, run_device)

        epoch_list.append(epoch)
        train_losses.append(train_loss)
        test_losses.append(test_loss)
        train_accuracies.append(train_acc)
        test_accuracies.append(test_acc)

        print(f"Epoch {epoch}: Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f} | Test Loss={test_loss:.4f}, Test Acc={test_acc:.4f}")

    return epoch_list, train_losses, test_losses, train_accuracies, test_accuracies


# Imported once here rather than on every iteration of the loop below.
from sklearn.metrics import confusion_matrix

# Training hyperparameters.
batch_size = 16
num_epochs = 5
learning_rate = 2e-5

X = df_train.copy()

# Fractions of the training split to train on (one fresh model per fraction).
fractions = [1.0, 0.5, 0.25, 0.1]

# The test loader does not depend on the training fraction (and is not
# shuffled), so build it once and reuse it for every run.
test_loader = create_dataloader(df_test, tokenizer, max_length, batch_size, shuffle=False)

for frac in fractions:
    # Take the first `frac` share of the training rows and save it for reference.
    num_samples = int(len(X) * frac)
    df_train_subset = X.iloc[:num_samples].reset_index(drop=True)
    subset_percentage = int(frac * 100)
    df_train_subset.to_csv(f"SICK_train_subset_{subset_percentage}.tsv", sep="\t", index=False)

    train_loader = create_dataloader(df_train_subset, tokenizer, max_length, batch_size, shuffle=True)

    print(f"\n[比例 {subset_percentage}%] 訓練資料筆數: {len(df_train_subset)}")

    # Initialise a fresh BERT+DNN model for this fraction.
    model = BertDNNClassifier(pretrained_model, hidden_dim=256, num_classes=num_classes, dropout=0.5)
    model.to(device)

    epoch_list, train_losses, test_losses, train_accs, test_accs = train_and_evaluate(model, train_loader, test_loader, num_epochs, learning_rate)

    # Plot loss and accuracy against epoch, side by side.
    fig, axs = plt.subplots(1, 2, figsize=(12, 5))
    axs[0].plot(epoch_list, train_losses, label="Train Loss")
    axs[0].plot(epoch_list, test_losses, label="Test Loss")
    axs[0].set_xlabel("Epoch")
    axs[0].set_ylabel("Loss")
    axs[0].set_title(f"Loss vs Epoch (Train Subset {subset_percentage}%)")
    axs[0].legend()

    axs[1].plot(epoch_list, train_accs, label="Train Acc")
    axs[1].plot(epoch_list, test_accs, label="Test Acc")
    axs[1].set_xlabel("Epoch")
    axs[1].set_ylabel("Accuracy")
    axs[1].set_title(f"Accuracy vs Epoch (Train Subset {subset_percentage}%)")
    axs[1].legend()

    plt.tight_layout()
    plt.savefig(f"BertDNN_SICK_{subset_percentage}.png")
    plt.close()

    # Final evaluation on the test set: collect predictions and true labels.
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in test_loader:
            input_ids_A = batch["input_ids_A"].to(device)
            attention_mask_A = batch["attention_mask_A"].to(device)
            input_ids_B = batch["input_ids_B"].to(device)
            attention_mask_B = batch["attention_mask_B"].to(device)

            outputs = model(input_ids_A, attention_mask_A, input_ids_B, attention_mask_B)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            # Labels are only compared on the host, so they never need to
            # be moved to the compute device.
            all_labels.extend(batch["label"].numpy())

    print(f"分類報告 (Train Subset {subset_percentage}%):")
    print(classification_report(all_labels, all_preds, target_names=le.classes_))

    cm = confusion_matrix(all_labels, all_preds)
    print("Confusion Matrix:")
    print(cm)

    # Render the confusion matrix as a heat map with class names on both axes.
    plt.figure(figsize=(6, 6))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.colorbar()
    tick_marks = np.arange(len(le.classes_))
    plt.xticks(tick_marks, le.classes_, rotation=45)
    plt.yticks(tick_marks, le.classes_)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.show()


  from .autonotebook import tqdm as notebook_tqdm


原始資料形狀: (9840, 3)
訓練集形狀: (7872, 3) 測試集形狀: (1968, 3)
標籤類別: ['CONTRADICTION' 'ENTAILMENT' 'NEUTRAL']

[比例 100%] 訓練資料筆數: 7872
Epoch 1: Train Loss=0.8124, Train Acc=0.5879 | Test Loss=0.7267, Test Acc=0.6052
Epoch 2: Train Loss=0.7414, Train Acc=0.6018 | Test Loss=0.7092, Test Acc=0.6128
Epoch 3: Train Loss=0.7144, Train Acc=0.6157 | Test Loss=0.7073, Test Acc=0.5971
Epoch 4: Train Loss=0.6680, Train Acc=0.6632 | Test Loss=0.6953, Test Acc=0.6347
Epoch 5: Train Loss=0.6293, Train Acc=0.6838 | Test Loss=0.7111, Test Acc=0.6225
分類報告 (Train Subset 100%):
               precision    recall  f1-score   support

CONTRADICTION       0.57      0.87      0.69       285
   ENTAILMENT       0.54      0.41      0.47       564
      NEUTRAL       0.67      0.66      0.67      1119

     accuracy                           0.62      1968
    macro avg       0.60      0.65      0.61      1968
 weighted avg       0.62      0.62      0.62      1968


[比例 50%] 訓練資料筆數: 3936
Epoch 1: Train Loss=0.8256, Train A