In [1]:
import os
import json
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint

import torch
import torch.nn as nn
import torch.optim as optim
from torchmetrics.classification import Accuracy
from torch.utils.data import Dataset, DataLoader

from transformers import AutoTokenizer
from dataclasses import dataclass, field
from typing import List, Tuple
from tqdm import tqdm
import gensim
from gensim.scripts.glove2word2vec import glove2word2vec
import numpy as np

# Folder that holds the Yelp train.json / test.json files.
# The original machine-specific location stays the default; set the
# YELP_DATA_DIR environment variable to run the notebook on another machine
# without editing this cell.
data_dir = os.environ.get('YELP_DATA_DIR', 'D:/ComputerScience/cs_2024_Fall_Deep_Learning/Lab/data/Yelp')

In [None]:
# GloVe vectors in their original text format (this file must already exist).
# Forward slashes are used on purpose: the former backslash literals depended on
# "\d" and "\g" being unrecognised escape sequences (a warning on recent Python
# versions) and only resolved on Windows. Forward slashes work on every platform
# and match the cache path used when the embedding matrix is built.
glove_file = '../data/glove.6B/glove.6B.300d.txt'
# Destination of the converted copy that is later read with
# KeyedVectors.load_word2vec_format.
word2vec_output_file = '../data/glove.6B/glove.6B.300d.word2vec.txt'

# Convert only once; later runs reuse the converted file.
if not os.path.exists(word2vec_output_file):
    glove2word2vec(glove_file, word2vec_output_file)

In [3]:
# Width of one embedding vector; chosen to match the GloVe file in use.
embedding_dim = 300

# Tokenizer of bert-base-uncased; only its vocabulary and tokenization are used here.
tokenizer = AutoTokenizer.from_pretrained('google-bert/bert-base-uncased')

# Mapping token string -> integer id, as reported by the tokenizer.
vocab = tokenizer.get_vocab()

In [None]:
# Build (or load from cache) the embedding matrix aligned with the tokenizer
# vocabulary: row i holds the vector for the token whose id is i.
pretrained_embedding_path = '../data/glove.6B/pretrained_embedding.pt'
if not os.path.exists(pretrained_embedding_path):
    embedding_model = gensim.models.KeyedVectors.load_word2vec_format(word2vec_output_file, binary=False)

    # Every row starts as a random vector. A seeded generator is used so that
    # rebuilding the cache reproduces the same matrix.
    rng = np.random.default_rng(42)
    embedding_matrix = rng.normal(scale=0.6, size=(len(vocab), embedding_dim))
    index2word = {v: k for k, v in vocab.items()}

    # Special tokens carry no lexical meaning and get an all-zero row.
    special_tokens = ('[PAD]', '[CLS]', '[SEP]', '[UNK]', '[MASK]')

    # Fill the embedding matrix.
    for idx, word in index2word.items():
        if word in special_tokens:
            embedding_matrix[idx] = 0.0
        elif not word.startswith('##') and word in embedding_model:
            # Whole word that GloVe knows: copy its vector.
            embedding_matrix[idx] = embedding_model[word]
        # Otherwise ('##' sub-word pieces and words missing from GloVe) the row
        # keeps its random initialisation; drawing a second random vector for
        # it, as was done before, added nothing.

    pretrained_embedding = torch.tensor(embedding_matrix, dtype=torch.float32)
    torch.save(pretrained_embedding, pretrained_embedding_path)
else:
    # NOTE(review): the cache is trusted as-is; delete the file if the tokenizer
    # or embedding_dim changes, otherwise the shape will not match the model.
    pretrained_embedding = torch.load(pretrained_embedding_path)

In [5]:
@dataclass
class YelpData:
    """One review record: the review body together with its star rating."""
    text: str  # raw review text
    star: int  # star rating attached to the review

