In [24]:
!pip install datasets
!pip install torchmetrics
import transformers as T
from datasets import load_dataset
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from tqdm import tqdm
from torchmetrics import SpearmanCorrCoef, Accuracy, F1Score
device = "cuda" if torch.cuda.is_available() else "cpu"



In [25]:
# 有些中文的標點符號在tokenizer編碼以後會變成[UNK]，所以將其換成英文標點
token_replacement = [
    ["：" , ":"],
    ["，" , ","],
    ["“" , "\""],
    ["”" , "\""],
    ["？" , "?"],
    ["……" , "..."],
    ["！" , "!"]
]

In [29]:
model = MultiLabelModel().to(device)
tokenizer = T.BertTokenizer.from_pretrained("google-bert/bert-base-uncased", cache_dir="./cache/")

In [44]:
import requests
import zipfile
import os
#改自己下載的資料集
# 創建目標目錄
os.makedirs('./cache/sick_data', exist_ok=True)

# 文件下載鏈接
urls = [
    "https://alt.qcri.org/semeval2014/task1/data/uploads/sick_train.zip",
    "https://alt.qcri.org/semeval2014/task1/data/uploads/sick_test_annotated.zip",
    "https://alt.qcri.org/semeval2014/task1/data/uploads/sick_trial.zip"
]

# 下載和解壓縮文件
for url in urls:
    local_filename = os.path.join('./cache/sick_data', url.split('/')[-1])
    with requests.get(url, stream=True) as r:
        with open(local_filename, 'wb') as f:#wb是寫入二進制文件
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    with zipfile.ZipFile(local_filename, 'r') as zip_ref: #解壓縮
        zip_ref.extractall('./cache/sick_data')#解壓縮到指定目錄


In [45]:
import random
import nltk
from nltk.corpus import wordnet
from torch.utils.data import Dataset

nltk.download('wordnet')
nltk.download('omw-1.4')

# 同義詞替換函數
def synonym_replacement(text, num_replacements=2):
    words = text.split()
    new_words = words.copy()
    replaced_indices = set()  # 記錄已經替換過的單詞位置

    for _ in range(num_replacements):
        word_to_replace = random.choice(words)
        if word_to_replace in replaced_indices:
            continue  # 避免重複替換
        synonyms = wordnet.synsets(word_to_replace)
        if synonyms:
            synonym = random.choice(synonyms).lemmas()[0].name()
            new_words = [synonym if word == word_to_replace else word for word in words]
            replaced_indices.add(word_to_replace)

    return ' '.join(new_words)

# 隨機刪除某些單詞
def random_deletion(text, p=0.3):
    words = text.split()
    if len(words) == 1:  # 若只有一個詞，返回原句
        return text

    # 選擇要刪除的單詞數量
    remaining_words = [word for word in words if random.random() > p]

    # 如果句子沒有變長，隨機刪除一個單詞
    if len(remaining_words) == 0:
        remaining_words = random.sample(words, 1)

    return ' '.join(remaining_words)

# 隨機插入新詞
def random_insertion(text, p=0.2):
    words = text.split()
    new_words = words.copy()
    num_insertions = int(len(words) * p)
    for _ in range(num_insertions):
        insert_word = random.choice(["非常", "極度", "總是", "有時", "還是"])
        insert_position = random.randint(0, len(new_words))
        new_words.insert(insert_position, insert_word)
    return ' '.join(new_words)

# 句子順序調換
def random_swap(text, n=3):
    words = text.split()
    for _ in range(n):
        idx1, idx2 = random.sample(range(len(words)), 2)
        words[idx1], words[idx2] = words[idx2], words[idx1]
    return ' '.join(words)

# 拼寫錯誤增強
def random_typo(text, p=0.1):
    chars = list(text)
    for i in range(len(chars)):
        if random.random() < p:
            chars[i] = random.choice('abcdefghijklmnopqrstuvwxyz')
    return ''.join(chars)

