In [3]:
import re
import math
import random
from collections import Counter

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

from datasets import load_dataset
from sklearn.model_selection import train_test_split

In [33]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cuda


In [8]:
SEED = 42

In [5]:
raw = load_dataset("imdb")
train_texts = raw["train"]["text"]
train_labels = raw["train"]["label"]
test_texts = raw["test"]["text"]
test_labels = raw["test"]["label"]

In [6]:
raw

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    unsupervised: Dataset({
        features: ['text', 'label'],
        num_rows: 50000
    })
})

In [9]:
train_texts, valid_texts, train_labels, valid_labels = train_test_split(
    train_texts, train_labels, test_size=0.2, random_state=SEED, stratify=train_labels
)

In [10]:
train_texts[0]

'I have always been a huge James Bond fanatic! I have seen almost all of the films except for Die Another Day, and The World Is Not Enough. The graphic\'s for Everything Or Nothing are breathtaking! The voice talents......... WOW! I LOVE PIERCE BROSNAN! He is finally Bond in a video game! HE IS BOND! I enjoyed the past Bond games: Goldeneye, The World Is Not Enough, Agent Under Fire, and Nightfire. This one is definitely the best! Finally, Mr. Brosnan, (may I call him Mr. Brosnan as a sign of respect? Yes I can!) He was phenomenally exciting to hear in a video game....... AT LONG LAST! DUH! I\'ve seen him perform with Robin Williams, and let me tell you, they make a great team. Pierce Brosnan is funny, wickedly handsome ( I mean to say wickedly in a good way,) and just one of those actor\'s who you would want to walk up to and wrap your arms around and hug, saying: "Pierce Brosnan, thank you for being James Bond," "If it wasn\'t for you, I wouldn\'t know who James Bond is." He\'s a gre

In [11]:
def basic_tokenize(s: str):
    # 소문자 + 간단한 정규화(알파벳/숫자/공백만 유지), 공백 기준 분리
    s = s.lower()
    s = re.sub(r"[^a-z0-9\s]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s.split() if s else []

In [13]:
MIN_FREQ = 2
PAD, UNK = "<pad>", "<unk>"
def build_vocab(texts, min_freq=MIN_FREQ):
    counter = Counter()
    for t in texts:
        counter.update(basic_tokenize(t))
    # 자주 등장하는 토큰만 포함
    itos = [PAD, UNK]
    for token, freq in counter.most_common():
        if freq >= min_freq:
            itos.append(token)
    stoi = {tok: idx for idx, tok in enumerate(itos)}
    return stoi, itos


In [14]:
stoi, itos = build_vocab(train_texts, MIN_FREQ)
VOCAB_SIZE = len(itos)
PAD_IDX = stoi[PAD]
UNK_IDX = stoi[UNK]
print(f"Vocab size: {VOCAB_SIZE:,}")

Vocab size: 42,871


In [16]:
MAX_LEN = 200 

def encode(text, stoi_map=stoi, max_len=MAX_LEN):
    toks = basic_tokenize(text)
    ids = [stoi_map.get(tok, UNK_IDX) for tok in toks]
    ids = ids[-max_len:]
    return ids

In [17]:
class IMDBDataset(torch.utils.data.Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        return self.texts[idx], int(self.labels[idx])

In [18]:
def collate_fn(batch):
    # batch: [(text_str, label_int), ...]
    # 1) 인코딩
    seqs = [encode(txt) for txt, _ in batch]
    labels = torch.tensor([lab for _, lab in batch], dtype=torch.float32)

    # 2) 패딩
    lengths = torch.tensor([len(s) for s in seqs], dtype=torch.long)
    maxlen = max(lengths).item() if lengths.numel() > 0 else 0
    padded = torch.full((len(seqs), maxlen), PAD_IDX, dtype=torch.long)
    for i, s in enumerate(seqs):
        if len(s) > 0:
            padded[i, :len(s)] = torch.tensor(s, dtype=torch.long)
    return padded, lengths, labels

In [19]:
BATCH_SIZE = 64
train_ds = IMDBDataset(train_texts, train_labels)
valid_ds = IMDBDataset(valid_texts, valid_labels)
test_ds  = IMDBDataset(test_texts,  test_labels)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn, num_workers=0)
valid_loader = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn, num_workers=0)
test_loader  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn, num_workers=0)