class YelpDataset(Dataset):
    """
    Dataset over a JSON-lines review file (one JSON object per line).

    All records are read into memory at construction time; tokenization
    happens lazily in ``__getitem__``.
    """

    def __init__(self, data_dir, tokenizer, train=True, max_length=512):
        """
        Dataset constructor
        :param data_dir: Directory containing ``train.json`` and ``test.json``
        :param tokenizer: Callable used to encode a review text; it is called with
                          the keyword arguments shown in ``__getitem__``
        :param train: Load ``train.json`` when True, ``test.json`` otherwise
        :param max_length: Maximum length for padding and truncation
        """
        file_name = 'train.json' if train else 'test.json'
        self.data_path = os.path.join(data_dir, file_name)
        self.raw_data = self._read_json(self.data_path)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def _read_json(self, file_path):
        """
        Read a JSON-lines file into a list of records
        :param file_path: Path of the file to read
        :return: List of ``YelpData`` built from every line that is a JSON object
                 with both a ``text`` and a ``stars`` field

        Blank lines are skipped silently. Lines that cannot be decoded, are not
        JSON objects, or lack a required field are reported with ``print`` and skipped.
        """
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                if not line.strip():
                    # An empty line is not a malformed record; nothing to report.
                    continue

                try:
                    rdata = json.loads(line)
                except json.JSONDecodeError:
                    print(f"Fails to decode line {line_num}")
                    continue

                # Valid JSON that is not an object (a list, a number, ...) has no
                # fields to read; treat it like any other invalid record.
                if not isinstance(rdata, dict):
                    print(f"{line_num} data is invalid")
                    continue

                text = rdata.get('text')
                star = rdata.get('stars')
                if text is None or star is None:
                    print(f"{line_num} data is invalid")
                    continue

                data.append(YelpData(text=text, star=star))

        return data

    def __len__(self):
        """Number of records that were read successfully."""
        return len(self.raw_data)

    def __getitem__(self, idx):
        """
        Encode one record
        :param idx: Index of the record
        :return: Dict with ``input_ids`` and ``attention_mask`` (leading batch
                 dimension of the tokenizer output removed) and ``label``
                 (scalar long tensor)
        """
        record = self.raw_data[idx]
        encoding = self.tokenizer(
            record.text,
            add_special_tokens=False,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_attention_mask=True,
            return_tensors='pt'
        )
        # NOTE(review): the star value is used as the class index unchanged. With
        # 5 classes the loss needs indices 0..4 — confirm the files store
        # 0-based stars (a 1..5 scale would need "star - 1").
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'label': torch.tensor(record.star, dtype=torch.long)
        }

In [6]:
class Config:
    """
    Plain container for the model hyper-parameters.

    ``embedding_dim`` and ``vocab_size`` start at 0 and are expected to be
    overwritten by the caller after construction.
    """

    def __init__(self):
        defaults = (
            ('embedding_dim', 0),
            ('hidden_size', 512),
            ('num_layers', 2),
            ('num_classes', 5),
            ('max_length', 512),
            ('vocab_size', 0),
        )
        # Same instance attributes, assigned in the same order, as one
        # assignment statement per field would produce.
        for attr_name, attr_value in defaults:
            setattr(self, attr_name, attr_value)

# Shared hyper-parameter object; the two sizes that default to 0 are filled in
# from the tokenizer vocabulary and the embedding width chosen earlier.
config = Config()
config.vocab_size = len(vocab)
config.embedding_dim = embedding_dim


class RNNClassifier(nn.Module):
    """
    Bidirectional LSTM text classifier: embedding -> LSTM -> linear layer.

    :param config: Object exposing ``vocab_size``, ``embedding_dim``,
                   ``hidden_size``, ``num_layers`` and ``num_classes``
    :param pretrained: Optional ``(vocab_size, embedding_dim)`` tensor; when given
                       it is copied into the embedding layer, which is then frozen
    """

    def __init__(self, config, pretrained=None):
        super().__init__()
        # assumes token id 0 is the padding token — TODO confirm against the tokenizer
        self.embedding = nn.Embedding(config.vocab_size, config.embedding_dim, padding_idx=0)
        if pretrained is not None:
            self.embedding.weight.data.copy_(pretrained)
            self.embedding.weight.requires_grad = False

        self.rnn = nn.LSTM(
            input_size=config.embedding_dim,
            hidden_size=config.hidden_size,
            num_layers=config.num_layers,
            batch_first=True,
            bidirectional=True
        )
        # Forward and backward final states are concatenated, hence "* 2".
        self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)

    def forward(self, input_ids, attention_mask):
        """
        Compute class logits for a batch of padded token sequences.

        :param input_ids: ``(batch, seq_len)`` integer token ids
        :param attention_mask: ``(batch, seq_len)`` mask, 1 for real tokens and 0 for
                               padding; padding is assumed to sit at the end of each
                               row (right padding)
        :return: ``(batch, num_classes)`` logits
        """
        x = self.embedding(input_ids)
        x = x * attention_mask.unsqueeze(-1)

        # Number of real tokens per row. A row that is entirely padding is treated
        # as length 1 because packing rejects zero-length sequences; its single
        # input vector is all zeros after the masking above.
        lengths = attention_mask.sum(dim=1).clamp(min=1).to(torch.int64).cpu()

        # Packing stops the LSTM at each sequence's true end. Reading
        # output[:, -1, :] of the padded batch instead would take the forward
        # state after it had processed all the padding, and the backward state
        # after a single (padding) step.
        packed = nn.utils.rnn.pack_padded_sequence(
            x, lengths, batch_first=True, enforce_sorted=False
        )
        _, (h_n, _) = self.rnn(packed)

        # h_n is (num_layers * 2, batch, hidden); the last two entries are the
        # top layer's forward and backward final states.
        x = torch.cat([h_n[-2], h_n[-1]], dim=1)
        x = self.fc(x)
        return x