# 增強 `class 1` 的文本數據
def augment_class1_data(data):
    augmented_data = []
    for item in data:
        if item['entailment_judgment'] == 1:  # 只針對 `class 1` 的數據進行輕微增強
            premise_augmented = random_swap(item['premise'], n=1)  # 進行少量的隨機調換
            premise_augmented = random_insertion(premise_augmented, p=0.1)  # 插入少量的詞語
            hypothesis_augmented = item['hypothesis']  # 保持 hypothesis 不變
            augmented_data.append({
                'sentence_pair_id': item['sentence_pair_id'],
                'premise': premise_augmented,
                'hypothesis': hypothesis_augmented,
                'relatedness_score': item['relatedness_score'],
                'entailment_judgment': item['entailment_judgment']
            })
        else:
            augmented_data.append(item)  # 保持其他類別不變
    return augmented_data

# 增強 `class 2` 的文本數據
def augment_class2_data(data):
    augmented_data = []
    for item in data:
        if item['entailment_judgment'] == 2:  # 只針對 `class 2` 的數據進行強增強
            premise_augmented = synonym_replacement(item['premise'], num_replacements=3)  # 同義詞替換
            premise_augmented = random_swap(premise_augmented, n=2)  # 進行隨機調換
            premise_augmented = random_insertion(premise_augmented, p=0.2)  # 插入詞語
            hypothesis_augmented = random_deletion(item['hypothesis'], p=0.2)  # 隨機刪除詞語
            hypothesis_augmented = random_typo(hypothesis_augmented, p=0.1)  # 拼寫錯誤
            augmented_data.append({
                'sentence_pair_id': item['sentence_pair_id'],
                'premise': premise_augmented,
                'hypothesis': hypothesis_augmented,
                'relatedness_score': item['relatedness_score'],
                'entailment_judgment': item['entailment_judgment']
            })
        else:
            augmented_data.append(item)  # 保持其他類別不變
    return augmented_data

# 更新 `SemevalDataset` 類，使用不同的增強方法
class SemevalDataset(Dataset):
    def __init__(self, split="train") -> None:
        super().__init__()
        assert split in ["train", "validation", "test"]

        # Mapping from split to file
        split_to_file = {
            "train": "SICK_train.txt",
            "test": "SICK_test_annotated.txt",
            "validation": "SICK_trial.txt"
        }
        # Load data from my file
        file_path = f'./cache/sick_data/{split_to_file[split]}'
        self.data = self.load_local_dataset(file_path)

        # 增強數據，先對 class 1 進行輕微增強，再對 class 2 進行強增強
        self.data = augment_class1_data(self.data)
        self.data = augment_class2_data(self.data)

    def load_local_dataset(self, file_path):
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f.readlines()[1:]:  # 跳過標題行
                parts = line.strip().split('\t')
                # Mapping for entailment labels
                entailment_mapping = {
                    "NEUTRAL": 0,
                    "ENTAILMENT": 1,
                    "CONTRADICTION": 2
                }
                # Append data to list
                data.append({
                    "sentence_pair_id": int(parts[0]),
                    "premise": parts[1],
                    "hypothesis": parts[2],
                    "relatedness_score": float(parts[3]),
                    "entailment_judgment": entailment_mapping[parts[4]] # Store numerical label
                })
        return data

    def __getitem__(self, index):
        d = self.data[index]
        # 把中文標點替換掉
        for k in ["premise", "hypothesis"]:
            for tok in token_replacement:
                d[k] = d[k].replace(tok[0], tok[1])
        return d

    def __len__(self):
        return len(self.data)

# 測試數據增強效果
data_sample = SemevalDataset(split="train").data[:3]
print(f"Dataset example: \n{data_sample[0]} \n{data_sample[1]} \n{data_sample[2]}")


Dataset example: 
{'sentence_pair_id': 1, 'premise': 'A group of kids is playing in a yard and an old man is standing in the background', 'hypothesis': 'A group of boys in a yard is playing and a man is standing in the background', 'relatedness_score': 4.5, 'entailment_judgment': 0} 
{'sentence_pair_id': 2, 'premise': 'A group of children is playing in the house and there is no man standing in the background', 'hypothesis': 'A group of kids is playing in a yard and an old man is standing in the background', 'relatedness_score': 3.2, 'entailment_judgment': 0} 
{'sentence_pair_id': 3, 'premise': 'The young boys the playing outdoors 有時 and are man is smiling nearby', 'hypothesis': 'The kids are playing outdoors near a man with a smile', 'relatedness_score': 4.7, 'entailment_judgment': 1}