In [26]:
class BasicRNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, dropout = 0.2, n_classes = 1):
        super().__init__()
        self.hidden_dim = hidden_dim 
        self.n_layers  = num_layers
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.rnn = nn.RNN(
            input_size = embed_dim,
            hidden_size = hidden_dim,
            num_layers = num_layers,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_dim, n_classes)

    def forward(self, x, lengths):
        # x: (B, T), lengths: (B,)
        emb = self.embedding(x)  # (B, T, E)
        packed = nn.utils.rnn.pack_padded_sequence(emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
        h0 = torch.zeros(self.n_layers, x.size(0), self.hidden_dim, device=x.device)
        _, h_n = self.rnn(packed, h0)                    # h_n: (L, B, H)
        h_last = h_n[-1]                                 # (B, H) 마지막 층
        h_last = self.dropout(h_last)
        logits = self.fc(h_last).squeeze(-1)            # (B,)
        return logits

In [46]:
vocab_size = len(itos)
n_classes = 1

In [47]:
model = BasicRNN(num_layers = 1, hidden_dim = 256, vocab_size = vocab_size, embed_dim = 128, n_classes = n_classes, dropout = 0.5)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [48]:
def train(model, optimizer, loader):
    model.train()
    total_loss, total_acc, total_n = 0.0, 0.0, 0
    for x, lengths, y in loader:       # <-- tuple 언패킹
        x, lengths, y = x.to(DEVICE), lengths.to(DEVICE), y.float().to(DEVICE)

        logits = model(x, lengths)      # (B,)
        loss = criterion(logits, y)

        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        optimizer.step()

        with torch.no_grad():
            preds = (torch.sigmoid(logits) >= 0.5).float()
            bs = y.size(0)
            total_loss += loss.item() * bs
            total_acc  += (preds == y).float().sum().item()
            total_n    += bs
    return total_loss / total_n, total_acc / total_n

In [49]:
def evaluate(model, loader):
    model.eval()
    total_loss, total_acc, total_n = 0.0, 0.0, 0
    with torch.no_grad():
        for x, lengths, y in loader:   # <-- tuple 언패킹
            x, lengths, y = x.to(DEVICE), lengths.to(DEVICE), y.float().to(DEVICE)
            logits = model(x, lengths)
            loss = criterion(logits, y)
            preds = (torch.sigmoid(logits) >= 0.5).float()
            bs = y.size(0)
            total_loss += loss.item() * bs
            total_acc  += (preds == y).float().sum().item()
            total_n    += bs
    return total_loss / total_n, total_acc / total_n

In [50]:
BATCH_SIZE = 100
LR = 0.001
EPOCHS = 5
for e in range(1, EPOCHS + 1):
    train(model, optimizer, train_loader)
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print("[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f" % (e, val_loss, val_accuracy))

[EPOCH: 1], Validation Loss: 132.24 | Validation Accuracy:  0.57
[EPOCH: 2], Validation Loss: 130.49 | Validation Accuracy:  0.62
[EPOCH: 3], Validation Loss: 129.94 | Validation Accuracy:  0.68
[EPOCH: 4], Validation Loss: 129.31 | Validation Accuracy:  0.70
[EPOCH: 5], Validation Loss: 130.03 | Validation Accuracy:  0.57


In [53]:
test_loss, test_accuracy = evaluate(model, test_loader)
print("Test Loss: %5.2f | Test Accuracy: %5.2f" % ( test_loss, test_accuracy))

Test Loss: 134.16 | Test Accuracy:  0.58
