In [1]:
!pip install torch==1.8.0+cu111 torchvision==0.9.0+cu111 torchaudio==0.8.0 torchtext==0.9.0 -f https://download.pytorch.org/whl/torch_stable.html

Looking in links: https://download.pytorch.org/whl/torch_stable.html


In [2]:
!pip install spacy
!python -m spacy download en_core_web_sm


Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
     ---------------------------------------- 0.0/12.8 MB ? eta -:--:--
     --------- ------------------------------ 2.9/12.8 MB 21.0 MB/s eta 0:00:01
     ------------------------------- ------- 10.5/12.8 MB 29.8 MB/s eta 0:00:01
     --------------------------------------- 12.8/12.8 MB 28.7 MB/s eta 0:00:00
[38;5;2m[+] Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [3]:
import torch
from torchtext.legacy import data

# Seed PyTorch's RNG and request deterministic cuDNN kernels.
SEED = 1234
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# Review text is tokenized with the spaCy `en_core_web_sm` pipeline.
TEXT = data.Field(tokenize='spacy', tokenizer_language='en_core_web_sm')

# Label tensors are created with a float dtype.
LABEL = data.LabelField(dtype=torch.float)

In [None]:
from torchtext.legacy import datasets

# Load the IMDB train/test splits through the TEXT and LABEL fields.
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

print(f'Number of training examples: {len(train_data)}')
print(f'Number of testing examples: {len(test_data)}')

# Show the attributes of the first training example as a sanity check.
first_example = train_data.examples[0]
print(vars(first_example))

In [None]:
import random

# Seed Python's RNG first, then hand its state to `split` explicitly.
# `random.seed()` returns None, so its return value must not be used as the
# `random_state` argument.
random.seed(SEED)
# NOTE: this rebinds `train_data`; re-running this cell without re-running the
# loading cell would split the already reduced training set again.
train_data, valid_data = train_data.split(random_state=random.getstate())
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

In [None]:
MAX_VOCAB_SIZE = 25_000

# Both vocabularies are built from the training split only.
TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE)
LABEL.build_vocab(train_data)

print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")
print(TEXT.vocab.freqs.most_common(20))
print(TEXT.vocab.itos[:10])
print(LABEL.vocab.stoi)

# Run on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
# Do not use BucketIterator in your implementation because you are required to implement the padding and masking yourself.
# train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits((train_data, valid_data, test_data), batch_size=1, device=device)

In [None]:
from torch.utils.data import DataLoader, TensorDataset

# Convert the examples to tensors up front.
def process_data(data, text_field, label_field, pad_idx=1):
    """Numericalize every example and pack the split into a ``TensorDataset``.

    Every sequence is right-padded with ``pad_idx`` to the length of the
    longest example in ``data``.

    Args:
        data: Iterable of examples exposing ``text`` and ``label`` attributes.
        text_field: Object whose ``process([tokens])`` returns the token ids
            of one example, with the sequence on dim 0.
        label_field: Object whose ``process([label])`` returns the label of
            one example as a 1-D, one-element value.
        pad_idx: Id written into the padded positions. Defaults to 1.

    Returns:
        TensorDataset of ``(text, label)`` pairs. ``text`` has shape
        ``(num_examples, max_len, *rest)`` where ``rest`` is whatever trailing
        shape ``text_field.process`` yields for a single example (presumably
        ``(1,)`` for a non-batch-first torchtext Field -- verify). ``label``
        is the float concatenation of the per-example labels.

    Raises:
        ValueError: If ``data`` contains no examples.
    """
    text_tensors, label_tensors = [], []
    for example in data:
        fields = vars(example)
        # as_tensor reuses an existing tensor as-is; wrapping a tensor in
        # torch.tensor() would copy it and emit a copy-construct UserWarning.
        text_tensors.append(torch.as_tensor(text_field.process([fields['text']])))
        label_tensors.append(
            torch.as_tensor(label_field.process([fields['label']]), dtype=torch.float))

    if not text_tensors:
        raise ValueError('process_data received no examples')

    # Pad all text sequences to the same length.
    padded_text = torch.nn.utils.rnn.pad_sequence(
        text_tensors, batch_first=True, padding_value=pad_idx)

    return TensorDataset(padded_text, torch.cat(label_tensors))

