In [None]:
import pandas as pd
import os

def convert_IMDB_to_csv(directory, csv_file_path):
    """Gather the IMDB review text files under ``directory`` into a single CSV.

    Every ``*.txt`` file found in ``<directory>/<subset>/<label>`` (subset in
    'train'/'test', label in 'pos'/'neg') becomes one row of the output.

    Args:
        directory: Root folder containing the ``train`` and ``test`` sub-folders.
        csv_file_path: Destination path of the CSV file (overwritten if present).

    The CSV has two columns: ``review`` (file content) and ``sentiment``
    ('positive' or 'negative'). Rows are grouped by label, then subset, and are
    ordered by file name inside each folder.

    Raises:
        FileNotFoundError: If one of the four expected folders does not exist.
    """
    sentiment_names = {'pos': 'positive', 'neg': 'negative'}
    data = []
    labels = []
    for label, sentiment in sentiment_names.items():
        for subset in ('train', 'test'):
            path = os.path.join(directory, subset, label)
            # os.listdir returns names in arbitrary, filesystem-dependent order;
            # sorting makes the row order (and any seeded split that follows)
            # reproducible across machines and runs.
            for file in sorted(os.listdir(path)):
                if not file.endswith('.txt'):
                    continue
                with open(os.path.join(path, file), 'r', encoding='utf-8') as f:
                    data.append(f.read())
                labels.append(sentiment)
    df = pd.DataFrame({'review': data, 'sentiment': labels})
    df.to_csv(csv_file_path, index=False)

# Build imdb_data.csv from the extracted aclImdb folder in the working directory.
convert_IMDB_to_csv(directory='aclImdb', csv_file_path='imdb_data.csv')

In [None]:
import torch
import numpy as np
import random

def set_seeds(seed):
    """Seed Python, NumPy and PyTorch RNGs and switch cuDNN to deterministic mode.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    # CPU-side generators: Python's `random`, NumPy's legacy global RNG, PyTorch.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # GPU generators are only touched when CUDA is actually present.
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

    # Disable cuDNN autotuning and request deterministic kernels.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True


set_seeds(seed=2526)

In [None]:
import pandas as pd
from transformers import AutoTokenizer

# Read the review CSV and expose both columns as arrays.
df = pd.read_csv('imdb_data.csv')
reviews, sentiments = df['review'].values, df['sentiment'].values

# Binary targets: 1.0 where the sentiment string is 'positive', 0.0 otherwise.
is_positive = sentiments == 'positive'
labels = is_positive.astype('float32')

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Demo only: tokenize the first two reviews, cut to 10 tokens, to inspect the output.
demo_texts = reviews[:2].tolist()
input_datas = tokenizer(
    demo_texts,
    max_length=10,
    truncation=True,
    padding="longest",
    return_tensors='pt',
)

print('Tokenizer輸出:')
print(input_datas)

In [None]:
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class IMDB(Dataset):
    """Dataset of (review text, label) pairs with a tokenizing collate function.

    Args:
        x: Indexable collection of review strings.
        y: Indexable collection of labels, aligned with ``x``.
        tokenizer: Callable tokenizer (Hugging Face style) used by ``collate_fn``.
    """

    def __init__(self, x, y, tokenizer):
        self.x = x
        self.y = y
        self.tokenizer = tokenizer

    def __getitem__(self, index):
        """Return the raw ``(text, label)`` pair at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Number of samples."""
        return len(self.x)

    def collate_fn(self, batch):
        """Tokenize a list of ``(text, label)`` pairs into a padded batch.

        Returns:
            dict with ``input_ids`` (token ids, padded to the longest item of
            the batch) and ``labels`` (tensor built from the batch labels).
        """
        batch_x, batch_y = zip(*batch)
        # Ask the tokenizer not to add special tokens instead of slicing them
        # off afterwards: on a padded batch ``[:, 1:-1]`` only strips the
        # trailing special token of the longest rows, shorter rows kept theirs
        # in the middle of the sequence and merely lost one padding column.
        # max_length is 128 minus the two special tokens that were previously
        # added and then stripped, so the content budget is unchanged.
        encoded = self.tokenizer(
            list(batch_x),
            max_length=126,
            truncation=True,
            padding="longest",
            add_special_tokens=False,
            return_tensors='pt',
        )
        labels = torch.tensor(batch_y)
        return {'input_ids': encoded.input_ids, 'labels': labels}

# 80/20 train/validation split; the fixed random_state keeps the split repeatable.
x_train, x_valid, y_train, y_valid = train_test_split(
    reviews,
    labels,
    train_size=0.8,
    random_state=46,
    shuffle=True,
)

# One dataset per split, both sharing the same tokenizer.
trainset, validset = IMDB(x_train, y_train, tokenizer), IMDB(x_valid, y_valid, tokenizer)

# Both loaders use the same batch size and shuffling; each collates with its own dataset.
loader_options = dict(batch_size=32, shuffle=True)
train_loader = DataLoader(trainset, collate_fn=trainset.collate_fn, **loader_options)
valid_loader = DataLoader(validset, collate_fn=validset.collate_fn, **loader_options)

In [None]:
import torch.nn as nn

class TimeSeriesModel(nn.Module):
    """Binary sequence classifier: embedding -> RNN/LSTM -> linear -> sigmoid.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_size: Hidden size of the recurrent layer (per direction).
        padding_idx: Token id treated as padding by the embedding and ignored
            when the sequences are fed to the recurrent layer.
        num_layers: Number of stacked recurrent layers.
        bidirectional: Whether the recurrent layer runs in both directions.
        model_type: 'LSTM' or 'RNN'; any other value falls back to LSTM.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, padding_idx, num_layers=1, bidirectional=True, model_type='LSTM'):
        super().__init__()
        self.criterion = nn.BCELoss()  # loss computed on sigmoid probabilities
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # Select the recurrent architecture.
        rnn_models = {'LSTM': nn.LSTM, 'RNN': nn.RNN}
        self.series_model = rnn_models.get(model_type, nn.LSTM)(
            embedding_dim,
            hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
            batch_first=True
        )

        # A bidirectional layer yields a final state twice as wide.
        hidden = hidden_size * 2 if bidirectional else hidden_size
        self.fc = nn.Linear(hidden, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, **kwargs):
        """Run the classifier on a batch.

        Keyword Args:
            input_ids: Long tensor (batch_size, seq_len) of token ids; padding
                is assumed to sit at the end of each row.
            labels: Optional float tensor (batch_size,) of 0/1 targets.

        Returns:
            ``(loss, y_hat)`` where ``y_hat`` has shape (batch_size, 1) and holds
            probabilities; ``loss`` is the BCE loss, or None without ``labels``.
        """
        input_ids = kwargs['input_ids']
        labels = kwargs.get('labels')

        # Token ids -> embedding vectors: (batch_size, seq_len, embedding_dim)
        emb_out = self.embedding(input_ids)

        # True (unpadded) length of every sequence. Reading output[:, -1, :] of
        # a padded batch returns the state at a padding position, and for the
        # backward direction a state that has only seen a single token, so the
        # sequences are packed and the final hidden states are used instead.
        pad_idx = self.embedding.padding_idx
        if pad_idx is None:
            lengths = torch.full((input_ids.size(0),), input_ids.size(1), dtype=torch.long)
        else:
            lengths = (input_ids != pad_idx).sum(dim=1)
        # pack_padded_sequence needs lengths >= 1 stored on the CPU.
        lengths = lengths.clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            emb_out, lengths, batch_first=True, enforce_sorted=False
        )

        # LSTM returns (h_n, c_n) as second value, RNN returns h_n directly.
        _, h_n = self.series_model(packed)
        if isinstance(h_n, tuple):
            h_n = h_n[0]
        # h_n: (num_layers * num_directions, batch_size, hidden_size)
        if self.series_model.bidirectional:
            # Last layer: forward state is h_n[-2], backward state is h_n[-1].
            h_t = torch.cat((h_n[-2], h_n[-1]), dim=-1)
        else:
            h_t = h_n[-1]
        # h_t: (batch_size, hidden_size * num_directions)

        y_hat = self.sigmoid(self.fc(h_t))
        # y_hat: (batch_size, 1)

        # Return the loss (when labels are given) together with the probabilities.
        loss = None if labels is None else self.criterion(y_hat.view(-1), labels)
        return loss, y_hat

