In [2]:
!pip install datasets sacremoses

import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
import numpy as np
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

train_ds = load_dataset("stanfordnlp/imdb", split="train[:100%]")
test_ds = load_dataset("stanfordnlp/imdb", split="test[:100%]")

tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")


def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    labels.append(row['label'])
    texts.append(row['text'])

  texts = torch.LongTensor(tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids)
  labels = torch.LongTensor(labels)

  return texts, labels


train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)
from torch import nn
from math import sqrt


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0 # model의 차원은 head의 수로 나누어 떨어져야함

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_heads = d_model // n_heads

        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)

        self.dense = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        batch_size = x.size(0)

        q, k, v = self.wq(x), self.wk(x), self.wv(x)

        q = q.view(batch_size, -1, self.n_heads, self.d_heads).transpose(1, 2)
        k = k.view(batch_size, -1, self.n_heads, self.d_heads).transpose(1, 2)
        v = v.view(batch_size, -1, self.n_heads, self.d_heads).transpose(1, 2)
        
        scores = torch.matmul(q, k.transpose(-1, -2)) / sqrt(self.d_heads)

        if mask is not None:
            mask = mask.unsqueeze(1)
            scores = scores + (mask * -1e9)

        scores = self.softmax(scores)
        scores = self.dropout(scores)
        out = torch.matmul(scores, v)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.dense(out)

class TransformerLayer(nn.Module):
    def __init__(self, d_model, n_heads, dff, dropout=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(d_model, n_heads, dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)

        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model)
        )
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        x1 = self.mha(x, mask)
        x1 = self.norm1(x + self.dropout1(x1))

        x2 = self.ffn(x)
        x2 = self.norm2(x + self.dropout2(x1))

        return x2



In [4]:
def get_angles(pos, i, d_model):
  angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
  return pos * angle_rates

def positional_encoding(position, d_model):
  angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
  angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
  angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
  pos_encoding = angle_rads[None, ...]

  return torch.FloatTensor(pos_encoding)

max_len = 400
print(positional_encoding(max_len, 256).shape)

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, dff, dropout=0.1):
        super().__init__()

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)

        self.dropout = nn.Dropout(dropout)

        self.layers = nn.ModuleList([
            TransformerLayer(d_model, n_heads=4, dff=dff, dropout=dropout) for _ in range(n_layers)
        ])
        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        mask = (x == tokenizer.pad_token_id)
        mask = mask[:, None, :]

        seq_len = x.shape[1]
        x = self.embedding(x)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, mask)

        x = x[:, 0]
        x = self.classification(x)

        return x     

print(torch.__version__)
print(torch.backends.mps.is_built())
print(torch.backends.mps.is_available())

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

torch.Size([1, 400, 256])
2.6.0
True
True


In [6]:
model = TextClassifier(len(tokenizer), 64, 5, 128)

from torch.optim import Adam

lr = 0.001
model = model.to(device)
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

import numpy as np
import matplotlib.pyplot as plt

def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
      inputs, labels = data
      inputs, labels = inputs.to(device), labels.to(device)

      preds = model(inputs)
      # preds = torch.argmax(preds, dim=-1)
      preds = (preds > 0).long()[..., 0]

      cnt += labels.shape[0]
      acc += (labels == preds).sum().item()

  return acc / cnt

train_acc_list = []
test_acc_list = []

n_epochs = 30

for epoch in range(n_epochs):
    total_loss = 0.
    model.train()
    for data in train_loader:
        model.zero_grad()
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        labels = labels.float().unsqueeze(1)
        preds = model(inputs)
        loss = loss_fn(preds, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch + 1:3d} | Train Loss: {total_loss:.4f}")

    with torch.no_grad():
        model.eval()
        train_acc = accuracy(model, train_loader)
        test_acc = accuracy(model, test_loader)
        train_acc_list.append(train_acc)
        test_acc_list.append(test_acc)

        print(f'=====> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}')

Epoch   1 | Train Loss: 200.3682
=====> Train acc: 0.843 | Test acc: 0.818
Epoch   2 | Train Loss: 137.3984
=====> Train acc: 0.888 | Test acc: 0.849
Epoch   3 | Train Loss: 119.8694
=====> Train acc: 0.906 | Test acc: 0.853
Epoch   4 | Train Loss: 106.8263
=====> Train acc: 0.915 | Test acc: 0.857
Epoch   5 | Train Loss: 97.7485
=====> Train acc: 0.933 | Test acc: 0.863
Epoch   6 | Train Loss: 88.4874
=====> Train acc: 0.940 | Test acc: 0.861
Epoch   7 | Train Loss: 81.4826
=====> Train acc: 0.949 | Test acc: 0.864
Epoch   8 | Train Loss: 74.7104
=====> Train acc: 0.957 | Test acc: 0.858
Epoch   9 | Train Loss: 69.6695
=====> Train acc: 0.960 | Test acc: 0.855
Epoch  10 | Train Loss: 63.6671
=====> Train acc: 0.970 | Test acc: 0.859
Epoch  11 | Train Loss: 59.3289
=====> Train acc: 0.960 | Test acc: 0.844
Epoch  12 | Train Loss: 54.4746
=====> Train acc: 0.978 | Test acc: 0.858
Epoch  13 | Train Loss: 50.9411
=====> Train acc: 0.980 | Test acc: 0.853
Epoch  14 | Train Loss: 47.4093
==