# Tensorize each split and wrap it in a DataLoader for mini-batching.
# Only the training batches are shuffled.
train_dataset = process_data(train_data, TEXT, LABEL)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

valid_dataset = process_data(valid_data, TEXT, LABEL)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

test_dataset = process_data(test_data, TEXT, LABEL)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [None]:
import torch.nn as nn
class LR(nn.Module):
    """Logistic regression over the average of a review's token embeddings.

    Args:
        input_dim: Number of rows in the embedding table (vocabulary size).
        embedding_dim: Size of each token embedding.
        output_dim: Number of output logits.
        pad_idx: Token id that marks padding; padded positions are excluded
            from the average. Defaults to 1.
    """

    def __init__(self, input_dim, embedding_dim, output_dim, pad_idx=1):
        super().__init__()
        self.pad_idx = pad_idx
        # padding_idx keeps the padding row of the table out of the gradient.
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx=pad_idx)
        self.fc = nn.Linear(embedding_dim, output_dim)

    def forward(self, text):
        """Compute the logits for a batch of padded token-id sequences.

        Args:
            text: Integer tensor of token ids with the sequence on dim 1,
                e.g. ``(batch, seq_len)`` or ``(batch, seq_len, 1)``.

        Returns:
            Tensor shaped like ``text`` with dim 1 removed and ``output_dim``
            appended, e.g. ``(batch, output_dim)`` or ``(batch, 1, output_dim)``.
        """
        embedded = self.embedding(text)  # text.shape + (embedding_dim,)

        # 1.0 for real tokens, 0.0 for padding; broadcastable against `embedded`.
        mask = (text != self.pad_idx).unsqueeze(-1).to(embedded.dtype)

        # Average over real tokens only. A plain mean over dim 1 would also
        # divide by the padded positions, so the result would depend on how
        # much padding a review happened to receive.
        token_count = mask.sum(dim=1).clamp(min=1)  # avoid 0/0 on all-padding rows
        pooled = (embedded * mask).sum(dim=1) / token_count

        return self.fc(pooled)

In [None]:
# Model hyper-parameters.
INPUT_DIM = len(TEXT.vocab)  # one embedding row per vocabulary entry
EMBEDDING_DIM = 100
OUTPUT_DIM = 1  # a single logit per review

model = LR(input_dim=INPUT_DIM, embedding_dim=EMBEDDING_DIM, output_dim=OUTPUT_DIM)

In [None]:
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report the number of trainable parameters, formatted with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
import torch.optim as optim

# Stochastic gradient descent over all model parameters, learning rate 1e-3.
optimizer = optim.SGD(params=model.parameters(), lr=1e-3)

In [None]:
# Binary cross-entropy computed directly on raw logits (the sigmoid is applied
# inside the loss), averaged over the batch.
criterion = nn.BCEWithLogitsLoss(reduction='mean')

In [None]:
# Move the model and the loss module onto the selected device.
model, criterion = model.to(device), criterion.to(device)

In [None]:
def binary_accuracy(preds, y):
    """Return the fraction of correct binary predictions in a batch.

    Args:
        preds: Raw logits, one per example, e.g. shape (batch,) or (batch, 1).
        y: Ground-truth 0/1 labels holding the same number of elements as
            ``preds``, e.g. shape (batch,) or (batch, 1).

    Returns:
        0-dim float tensor with the accuracy of the batch.

    Raises:
        ValueError: If ``preds`` and ``y`` hold a different number of elements.
    """
    if preds.numel() != y.numel():
        raise ValueError(
            f'preds has {preds.numel()} elements but y has {y.numel()}')

    # Sigmoid maps logits to probabilities; rounding turns them into 0 or 1.
    rounded_preds = torch.round(torch.sigmoid(preds)).reshape(-1)

    # Flatten both sides so the comparison is strictly element-wise. Comparing
    # a (batch,) tensor with a (batch, 1) tensor would broadcast to
    # (batch, batch) and report a wrong accuracy.
    correct = (rounded_preds == y.reshape(-1)).float()

    return correct.sum() / correct.numel()