[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [32]:
# Define the hyperparameters
lr = 3e-5
epochs = 3
train_batch_size = 8
validation_batch_size = 8

In [40]:
# TODO1: Create batched data for DataLoader
# `collate_fn` is a function that defines how the data batch should be packed.
# This function will be called in the DataLoader to pack the data batch.

def collate_fn(batch):
    # TODO1-1: Implement the collate_fn function
    # 使用 tokenizer 將批次的句子進行編碼
    premises = [item["premise"] for item in batch]
    hypotheses = [item["hypothesis"] for item in batch]
    relatedness_scores = [item["relatedness_score"] for item in batch]
    entailment_judgments = [item["entailment_judgment"] for item in batch]

    # 將前提和假設進行編碼，並返回所需的 tensor 格式
    encoding = tokenizer(
        premises,
        hypotheses,
        padding=True,
        truncation=True,
        return_tensors="pt"
    )

    # 將標籤轉換為 PyTorch 張量
    encoding["relatedness_score"] = torch.tensor(relatedness_scores, dtype=torch.float32)
    encoding["entailment_judgment"] = torch.tensor(entailment_judgments, dtype=torch.long)

    return encoding

# TODO1-2: Define your DataLoader
# 定義 DataLoader
dl_train = DataLoader(
    SemevalDataset(split="train"),
    batch_size=train_batch_size,
    shuffle=False,#打亂數據，因為是訓練集
    collate_fn=collate_fn
)

dl_validation = DataLoader(
    SemevalDataset(split="validation"),
    batch_size=validation_batch_size,
    shuffle=False,#不打亂數據，因為是驗證集
    collate_fn=collate_fn
)
# Define DataLoader for test set
dl_test = DataLoader(
    SemevalDataset(split="test"),  # Assuming you have a 'test' split in your dataset
    batch_size=validation_batch_size,  # You can adjust the batch size as needed
    shuffle=False,  # No need to shuffle for testing
    collate_fn=collate_fn
)

In [28]:
# TODO2: Construct your model
class MultiLabelModel(torch.nn.Module):
    def __init__(self):
        super(MultiLabelModel, self).__init__()
        self.bert = T.BertModel.from_pretrained("google-bert/bert-base-uncased", cache_dir="./cache/")
        self.dropout = torch.nn.Dropout(0.3)#dropout層<避免過擬合>
        self.fc_relatedness = torch.nn.Linear(self.bert.config.hidden_size, 1)
        self.fc_entailment = torch.nn.Linear(self.bert.config.hidden_size, 3)

    def forward(self, input_ids, attention_mask, token_type_ids):
        outputs = self.bert(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)#輸入到bert模型
        pooled_output = outputs[1]#取得pooling的輸出
        pooled_output = self.dropout(pooled_output)#dropout層

        relatedness_score = self.fc_relatedness(pooled_output).squeeze(-1)#將維度為1的維度去掉
        entailment_judgment = self.fc_entailment(pooled_output)#將pooling的輸出輸入到全連接層

        return relatedness_score, entailment_judgment

In [41]:
# TODO3: Define your optimizer and loss function

# TODO3-1: Define your Optimizer
optimizer = AdamW(model.parameters(), lr=lr)#AdamW優化器

# TODO3-2: Define your loss functions (you should have two)
loss_fn_relatedness = torch.nn.MSELoss()#均方誤差
loss_fn_entailment = torch.nn.CrossEntropyLoss()#交叉熵損失

# scoring functions
spc = SpearmanCorrCoef()
acc = Accuracy(task="multiclass", num_classes=3)
f1 = F1Score(task="multiclass", num_classes=3, average='macro')



In [42]:
# 初始化 confusion matrix 指標
from torchmetrics import ConfusionMatrix

confusion_matrix = ConfusionMatrix(task="multiclass", num_classes=3)
for ep in range(epochs):
    # 訓練階段
    pbar = tqdm(dl_train)  # 創建進度條
    pbar.set_description(f"Training epoch [{ep+1}/{epochs}]")  # 設置進度條描述
    model.train()  # 設置模型為訓練模式
    total_loss_relatedness = 0.0  # 初始化 relatedness 損失總和
    total_loss_entailment = 0.0  # 初始化 entailment 損失總和

    for batch in pbar:
        # 將數據移動到 GPU 或 CPU 設備
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        token_type_ids = batch["token_type_ids"].to(device)
        relatedness_score = batch["relatedness_score"].to(device)
        entailment_judgment = batch["entailment_judgment"].to(device)

        # 清除優化器的梯度
        optimizer.zero_grad()

        # 前向傳遞：將數據輸入模型，獲取 relatedness 和 entailment 的預測
        pred_relatedness, pred_entailment = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids
        )

        # 計算損失：分別計算 relatedness 和 entailment 的損失，並將兩者相加為總損失
        loss_relatedness = loss_fn_relatedness(pred_relatedness, relatedness_score)
        loss_entailment = loss_fn_entailment(pred_entailment, entailment_judgment)
        loss = loss_relatedness + loss_entailment  # 總損失

        # 反向傳遞：計算梯度
        loss.backward()

        # 更新模型參數
        optimizer.step()

        # 累加損失值，用於後續計算平均損失
        total_loss_relatedness += loss_relatedness.item()
        total_loss_entailment += loss_entailment.item()

    # 驗證階段
    pbar = tqdm(dl_validation)  # 創建進度條
    pbar.set_description(f"Validation epoch [{ep+1}/{epochs}]")  # 設置進度條描述
    model.eval()  # 設置模型為驗證模式（不會更新權重）

    # 初始化列表，用於儲存真實值和預測值
    val_relatedness_scores = []
    val_pred_relatedness = []
    val_entailment_judgments = []
    val_pred_entailment = []

    with torch.no_grad():  # 禁用梯度計算，減少內存消耗
        for batch in pbar:
            # 將數據移動到設備
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            token_type_ids = batch["token_type_ids"].to(device)
            relatedness_score = batch["relatedness_score"].to(device)
            entailment_judgment = batch["entailment_judgment"].to(device)

            # 前向傳遞：獲取模型的預測輸出
            pred_relatedness, pred_entailment = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids
            )

            # 收集驗證集的真實值和預測值，用於後續的評估
            val_relatedness_scores.extend(relatedness_score.cpu().numpy())
            val_pred_relatedness.extend(pred_relatedness.cpu().numpy())
            val_entailment_judgments.extend(entailment_judgment.cpu().numpy())
            val_pred_entailment.extend(torch.argmax(pred_entailment, dim=1).cpu().numpy())

    # 計算評估指標
    spearman_corr = spc(torch.tensor(val_pred_relatedness), torch.tensor(val_relatedness_scores))  # 計算 Spearman 相關係數
    accuracy = acc(torch.tensor(val_pred_entailment), torch.tensor(val_entailment_judgments))  # 計算準確度
    f1_score = f1(torch.tensor(val_pred_entailment), torch.tensor(val_entailment_judgments))  # 計算 F1 分數
    # 計算混淆矩陣
    conf_matrix = confusion_matrix(torch.tensor(val_pred_entailment), torch.tensor(val_entailment_judgments))
    # 輸出驗證結果
    print(f"Validation Results - Epoch [{ep+1}/{epochs}]:")
    print(f"Spearman Correlation: {spearman_corr:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1_score:.4f}")
    print(f"Confusion Matrix:\n{conf_matrix}")

    # 保存模型的檢查點
    os.makedirs("./saved_models", exist_ok=True)
    torch.save(model, f'./saved_models/ep{ep}.ckpt')  # 儲存模型檔案至指定路徑


