<a href="https://colab.research.google.com/github/HyunMiPark/study_AI/blob/main/%5B2%EC%A3%BC%EC%B0%A8%5D_%EA%B8%B0%EB%B3%B8%EA%B3%BC%EC%A0%9C_%EB%8B%A4%EC%9D%8C%EB%8B%A8%EC%96%B4_%EC%98%88%EC%B8%A1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
!pip install datasets sacremoses



Last word prediction dataset 준비

In [24]:
import torch #Pytorch의 기본 패키지(딥러닝 모델을 만들때 사용)
from datasets import load_dataset #huggingFace datasets 라이브러리에서 데이터셋을 불러오는 함수
from torch.utils.data import DataLoader #Pytorch의 데이터 로더, 배치 단위로 데이터를 불러올 때 사용
from torch.nn.utils.rnn import pad_sequence #시퀀스 길이를 맞춰주는 함수(문장을 같은 길이로 패딩할 때 사용)
from transformers import BertTokenizerFast #빠른 토크나이징을 지원하는 버전
from tokenizers import ( #토큰화 과정에서 필요한 다양한 도구들(직접 커스텀 토크나이저를 만들 때 사용 됨)
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)

#IMDB 데이터셋 로드 : 데이터 셋중 5%의 데이터만 사용 -> 훈련 속도를 높이기 위해 데이터 일부만 선택
train_ds = load_dataset("stanfordnlp/imdb", split="train[:5%]")
test_ds = load_dataset("stanfordnlp/imdb", split="test[:5%]")

#Hugging Fae에서 BERT 토크나이저 로드
#bert-base-uncased : 영어 소문자 버전의 BERT 모델 -> 이제 텍스트 데이터를 BERT 모델이 이해할 수 있는 숫자 토큰으로 변환 가능
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


#데이터 전처리
def collate_fn(batch):
  max_len = 400
  texts, labels = [], []
  for row in batch:
    #마지막 세번째 토큰값을 정답으로 사용 -> 문장의 마지막 단어를 예측하는 모델을 만들려는 것
    labels.append(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[-3])
    #마지막 세번째 토큰을 제외한 나머지를 입력 데이터로 사용 -> 마지막 단어를 제외하고 이전 단어들을 모델 입력값으로 사용하는 것
    texts.append(torch.LongTensor(tokenizer(row['text'], truncation=True, max_length=max_len).input_ids[:-3]))

  #문장의 길이를 맞추기 위해 padding 추가(PAD 토큰)
  texts = pad_sequence(texts, batch_first=True, padding_value=tokenizer.pad_token_id)
  #정답 레이블을 텐서로 변환
  labels = torch.LongTensor(labels)

  #(입력문장, 정답) 반환
  return texts, labels

train_loader = DataLoader(
    train_ds, batch_size=64, shuffle=True, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds, batch_size=64, shuffle=False, collate_fn=collate_fn
)

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


Loss function 및 classifier output 변경
- 마지막 token id를 예측하는 것이기 때문에 binary classification이 아닌 일반적인 classification 문제로 바뀝니다. MNIST 과제에서 했던 것 처럼 loss와 TextClassifier의 출력 차원을 잘 조정하여 task를 풀 수 있도록 수정하시면 됩니다.

In [25]:
from torch import nn #Pytorch의 신경망 모듈
from math import sqrt #루트연산을 위한 함수

