<a href="https://colab.research.google.com/github/tolluset/hh-ai-1/blob/2-a/2-A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 과제 2-A




In [None]:
!pip install jupyter_black

In [35]:
import jupyter_black

jupyter_black.load()

In [36]:
backend = "cuda"

In [None]:
!pip install datasets

In [None]:
!pip install sacremoses

In [None]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)


ds = load_dataset("stanfordnlp/imdb")
tokenizer = torch.hub.load(
    "huggingface/pytorch-transformers", "tokenizer", "bert-base-uncased"
)


def collate_fn(batch):
    max_len = 400
    texts, labels = [], []
    for row in batch:
        labels.append(row["label"])
        texts.append(row["text"])

    texts = torch.LongTensor(
        tokenizer(texts, padding=True, truncation=True, max_length=max_len).input_ids
    )
    labels = torch.LongTensor(labels)

    return texts, labels


train_loader = DataLoader(
    ds["train"], batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    ds["test"], batch_size=64, shuffle=False, collate_fn=collate_fn
)

## Self-attention

In [40]:
from torch import nn
from math import sqrt


class SelfAttention(nn.Module):
  def __init__(self, input_dim, d_model):
    super().__init__()

    self.input_dim = input_dim
    self.d_model = d_model

    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    self.dense = nn.Linear(d_model, d_model)

    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, S, D) * (B, D, S) = (B, S, S)
    score = score / sqrt(self.d_model)

    if mask is not None:
      score = score + (mask * -1e9)

    score = self.softmax(score)
    result = torch.matmul(score, v)
    result = self.dense(result)

    return result

## Multi-head attention(MHA) 구현

- ✅ $Q, K, V \in \mathbb{R}^{S \times D}$가 있을 때, 이를 $Q, K, V \in \mathbb{R}^{S \times H \times D’}$으로 reshape 해줍니다. 여기서 $H$는 n_heads라는 인자로 받아야 하고, $D$가 $H$로 나눠 떨어지는 값이여야 하는 제약 조건이 필요합니다. $D = H \times D’$입니다.

- ✅ $Q, K, V$를 $Q, K, V \in \mathbb{R}^{H \times S \times D’}$의 shape으로 transpose해줍니다.


- ✅ Mask를 더합니다. 기존과 $A$의 shape이 달라졌기 때문에 dimension을 어떻게 맞춰줘야할지 생각해줘야 합니다.

- ✅ $\hat{x} = \textrm{Softmax}(A)V \in \mathbb{R}^{H \times S \times D'}$를 계산해주고 transpose와 reshape을 통해 $\hat{x} \in \mathbb{R}^{S \times D}$의 shape으로 다시 만들어줍니다.

In [41]:
from torch import nn
from math import sqrt


class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, d_model, n_heads):
        super().__init__()

        assert d_model % n_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.n_heads = n_heads

        self.depth = d_model // n_heads

        self.wq = nn.Linear(input_dim, d_model)
        self.wk = nn.Linear(input_dim, d_model)
        self.wv = nn.Linear(input_dim, d_model)
        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)


    def split_heads(self, x):
        batch_size, seq_len, _ = x.size()
        # S, D -> S, H, D' -> H, S, D'
        return x.view(batch_size, seq_len, self.n_heads, self.depth).transpose(1, 2)


    def forward(self, x, mask):
        batch_size = x.size(0)

        # H, S, D'
        q, k, v = (
            self.split_heads(self.wq(x)),
            self.split_heads(self.wk(x)),
            self.split_heads(self.wv(x)),
        )

        # (H, S, D') * (H, D', S) -> H, S, S
        score = torch.matmul(q, k.transpose(-1, -2)) / sqrt(self.depth)

        if mask is not None:
          mask = mask.unsqueeze(1)
          score = score + (mask * -1e9)

        score = self.softmax(score)
        result = torch.matmul(score, v)  # (H, S, S) * (H, S, D') -> H, S, D'

        # H, S, D' -> S, H, D' -> S, D
        result = result.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        result = self.dense(result)

        return result

## Layer normalization, dropout, residual connection 구현

```
✅ 다음과 같이 수정해주시면 됩니다.
# x, mask is given

x1 = MHA(x, mask)
x1 = Dropout(x1)
x1 = LayerNormalization(x1 + x)

x2 = FFN(x)
x2 = Dropout(x2)
x2 = LayerNormalization(x2 + x1)

return x2
```

In [42]:
class TransformerLayer(nn.Module):
    def __init__(self, input_dim, d_model, dff, n_heads, dropout_rate=0.1):
        super().__init__()

        self.input_dim = input_dim
        self.d_model = d_model
        self.dff = dff

        self.mha = MultiHeadAttention(input_dim, d_model, n_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff), nn.ReLU(), nn.Linear(dff, d_model)
        )

        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, mask):
        x1 = self.mha(x, mask)
        x1 = self.dropout(x1)
        x1 = self.layernorm1(x1 + x)

        x2 = self.ffn(x1)
        x2 = self.dropout(x2)
        x2 = self.layernorm2(x2 + x1)

        return x2

## Positional encoding

In [43]:
import numpy as np


def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates


def positional_encoding(position, d_model):
    angle_rads = get_angles(
        np.arange(position)[:, None], np.arange(d_model)[None, :], d_model
    )
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


##5-layer 4-head Transformer
- ✅ 기존 실습에서 사용한 hyper-parameter들과 위에서 구현한 Transformer를 가지고 5-layer 4-head Transformer의 성능 결과를 report해주시면 됩니다.

In [44]:
class TextClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, dff, n_heads):
        super().__init__()

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.parameter.Parameter(
            positional_encoding(max_len, d_model), requires_grad=False
        )
        self.layers = nn.ModuleList(
            [TransformerLayer(d_model, d_model, dff, n_heads) for _ in range(n_layers)]
        )
        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        mask = x == tokenizer.pad_token_id
        mask = mask[:, None, :]
        seq_len = x.shape[1]

        x = self.embedding(x)
        x = x * sqrt(self.d_model)
        x = x + self.pos_encoding[:, :seq_len]

        for layer in self.layers:
            x = layer(x, mask)

        x = x[:, 0]
        x = self.classification(x)

        return x


model = TextClassifier(len(tokenizer), 32, 5, 128, 4)

## 학습


In [45]:
from torch.optim import Adam

lr = 0.001
model = model.to(backend)
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [46]:
import numpy as np
import matplotlib.pyplot as plt


def accuracy(model, dataloader):
    cnt = 0
    acc = 0

    for data in dataloader:
        inputs, labels = data
        inputs, labels = inputs.to(backend), labels.to(backend)

        preds = model(inputs)
        # preds = torch.argmax(preds, dim=-1)
        preds = (preds > 0).long()[..., 0]

        cnt += labels.shape[0]
        acc += (labels == preds).sum().item()

    return acc / cnt

In [47]:
# 시간관계상 50 -> 3으로 변경
n_epochs = 3

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to(backend), labels.to(backend).float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")

Epoch   0 | Train Loss: 218.48692874610424
Epoch   1 | Train Loss: 150.3742184638977
Epoch   2 | Train Loss: 119.84587635099888