Training epoch [1/3]: 100%|██████████| 563/563 [00:48<00:00, 11.49it/s]
Validation epoch [1/3]: 100%|██████████| 63/63 [00:01<00:00, 46.07it/s]


Validation Results - Epoch [1/3]:
Spearman Correlation: 0.8542
Accuracy: 0.9900
F1 Score: 0.9912
Confusion Matrix:
tensor([[282,   0,   0],
        [  5, 139,   0],
        [  0,   0,  74]])


Training epoch [2/3]: 100%|██████████| 563/563 [00:48<00:00, 11.50it/s]
Validation epoch [2/3]: 100%|██████████| 63/63 [00:01<00:00, 47.41it/s]


Validation Results - Epoch [2/3]:
Spearman Correlation: 0.8486
Accuracy: 0.9820
F1 Score: 0.9842
Confusion Matrix:
tensor([[279,   3,   0],
        [  6, 138,   0],
        [  0,   0,  74]])


Training epoch [3/3]: 100%|██████████| 563/563 [00:48<00:00, 11.53it/s]
Validation epoch [3/3]: 100%|██████████| 63/63 [00:01<00:00, 47.32it/s]


Validation Results - Epoch [3/3]:
Spearman Correlation: 0.8499
Accuracy: 0.9780
F1 Score: 0.9807
Confusion Matrix:
tensor([[278,   4,   0],
        [  7, 137,   0],
        [  0,   0,  74]])