In [7]:
# Build the train split first, then the test split, from the same directory and tokenizer.
train_dataset, test_dataset = (
    YelpDataset(data_dir, tokenizer, train=is_train) for is_train in (True, False)
)

# Identical loader settings for both splits; only the training data is shuffled.
# NOTE(review): with num_workers > 0 on a spawn-based platform, a dataset class
# defined inside the notebook may not be importable by the workers — confirm
# this runs where the notebook is executed.
train_loader, test_loader = (
    DataLoader(split, batch_size=64, shuffle=reshuffle, num_workers=4, pin_memory=True)
    for split, reshuffle in ((train_dataset, True), (test_dataset, False))
)

In [None]:
# Pick the device first so that everything below can be placed on it.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

criterion = nn.CrossEntropyLoss()
model = RNNClassifier(config, pretrained=pretrained_embedding)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# The metric keeps internal state tensors; it must live on the same device as
# the predictions it receives, otherwise updating it with CUDA tensors fails.
# The class count comes from the config so it cannot drift from the model head.
accuracy = Accuracy(task='multiclass', num_classes=config.num_classes).to(device)

# Last expression of the cell: moves the model and displays its structure.
model.to(device)

In [None]:
num_epochs = 5
from torch.utils.tensorboard import SummaryWriter

import time
timenow = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
# One checkpoint folder per run. It has to exist before torch.save is called,
# which does not create missing parent directories.
save_dir = 'checkpoints/' + timenow
os.makedirs(save_dir, exist_ok=True)
best_model_path = os.path.join(save_dir, 'best_model.pt')

# Keep the metric state on the same device as the predictions
# (a no-op when it is already there).
accuracy.to(device)

writer = SummaryWriter(log_dir='test_logs')
best_acc = float('-inf')

try:
    # Outer bar counts epochs; the per-batch bars below are transient
    # (leave=False) and use their own objects, so they never shadow this one.
    with tqdm(total=num_epochs) as epoch_bar:
        for epoch in range(num_epochs):
            # ---- training phase ----
            model.train()
            train_loss = 0.0
            accuracy.reset()  # start the accuracy accumulation from scratch

            for batch in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{num_epochs}", leave=False):
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                optimizer.zero_grad()
                outputs = model(input_ids, attention_mask)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                preds = torch.argmax(outputs, dim=1)
                accuracy.update(preds, labels)

            avg_train_loss = train_loss / len(train_loader)
            # compute() aggregates over every sample seen since reset(), so a
            # smaller final batch is weighted correctly (a mean of per-batch
            # accuracies would over-weight it).
            avg_train_acc = accuracy.compute().item()

            # ---- evaluation phase ----
            model.eval()
            test_loss = 0.0
            accuracy.reset()  # start the accuracy accumulation from scratch

            with torch.no_grad():
                for batch in tqdm(test_loader, desc=f"Validation Epoch {epoch+1}/{num_epochs}", leave=False):
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['label'].to(device)

                    outputs = model(input_ids, attention_mask)
                    loss = criterion(outputs, labels)

                    test_loss += loss.item()
                    preds = torch.argmax(outputs, dim=1)
                    accuracy.update(preds, labels)

            avg_test_loss = test_loss / len(test_loader)
            avg_test_acc = accuracy.compute().item()

            # Log to TensorBoard.
            writer.add_scalar('Loss/Train', avg_train_loss, epoch)
            writer.add_scalar('Accuracy/Train', avg_train_acc, epoch)
            writer.add_scalar('Loss/Test', avg_test_loss, epoch)
            writer.add_scalar('Accuracy/Test', avg_test_acc, epoch)

            # Refresh the epoch bar with this epoch's numbers.
            epoch_bar.set_description(
                f"Epoch {epoch+1}/{num_epochs} | "
                f"Train Loss: {avg_train_loss:.4f} | Train Acc: {avg_train_acc:.4f} | "
                f"Test Loss: {avg_test_loss:.4f} | Test Acc: {avg_test_acc:.4f}"
            )
            epoch_bar.update(1)

            # Keep the weights with the best accuracy so far. The first epoch
            # always qualifies because best_acc starts at -inf.
            # NOTE(review): model selection uses the test split; a separate
            # validation split would avoid an optimistic estimate.
            if avg_test_acc > best_acc:
                best_acc = avg_test_acc
                torch.save(model.state_dict(), best_model_path)
                print(f"New best model saved at epoch {epoch+1} with accuracy {best_acc:.4f}")
finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()