In [None]:
from tqdm import tqdm
def train(model, loader, optimizer, criterion):
    """Train ``model`` for one pass over ``loader``.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Module mapping a batch of token ids to one logit per example.
        loader: Iterable yielding ``(text, labels)`` batches.
        optimizer: Optimizer updating the parameters of ``model``.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        Tuple ``(loss, accuracy)``, each averaged over the batches of ``loader``.
    """
    epoch_loss = 0
    epoch_acc = 0
    model.train()

    for text, labels in loader:
        text, labels = text.to(device), labels.to(device)
        targets = labels.reshape(-1, 1)  # one column: (batch, 1)

        optimizer.zero_grad()
        # Give the logits the shape of the targets explicitly. Relying on
        # squeeze(1) only lines up when the model returns (batch, 1, 1); a
        # (batch, 1) output would collapse to (batch,) and the loss would
        # reject the mismatch with the (batch, 1) targets.
        predictions = model(text).reshape(targets.shape)

        loss = criterion(predictions, targets)
        acc = binary_accuracy(predictions, targets)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(loader), epoch_acc / len(loader)


In [None]:
def evaluate(model, loader, criterion):
    """Evaluate ``model`` on every batch of ``loader`` without updating it.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        model: Module mapping a batch of token ids to one logit per example.
        loader: Iterable yielding ``(text, labels)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        Tuple ``(loss, accuracy)``, each averaged over the batches of ``loader``.
    """
    epoch_loss = 0
    epoch_acc = 0
    model.eval()

    with torch.no_grad():
        for text, labels in loader:
            text, labels = text.to(device), labels.to(device)
            targets = labels.reshape(-1, 1)  # one column: (batch, 1)

            # Give the logits the shape of the targets explicitly instead of
            # relying on squeeze(1), which only lines up when the model
            # returns (batch, 1, 1).
            predictions = model(text).reshape(targets.shape)

            loss = criterion(predictions, targets)
            acc = binary_accuracy(predictions, targets)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(loader), epoch_acc / len(loader)

In [None]:
import time

def epoch_time(start_time, end_time):
    """Split the span between two timestamps (in seconds) into minutes and seconds.

    Returns:
        Tuple ``(minutes, seconds)`` of ints, both truncated toward zero.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - (whole_minutes * 60))
    return whole_minutes, leftover_seconds

In [None]:
# Train for a fixed number of epochs and keep the weights of the epoch with the
# lowest validation loss in 'best-model.pt'.
N_EPOCHS = 5
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time()

    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion)

    # The measured time covers both the training pass and the validation pass.
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Checkpoint only when the validation loss improves on the best seen so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best-model.pt')

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')


In [None]:
# Restore the weights saved in 'best-model.pt' before scoring the test split.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU also loads on a CPU-only machine.
model.load_state_dict(torch.load('best-model.pt', map_location=device))
test_loss, test_acc = evaluate(model, test_loader, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')


In [None]:
# Test of model correctness: print the raw score of a few validation instances.
max_n_test_instances = 5

model.eval()
with torch.no_grad():
    # Instances come from the first batch of `valid_loader`; no `valid_iterator`
    # exists because the BucketIterator is not used in this notebook.
    text, _ = next(iter(valid_loader))
    for instance_text in text[:max_n_test_instances]:
        # unsqueeze(0) restores the batch dimension for a single instance.
        score = model(instance_text.unsqueeze(0).to(device))
        print(score)


In [None]:
import pandas as pd

# Write the predicted probability of every test review to a CSV file, in the
# order the (unshuffled) test loader yields them.
predictions = []
model.eval()
with torch.no_grad():
    for text, _ in test_loader:
        text = text.to(device)
        probabilities = torch.sigmoid(model(text))
        # Flatten to plain Python floats so each CSV row holds one number
        # rather than the string form of a nested array.
        predictions.extend(probabilities.reshape(-1).cpu().tolist())

df = pd.DataFrame({'predictions': predictions})
df.to_csv('test_predictions.csv', index=False)