In [43]:
# 初始化混淆矩陣指標
from torchmetrics import ConfusionMatrix
import torch
import os

# 假設你有已經訓練過的模型
model = torch.load('./saved_models/ep2.ckpt')  # 載入訓練好的模型
model.to(device)  # 確保模型移動到正確的設備（GPU/CPU）
model.eval()  # 設置模型為評估模式

confusion_matrix = ConfusionMatrix(task="multiclass", num_classes=3)

# 測試階段
pbar = tqdm(dl_test)  # 創建進度條
pbar.set_description(f"Testing")  # 設置進度條描述

# 初始化列表，用於儲存真實值和預測值
test_relatedness_scores = []
test_pred_relatedness = []
test_entailment_judgments = []
test_pred_entailment = []

# 禁用梯度計算
with torch.no_grad():
    for batch in pbar:
        # 將數據移動到設備
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        token_type_ids = batch["token_type_ids"].to(device)
        relatedness_score = batch["relatedness_score"].to(device)
        entailment_judgment = batch["entailment_judgment"].to(device)

        # 前向傳遞：獲取模型的預測輸出
        pred_relatedness, pred_entailment = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids
        )

        # 收集測試集的真實值和預測值
        test_relatedness_scores.extend(relatedness_score.cpu().numpy())
        test_pred_relatedness.extend(pred_relatedness.cpu().numpy())
        test_entailment_judgments.extend(entailment_judgment.cpu().numpy())
        test_pred_entailment.extend(torch.argmax(pred_entailment, dim=1).cpu().numpy())

# 計算評估指標
spearman_corr = spc(torch.tensor(test_pred_relatedness), torch.tensor(test_relatedness_scores))  # 計算 Spearman 相關係數
accuracy = acc(torch.tensor(test_pred_entailment), torch.tensor(test_entailment_judgments))  # 計算準確度
f1_score = f1(torch.tensor(test_pred_entailment), torch.tensor(test_entailment_judgments))  # 計算 F1 分數

# 計算混淆矩陣
conf_matrix = confusion_matrix(torch.tensor(test_pred_entailment), torch.tensor(test_entailment_judgments))

# 輸出測試結果
print(f"Test Results:")
print(f"Spearman Correlation: {spearman_corr:.4f}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Confusion Matrix:\n{conf_matrix}")


  model = torch.load('./saved_models/ep2.ckpt')  # 載入訓練好的模型
Testing: 100%|██████████| 616/616 [00:12<00:00, 49.90it/s]


Test Results:
Spearman Correlation: 0.8351
Accuracy: 0.9874
F1 Score: 0.9883
Confusion Matrix:
tensor([[2779,   14,    0],
        [  44, 1370,    0],
        [   0,    4,  716]])


For test set predictions, you can write perform evaluation simlar to #TODO5.