In [2]:
!pip install datasets sacremoses



In [4]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

In [6]:
train_ds = load_dataset("stanfordnlp/imdb", split="train")
test_ds = load_dataset("stanfordnlp/imdb", split="test")

In [8]:
tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")

In [34]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    max_len = 400
    texts, labels = [], []
    for row in batch:
        labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-3])
        texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-3]))

    texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
    labels = torch.LongTensor(labels)

    return texts, labels

* labels  
text를 token id로 변환한 리스트에서 뒤에서 세 번째 토큰만 사용  


* texts  
text를 token id로 변환한 리스트에서 마지막 세 개의 토큰을 제외한 것을 input으로 사용  
pad_sequence로 padding을 넣어줌

In [37]:
train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)

In [39]:
from torch import nn
from math import sqrt

class SelfAttention(nn.Module):
    def __init__(self, input_dim, d_model):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model

        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)
        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        q, k, v = self.wq(x), self.wk(x), self.wv(x)
        score = torch.matmul(q, k.transpose(-1, -2))
        score = score / sqrt(self.d_model)

        if mask is not None:
            score = score + (mask * -1e9)

        score = self.softmax(score)
        result = torch.matmul(score, v)
        result = self.dense(result)

        return result

* B : batch_size, S : seqeunce length (token 개수), D : d_model  
Query의 차원이 (B, S, D)일 때, B개의 (S, D) 행렬이 있는 것  
Q * K^T를 해주기위해 k.transpose(-1, -2)를 하면 (B, D, S), 즉 B개의 (D, S) 행렬로 변환되어
행렬의 곱셈이 가능함  
그 결과 나온 attention score의 차원은 (B, S, S)

* mask * -1e9  
 서로 길이가 다른 text data의 길이를 맞춰주기위해 더해준 것이 [PAD], padding token  
 attention score를 계산할 때는 이 토큰의 영향을 무시하기위해 score에 아주 작은 값인 mask * -1e9를 더해주는 것

* Softmax를 통과하여 확률변수로 표현할 수 있게 변환한 다음 V 행렬을 곱해줌

* self.dense(result)  
 행렬의 곱셈을 통해 도출된 result를 마지막에 Linear(d_model, d_model)을 거치는 이유는 self-attention의 output을 다시 가공하여 다음 layer에 전달하기 위함

In [42]:
class TransformerLayer(nn.Module):
    def __init__(self, input_dim, d_model, dff):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.dff = dff

        self.sa = SelfAttention(input_dim, d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model)
        )

    def forward(self, x, mask):
        x = self.sa(x, mask)
        x = self.ffn(x)

        return x

* ffn : feed-forward network
* dff : ffn의 dimension

In [45]:
import numpy as np

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)

max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


* Positional Encoding 계산식에서 pos : position, i : dimensio n  
 2i일 때는 sin으로, 2i + 1 일 때는 cos으로 계산  
 angle_rads에서 짝수일 때는 sin, 홀수일 때는 cos을 적용  

* np.arange  
 연속되 숫자 배열을 생성하는데, range()와 비슷하지만 NumPy 배열로 반환됨  

* np.arange(position)[:, None] 의 차원은 (position, 1)  
 np.arange(d_model)[None, :] 의 차원은 (1, d_model)

In [48]:
class TextClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, dff):
        super().__init__()

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
        self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, dff) for _ in range(n_layers)])
        self.classification = nn.Linear(d_model, vocab_size) # 출력 차원을 vocab_size로 수정

    def forward(self, x):
        mask = (x == tokenizer.pad_token_id)
        mask = mask[:, None, :]
        seq_len = x.shape[1]

        x = self.embedding(x)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]

        for layer in self.layers:
            x = layer(x, mask)

        x = x[:, -1] # 마지막 단어를 예측
        x = self.classification(x)

        return x

In [50]:
model = TextClassifier(len(tokenizer), 32, 2, 32)

In [52]:
print(torch.__version__)
print(torch.backends.mps.is_built())
print(torch.backends.mps.is_available())

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

2.6.0
True
True


In [54]:
from torch.optim import Adam

lr = 0.001
model = model.to(device)
loss_fn = nn.CrossEntropyLoss() # loss_fn 변경 : 다중 분류에 적합한 CrossEntropyLoss

optimizer = Adam(model.parameters(), lr=lr)

In [56]:
def accuracy(model, dataloader):
  correct = 0
  total = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to(device), labels.to(device)

    outputs = model(inputs) # 출력의 차원은 (B, vocab_size)
    preds = torch.argmax(outputs, dim=-1) # 마지막 차원에서 가장 큰 값을 고름
      
    correct += (labels == preds).sum().item()
    total += labels.size(0)

  return correct / total

In [58]:
train_accs = []
test_accs = []

n_epochs = 50

for epoch in range(n_epochs):
    total_loss = 0.
    model.train()

    for data in train_loader:
        model.zero_grad()
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        preds = model(inputs)
        loss = loss_fn(preds, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch + 1} | Train Loss: {total_loss}')

    with torch.no_grad():
        model.eval()
        train_acc = accuracy(model, train_loader)
        test_acc = accuracy(model, test_loader)
        train_accs.append(train_acc)
        test_accs.append(test_acc)
        print(f'=====> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}')

Epoch 1 | Train Loss: 3023.059310913086
=====> Train acc: 0.036 | Test acc: 0.035
Epoch 2 | Train Loss: 2687.641040802002
=====> Train acc: 0.035 | Test acc: 0.035
Epoch 3 | Train Loss: 2632.9019799232483
=====> Train acc: 0.042 | Test acc: 0.040
Epoch 4 | Train Loss: 2595.4274096488953
=====> Train acc: 0.042 | Test acc: 0.041
Epoch 5 | Train Loss: 2578.8629007339478
=====> Train acc: 0.040 | Test acc: 0.039
Epoch 6 | Train Loss: 2568.6112747192383
=====> Train acc: 0.043 | Test acc: 0.041
Epoch 7 | Train Loss: 2561.965498447418
=====> Train acc: 0.042 | Test acc: 0.041
Epoch 8 | Train Loss: 2556.437255382538
=====> Train acc: 0.044 | Test acc: 0.041
Epoch 9 | Train Loss: 2552.59273147583
=====> Train acc: 0.045 | Test acc: 0.042
Epoch 10 | Train Loss: 2546.243408679962
=====> Train acc: 0.046 | Test acc: 0.043
Epoch 11 | Train Loss: 2540.873517036438
=====> Train acc: 0.048 | Test acc: 0.044
Epoch 12 | Train Loss: 2531.9673190116882
=====> Train acc: 0.048 | Test acc: 0.045
Epoch 13 