class MultiHeadAttention(nn.Module):
  def __init__(self, input_dim, d_model, n_heads):
    super().__init__()

    self.input_dim = input_dim #입력 차원
    self.d_model = d_model #모델 차원(임베딩)
    self.n_heads = n_heads #Attention 헤드 개수

    #입력 데이터를 Q(Query),K(Key),V(Value) 벡터로 변환하는 역할
    self.wq = nn.Linear(input_dim, d_model)
    self.wk = nn.Linear(input_dim, d_model)
    self.wv = nn.Linear(input_dim, d_model)
    #Attention의 최종 출력을 다시 d_model 차원으로 변환
    self.dense = nn.Linear(d_model, d_model)

    #Attention 가중치를 확률값으로 변환
    self.softmax = nn.Softmax(dim=-1)

  def forward(self, x, mask):
    #x:입력텐서, mask:마스킹 정보(Attention 특정 단어를 무시할 때 사용)
    #B(batch size) S(Sequence Length) D(각 Attention 헤드의 차원)
    q, k, v = self.wq(x), self.wk(x), self.wv(x)
    B, S, D = q.shape[0], q.shape[1], self.d_model // self.n_heads

    #reshape:d_model을 여러개의 n_heads로 분할
    #transpose:차원 순서를 변경(B, H, S, D)
    q = q.reshape((B, S, self.n_heads, D)).transpose(1, 2)
    k = k.reshape((B, S, self.n_heads, D)).transpose(1, 2)
    v = v.reshape((B, S, self.n_heads, D)).transpose(1, 2)

    #행렬곱 -> Attention Score(유사도) 계산
    score = torch.matmul(q, k.transpose(-1, -2)) # (B, H, S, D) * (B, H, D, S) = (B, H, S, S)
    #차원의 제곱근으로 나누어 값의 크기를 조정(스케일링)
    score = score / sqrt(self.d_model)

    #mask가 존재하면 마스킹된 부분을 매우 작은 값으로 설정
    #Softmax에서 0으로 만듦 -> 패딩된 부분이 Attention Score에 영향을 주지 않도록 방지
    if mask is not None:
      score = score + (mask[:, None] * -1e9)

    #Attention 가중치를 확률값으로 변환
    score = self.softmax(score)
    #가중치를 V벡터에 곱해 최종 값 계산
    result = torch.matmul(score, v) #(B, H, S, D)

    #ranspose:다시 원래 차원 순서로 변경
    #reshape:멀티 헤드 결과를 다시 합쳐서 하나의 텐서로 변환
    result = result.transpose(1, 2).reshape(B, S, -1)
    #최종 출력을 d_model차원으로 변환
    result = self.dense(result)

    return result

In [26]:
#Transformer의 한 층(layer)을 구현
class TransformerLayer(nn.Module):
  def __init__(self, input_dim, d_model, n_heads, dff):
    super().__init__()

    self.input_dim = input_dim #입력 차원
    self.d_model = d_model #모델 차원
    self.n_heads = n_heads #멀티 헤드 Attention에서 헤드의 개수
    self.dff = dff #Feed Forward Network(FFN)의 은닉층 차원

    #입력을 Q,K,V로 변환 후 Attention 수행하여 중요도를 반영한 출력을 생성
    self.sa = MultiHeadAttention(input_dim, d_model, n_heads)
    self.ffn = nn.Sequential(
      nn.Linear(d_model, dff), #차원 확장
      nn.ReLU(), #비선형 활성화 함수 적용
      nn.Linear(dff, d_model) #다시 원래 차원(d_model)로 축소
    )

    self.norm1 = nn.LayerNorm(d_model) #정규화 -> 출력의 안정성을 높이기 위해
    self.dropout1 = nn.Dropout(0.1) #과적합 방지

    self.norm2 = nn.LayerNorm(d_model)
    self.dropout2 = nn.Dropout(0.1)

  def forward(self, x, mask):
    #x:입력텐서, mask:마스킹 정보(Attention 특정 단어를 무시할 때 사용)
    x1 = self.sa(x, mask) #Attention 수행
    x1 = self.dropout1(x1)
    x1 = self.norm1(x + x1) #잔차 연결(Residual Connection) 후 정규화

    x2 = self.ffn(x1)
    x2 = self.dropout2(x2)
    x2 = self.norm2(x1 + x2)

    return x2

In [27]:
import numpy as np #수치 연산


def get_angles(pos, i, d_model):
    #pos:위치값(position index), i:임베딩 차원의 index, d_model:임베딩 차원의 크기
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    #positon:최대 시퀀스 길이(max_len), d_model:임베딩 차원의 크기
    angle_rads = get_angles(np.arange(position)[:, None], np.arange(d_model)[None, :], d_model)
    #짝수 index -> sin 함수 적용
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    #홀수 index -> cos 함수 적용
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[None, ...]

    return torch.FloatTensor(pos_encoding)