In [None]:
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
from Trainer import Trainer

# Custom warmup scheduler
def get_warmup_scheduler(optimizer, warmup_steps, total_steps):
    """Build a LambdaLR with linear warmup followed by linear decay.

    Args:
        optimizer: Optimizer whose learning rate is scaled.
        warmup_steps: Number of steps over which the factor rises from 0 to 1.
        total_steps: Step at which the factor reaches 0 again.

    Returns:
        A ``LambdaLR`` multiplying the base learning rate by the factor below.
    """
    def lr_lambda(current_step):
        still_warming_up = current_step < warmup_steps
        if not still_warming_up:
            # Decay phase: shrink linearly with the remaining steps, never below 0.
            remaining = float(total_steps - current_step)
            decay_span = float(max(1, total_steps - warmup_steps))
            return max(0.0, remaining / decay_span)
        # Warmup phase: fraction of the warmup already completed.
        warmup_span = float(max(1, warmup_steps))
        return float(current_step) / warmup_span

    return LambdaLR(optimizer, lr_lambda)

# Model, optimizer and scheduler setup
model_config = dict(
    vocab_size=len(tokenizer),  # embedding table size equals the tokenizer vocabulary size
    embedding_dim=50,
    hidden_size=32,
    model_type='LSTM',
    padding_idx=tokenizer.pad_token_id,
)
model = TimeSeriesModel(**model_config)

optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.001)

# Schedule lengths are expressed in multiples of the number of training batches.
steps_per_epoch = len(train_loader)
warmup_steps = steps_per_epoch * 0.2
total_steps = steps_per_epoch * 10
scheduler = get_warmup_scheduler(optimizer, warmup_steps, total_steps)

# Trainer configuration
trainer_config = dict(
    epochs=10,
    train_loader=train_loader,
    valid_loader=valid_loader,
    model=model,
    optimizer=[optimizer],
    scheduler=[scheduler],  # attach the learning-rate scheduler
)
trainer = Trainer(**trainer_config)

# Run training
trainer.train(show_loss=True)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# NOTE: torch.load unpickles the file, so only load checkpoints you produced yourself.
# map_location lets a checkpoint saved on GPU be restored on a CPU-only machine.
state_dict = torch.load('model.ckpt', map_location=device)
model.load_state_dict(state_dict)
# The batches below are moved to `device`; the model has to live there as well.
model.to(device)
model.eval()

total_correct = 0
total_samples = 0
with torch.no_grad():
    for input_data in valid_loader:
        # Loop-local names are used on purpose so the notebook-level `labels`
        # array and `input_datas` demo output are not overwritten, which would
        # break re-running the earlier cells.
        batch = {k: v.to(device) for k, v in input_data.items()}
        _, y_hat = model(**batch)
        # Threshold the probabilities at 0.5 to get hard 0/1 predictions.
        pred = (y_hat > 0.5).long()
        batch_labels = batch['labels']
        total_correct += torch.sum(pred.view(-1) == batch_labels).item()
        total_samples += batch_labels.size(0)

accuracy = total_correct / total_samples
print(f'Validation Accuracy: {accuracy*100:.3f} %')