max_len = 400
print(positional_encoding(max_len, 256).shape)

torch.Size([1, 400, 256])


In [28]:
#텍스트 분류 모델을 구현
class TextClassifier(nn.Module):
  def __init__(self, vocab_size, d_model, n_heads, n_layers, dff):
    super().__init__()

    self.vocab_size = vocab_size #단어 사전 크기
    self.d_model = d_model #모델의 임베딩 차원
    self.n_heads = n_heads #멀티 헤드 Attention에서 헤드 개수
    self.n_layers = n_layers #Transformer 인코더 총 개수
    self.dff = dff #Feed Forward netwofk(FFN) 은닉층 크기

    #vocab_size 크기의 임베딩 테이블을 생성하여 단어를 고정된 크기의 벡터로 변환
    self.embedding = nn.Embedding(vocab_size, d_model)
    #requires_grad=False:위치 인코딩은 학습되지 않도록 고정
    self.pos_encoding = nn.parameter.Parameter(positional_encoding(max_len, d_model), requires_grad=False)
    #n_layers 개수만큼 Transformer 인코더 레이어 쌓기
    self.layers = nn.ModuleList([TransformerLayer(d_model, d_model, n_heads, dff) for _ in range(n_layers)])
    #[CLS] 토큰을 사용하여 1차원(이진 분류) 출력
    self.classification = nn.Linear(d_model, 1)

  def forward(self, x):
    mask = (x == tokenizer.pad_token_id)
    mask = mask[:, None, :]
    seq_len = x.shape[1]

    x = self.embedding(x) #단어를 임베딩 벡터로 변환
    x = x * sqrt(self.d_model) #임베딩 값을 조정
    x = x + self.pos_encoding[:, :seq_len] #위치 정보 추가

    for layer in self.layers:
      x = layer(x, mask)

    x = x[:, 0] #첫번째 토큰[CLS]을 사용하여 문장 전체를 대표하는 벡터로 활용
    x = self.classification(x) #(batch_size, 1)형태로 생성

    return x


model = TextClassifier(len(tokenizer), 32, 4, 5, 32)

학습 결과 report
- 기존 Transformer 실습에서 사용한 모델로 last word prediction을 학습하고 학습 경과를 report하면 됩니다.

In [29]:
from torch.optim import Adam

lr = 0.001
#model = model.to('cuda')
#이진분류(Binary Classification)에서 사용
#내부적으로 sigmoid함수 포함
#즉 모델의 출력값을 확률로 변환할 필요 없이 logits형태 그대로 사용 가능
loss_fn = nn.BCEWithLogitsLoss()

optimizer = Adam(model.parameters(), lr=lr)

In [30]:
import numpy as np
import matplotlib.pyplot as plt

def accuracy(model, dataloader):
  cnt = 0
  acc = 0

  for data in dataloader:
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda')

    preds = model(inputs)
    # preds = torch.argmax(preds, dim=-1)
    preds = (preds > 0).long()[..., 0]

    cnt += labels.shape[0]
    acc += (labels == preds).sum().item()

  return acc / cnt

In [32]:
n_epochs = 50

for epoch in range(n_epochs):
  total_loss = 0.
  model.train()
  for data in train_loader:
    model.zero_grad()
    inputs, labels = data
    inputs, labels = inputs.to('cuda'), labels.to('cuda').float()

    preds = model(inputs)[..., 0]
    loss = loss_fn(preds, labels)
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  print(f"Epoch {epoch:3d} | Train Loss: {total_loss}")

  with torch.no_grad():
    model.eval()
    train_acc = accuracy(model, train_loader)
    test_acc = accuracy(model, test_loader)
    print(f"=========> Train acc: {train_acc:.3f} | Test acc: {test_acc:.3f}")




RuntimeError: result type Float can't be cast to the desired output type Long