<a href="https://colab.research.google.com/github/LEFT-BEE/small_project/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Attention is All You Need (NIPS 2017) 실습

본 코드는 기본적으로 Transformer논문의 내용을 최대한 따른다 또한 

(German_English).ipynb [링크 텍스트](https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice/blob/master/code_practices/Attention_is_All_You_Need_Tutorial_)

의 내용을 그대로 따른다.

pytorch에서는 텍스트에 대한 여러 추상화 기능을 제공하는 자연어 처리 라이브러리를 제공한다

pyroch는 파일로드하기 ,  토큰화 , 단어집합, 정수 인코딩, 배치화, 단어벡터등을 처리할 수 있다. 

물론 모든 기능을 제공하지는 않고 훈련데이터, 검증데이터, 테스트데이터를 분리하는 작업은 별도로 해주어야 한다 

또한 각 샘플에 대해서 각 샘플에 대해서 단어들을 임베딩 벡터로 맵핑해주는 작업등은 nn.Embedding()을 통해서 해결해야한다

In [None]:
!pip install torchtext==0.6.0

### 데이터 전처리 

spaCy 라이브러리: 문장의 토큰화(tokenization), 태깅(tagging) 등의 전처리 기능을 위한 라이브러리

In [None]:
%%capture
!python -m spacy download en
!python -m spacy download de

In [None]:
import spacy
spacy_en = spacy.load('en')
spacy_de = spacy.load('de')

#간단히 토큰화 기능 사용해보기
tokenized = spacy_en.tokenizer("i am a graduate studnet.")

for i in tokenized:
  print(i.text)

영어 및 독일어의 토큰화 함수정의

In [None]:
def tokenize_de(text):
  return [token.text for token in spacy_de.tokenizer(text)]

def tokenize_en(text):
  return [token.text for token in spacy_en.tokenizer(text)]

필드(filed)라이브러리를 이용해 데이터셋에 대한 전처리 내용을 명시한다 

번역 목표
독일어(SRC) -> 영어(TRG)

In [None]:
from torchtext.data import Field, BucketIterator
SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)

In [None]:
#대표적인 영어-독어 번역 데이터셋인 Multi30k를 불러온다
#전처리 모델 SRC , TRG
from torchtext.datasets import Multi30k
train_dataset, valid_dataset, test_dataset = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG))

In [None]:
print(f"학습데이터 셋 크기 {len(train_dataset)}")
print(f"평가 데이터셋(validation dataset) 크기: {len(valid_dataset.examples)}개")
print(f"테스트 데이터셋(testing dataset) 크기: {len(test_dataset.examples)}개")

In [None]:
print(vars(train_dataset[1])['src'])
print(vars(train_dataset[1])['trg'])
#요소에 시퀀스가 들어있음, 독일어는 아직 순서를 바꾸지 않음

In [None]:
#필드객체의 build_vocab 메서드를 이용해 영어와 독어의 단어사전을 생성한다 
#최소 2번 이상 등장한 단어만을 선택한다.

SRC.build_vocab(train_dataset , min_freq =2)
TRG.build_vocab(train_dataset , min_freq = 2)

print(f"len SRC =  {len(SRC.vocab)}")
print(f"len TRG =  {len(TRG.vocab)}")

In [None]:
print(TRG.vocab.stoi["abcabc"]) # 없는 단어: 0
print(TRG.vocab.stoi[TRG.pad_token]) # 패딩(padding): 1
print(TRG.vocab.stoi["<sos>"]) # <sos>: 2
print(TRG.vocab.stoi["<eos>"]) # <eos>: 3
print(TRG.vocab.stoi["hello"])
print(TRG.vocab.stoi["world"])
#각단어가 사전의 어느 인덱스에 위치하는지 표시해줌

한 문장안에 단어가 순서대로 나열된 상태로 네트워크가 입력되어야 한다. 

따라서 하나의 배치에 포함된 문장들이 가지는 단어의 개수가 유사하도록 만들면 좋다 

이를 위해 Bucketiterator를 사용한다


In [None]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 128

# 일반적인 데이터 로더(data loader)의 iterator와 유사하게 사용 가능
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset),
    batch_size=BATCH_SIZE,
    device=device)
#매 반복마다 배치사이즈의 데이터를 넘겨줌

In [None]:
for i , batch in enumerate(train_iterator):
  src = batch.src
  trg = batch.trg
  print(f"첫 번째 배치 크기: {src.shape}")

    # 현재 배치에 있는 하나의 문장에 포함된 정보 출력
  for i in range(src.shape[1]):
      print(f"인덱스 {i}: {src[0][i].item()}") # 여기에서는 [Seq_num, Seq_len]

    # 첫 번째 배치만 확인
  break

  # sos인 2번 인덱스와 eos인 3번인덱스 사이에 있는 단어만이 실제 의미를 가지는단어이고
  #eos뒤에 1로 패딩되있음을 알 수 있다.(의미없음) 33 index까지 채워넣어있음을 볼 수 있다.

multi-head-attention 아키텍처 

어텐션은 세가지 요소를 입력으로 받는다.

-쿼리(queries)
-키(key)
-값(value)

하이퍼 파라미터

hidden_dim = 임베딩벡터

n_heads = 헤드(head)의 개수 = scaled dot_prodct attention의 개수

dropout_ratio = 드롭아웃의 비율


In [None]:
import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, dropout_ratio, device):
        super().__init__()

        assert hidden_dim % n_heads == 0 # q,k,v의 차원은 hidden_dim / n_head

        self.hidden_dim = hidden_dim # 임베딩 차원
        self.n_heads = n_heads # 헤드(head)의 개수: 서로 다른 어텐션(attention) 컨셉의 수
        self.head_dim = hidden_dim // n_heads # 각 헤드(head)에서의 임베딩 차원

        self.fc_q = nn.Linear(hidden_dim, hidden_dim) # Query 값에 적용될 FC 레이어
        self.fc_k = nn.Linear(hidden_dim, hidden_dim) # Key 값에 적용될 FC 레이어
        self.fc_v = nn.Linear(hidden_dim, hidden_dim) # Value 값에 적용될 FC 레이어

        self.fc_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
        #모델에서 사용하는 input Tensor들은 input = input.to(device)를 호출해야한다.

    def forward(self, query, key, value, mask = None):

        batch_size = query.shape[0]

        # query: [batch_size, query_len, hidden_dim]
        # key: [batch_size, key_len, hidden_dim]
        # value: [batch_size, value_len, hidden_dim]
 
        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)  

        # Q: [batch_size, query_len, hidden_dim]
        # K: [batch_size, key_len, hidden_dim]
        # V: [batch_size, value_len, hidden_dim]

        # hidden_dim → n_heads X head_dim 형태로 변형
        # n_heads(h)개의 서로 다른 어텐션(attention) 컨셉을 학습하도록 유도
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        #permute함수는 차원의 순서를 바꿔 주는 역할 
        # 여기서는 hidden_dim 값을 head_dim * head로 바꿔줬는데 이는 아까 각각의 linear
        #를 여러번 거치고 들어가는 것보다 효율적이다

        # Q: [batch_size, n_heads, query_len, head_dim]
        # K: [batch_size, n_heads, key_len, head_dim]
        # V: [batch_size, n_heads, value_len, head_dim]

        # Attention Energy 계산
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        # energy: [batch_size, n_heads, query_len, key_len]

        # 마스크(mask)를 사용하는 경우
        if mask is not None:
            # 마스크(mask) 값이 0인 부분을 -1e10으로 채우기
            #시퀀스를 읽는데 필요없는 부분은 소프트맥스하기전에 0에 가까운 값으로 만들어줌
            energy = energy.masked_fill(mask==0, -1e10)

        # 어텐션(attention) 스코어 계산: 각 단어에 대한 확률 값
        attention = torch.softmax(energy, dim=-1)

        # attention: [batch_size, n_heads, query_len, key_len]

        # 여기에서 Scaled Dot-Product Attention을 계산
        x = torch.matmul(self.dropout(attention), V)

        # x: [batch_size, n_heads, query_len, head_dim]

        x = x.permute(0, 2, 1, 3).contiguous()

        # x: [batch_size, query_len, n_heads, head_dim]

        x = x.view(batch_size, -1, self.hidden_dim) #다시 원래의 차원인 hidden_dim으로 바꿔서 
        #일자로 늘린다.

        # x: [batch_size, query_len, hidden_dim]

        x = self.fc_o(x)

        # x: [batch_size, query_len, hidden_dim]

        return x, attention

In [None]:
class PositionwiseFeedforwardLayer(nn.Module): #하나의 레이어를 구성하는데 있어 필요한것 
    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()

        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim) #아까와 같이 입력과 출력의 값이 같게 해준다

        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x):

        # x: [batch_size, seq_len, hidden_dim]

        x = self.dropout(torch.relu(self.fc_1(x)))

        # x: [batch_size, seq_len, pf_dim]

        x = self.fc_2(x)

        # x: [batch_size, seq_len, hidden_dim]

        return x

하나의 인코더 레이어에 대해 정의한다. 입력과 출력의 차원이 같으므로 이러한 특징을 이용해 트랜스포머의 인코더는 인코더 레이어를 여러번 중첩해 사용할 수 있다.

하이퍼 파라미터(hyperparameter)
hidden_dim: 하나의 단어에 대한 임베딩 차원   
n_heads: 헤드(head)의 개수 = scaled dot-product    attention의 개수   
pf_dim: Feedforward 레이어에서의 내부 임베딩 차원   
dropout_ratio: 드롭아웃(dropout) 비율   
<pad> 토큰에 대하여 마스크(mask) 값을 0으로 설정합니다.   



In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        #이러한 레이어들이 Residual Connect되어 있는 구조이다 
        self.dropout = nn.Dropout(dropout_ratio)

    # 하나의 임베딩이 복제되어 Query, Key, Value로 입력되는 방식
    def forward(self, src, src_mask):

        # src: [batch_size, src_len, hidden_dim]
        # src_mask: [batch_size, src_len]

        # self attention
        # 필요한 경우 마스크(mask) 행렬을 이용하여 어텐션(attention)할 단어를 조절 가능
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src)) #더해줌으로써 residual connection을 수행한다

        # src: [batch_size, src_len, hidden_dim]

        # position-wise feedforward
        _src = self.positionwise_feedforward(src)

        # dropout, residual and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))#더해줌으로써 residual connection을 수행한다

        # src: [batch_size, src_len, hidden_dim]

        return src

인코더(Encoder) 아키텍쳐

원본 논문과는 다르게 위치 임베딩을 학습하는 형태로 구현된다. BERT와 같은 모던 트랜스포머 아키텍쳐에서 사용되는 방식이다.

"pad" 토큰에 대해서 마스크 값을 0으로 설정한다.

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(input_dim, hidden_dim) #위에서 해준거는 토크나이저를 통해
        #단어들을 인덱스화 시키고 이렇게 인덱스화를 해줘야지만(전처리) nn.Embedding을 사용할 수 있음
        self.pos_embedding = nn.Embedding(max_length, hidden_dim) 
        #위 코드가 논문과 다른 점인데 원래 주기함수를 사인이나 코사인 함수처럼 정해진 주기 함수를 사용하지만
        #최근의 이용방법은 이 위치 임베딩을 학습레이어를 통해 학스시킨다.


        self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, src, src_mask):

        # src: [batch_size, src_len]
        # src_mask: [batch_size, src_len]

        batch_size = src.shape[0] #문장의 갯수
        src_len = src.shape[1]#단어의 수가 가장 많은 문장의 단어수

        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        #repeat은 특정텐서의 size차원의 데이터를 반복한다.
        #즉 src_len만큼의 텐서를 생성하고 이를 문장수만큼 다 들어갈 수 있게 만들어준다
        #이렇게 만들어준 텐서에 pos 임베딩값을 넣어준다

        # pos: [batch_size, src_len]

        # 소스 문장의 임베딩과 위치 임베딩을 더한 것을 사용
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))
        #따라서 위치정보와 단어의 의미가 섞어진 즉, 문맥의 의미를 보인다.

        # src: [batch_size, src_len, hidden_dim]

        # 모든 인코더 레이어를 차례대로 거치면서 순전파(forward) 수행
        for layer in self.layers:
            src = layer(src, src_mask)

        # src: [batch_size, src_len, hidden_dim]

        return src # 마지막 레이어의 출력을 반환 
        #문맥의 정보가 src에 return 된다.

디코더 레이어  아키텍쳐   
* 하나의 디코더 레이어에 대해 이렇게 정의된다 
  - 입력과 출력의 차원이 같다.
  - 이러한 특징을 이용해 트랜스포머의 디코더는 디코더 레이어를 여러번 중첩해 사용한다.
* 소스문장의 \<pad> 토큰에 대하여 마스크 값을 0으로 설정한다
* 타겟문장에서 각단어는 다음단어가 무엇인지 알 수 없도록 만들기 위해 마스크를 사용한다.
* 디코더는 한 블록안에 어텐션이 두번들어가는데 하나는 self-attention이고 또 하나는 encoder-decoder attention이다.


In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.encoder_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    # 인코더의 출력 값(enc_src)을 어텐션(attention)하는 구조
    def forward(self, trg, enc_src, trg_mask, src_mask):

        # trg: [batch_size, trg_len, hidden_dim]
        # enc_src: [batch_size, src_len, hidden_dim]
        # trg_mask: [batch_size, trg_len]
        # src_mask: [batch_size, src_len]

        # self attention
        # 자기 자신에 대하여 어텐션(attention)
        _trg, _ = self.self_attention(trg, trg, trg, trg_mask)

        # dropout, residual connection and layer norm
        trg = self.self_attn_layer_norm(trg + self.dropout(_trg))

        # trg: [batch_size, trg_len, hidden_dim]

        # encoder attention
        # 디코더의 쿼리(Query)를 이용해 인코더를 어텐션(attention)
        _trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)

        # dropout, residual connection and layer norm
        trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))

        # trg: [batch_size, trg_len, hidden_dim]

        # positionwise feedforward
        _trg = self.positionwise_feedforward(trg)

        # dropout, residual and layer norm
        trg = self.ff_layer_norm(trg + self.dropout(_trg))

        # trg: [batch_size, trg_len, hidden_dim]
        # attention: [batch_size, n_heads, trg_len, src_len]

        return trg, attention

디코더 아키텍쳐
* 전체 디코더 아키텍처를 정의한다.
* 원본 논문과 다르게 위치 임베딩을 학습하는 형태로 구현된다.
* Seq2seq와 마찬가지로 실제로 추론 시기에는 디코더를 반복적으로 넣을 필요가 있다.
  - 학습 시기에는 한번에 출력 문장을 구해 학습할 수 있다.
* 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록 만들기 위해 마스크를 사용한다.


In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.fc_out = nn.Linear(hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, trg, enc_src, trg_mask, src_mask):

        # trg: [batch_size, trg_len]
        # enc_src: [batch_size, src_len, hidden_dim]
        # trg_mask: [batch_size, trg_len]
        # src_mask: [batch_size, src_len]

        batch_size = trg.shape[0]
        trg_len = trg.shape[1]

        pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)

        # pos: [batch_size, trg_len]

        trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))

        # trg: [batch_size, trg_len, hidden_dim]

        for layer in self.layers:
            # 소스 마스크와 타겟 마스크 모두 사용
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        # trg: [batch_size, trg_len, hidden_dim]
        # attention: [batch_size, n_heads, trg_len, src_len]

        output = self.fc_out(trg)

        # output: [batch_size, trg_len, output_dim]

        return output, attention

트랜스포머 아키텍처
* 최종적인 전체 트랜스포머모델을 정의한다.
* 입력이 들어왔을때 앞서 정의한 인코더와 디코더를 거쳐 추력 문장을 생성한다.

In [None]:
class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    # 소스 문장의 <pad> 토큰에 대하여 마스크(mask) 값을 0으로 설정
    def make_src_mask(self, src):

        # src: [batch_size, src_len]

        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

        # src_mask: [batch_size, 1, 1, src_len]

        return src_mask

    # 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록(이전 단어만 보도록) 만들기 위해 마스크를 사용
    def make_trg_mask(self, trg):

        # trg: [batch_size, trg_len]

        """ (마스크 예시)
        1 0 0 0 0
        1 1 0 0 0
        1 1 1 0 0
        1 1 1 0 0
        1 1 1 0 0
        """
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)

        # trg_pad_mask: [batch_size, 1, 1, trg_len]

        trg_len = trg.shape[1]

        """ (마스크 예시)
        1 0 0 0 0
        1 1 0 0 0
        1 1 1 0 0
        1 1 1 1 0
        1 1 1 1 1
        """
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool()

        # trg_sub_mask: [trg_len, trg_len]

        trg_mask = trg_pad_mask & trg_sub_mask

        # trg_mask: [batch_size, 1, trg_len, trg_len]

        return trg_mask

    def forward(self, src, trg):

        # src: [batch_size, src_len]
        # trg: [batch_size, trg_len]

        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        # src_mask: [batch_size, 1, 1, src_len]
        # trg_mask: [batch_size, 1, trg_len, trg_len]

        enc_src = self.encoder(src, src_mask)

        # enc_src: [batch_size, src_len, hidden_dim]

        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)

        # output: [batch_size, trg_len, output_dim]
        # attention: [batch_size, n_heads, trg_len, src_len]

        return output, attention

In [None]:
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
HIDDEN_DIM = 256
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1

In [None]:
SRC_PAD_IDX = SRC.vocab.stoi[SRC.pad_token]
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]

# 인코더(encoder)와 디코더(decoder) 객체 선언
enc = Encoder(INPUT_DIM, HIDDEN_DIM, ENC_LAYERS, ENC_HEADS, ENC_PF_DIM, ENC_DROPOUT, device)
dec = Decoder(OUTPUT_DIM, HIDDEN_DIM, DEC_LAYERS, DEC_HEADS, DEC_PF_DIM, DEC_DROPOUT, device)

# Transformer 객체 선언
model = Transformer(enc, dec, SRC_PAD_IDX, TRG_PAD_IDX, device).to(device)

모델의 가중치 파라미터 초기화

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

In [None]:
def initialize_weights(m):
    if hasattr(m, 'weight') and m.weight.dim() > 1:
      nn.init.xavier_uniform_(m.weight.data)
model.apply(initialize_weights)

In [None]:
import torch.optim as optim

LEARNING_RATE = 0.0005
optimizer = torch.optim.Adam(model.parameters() , lr = LEARNING_RATE)

criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

In [None]:
def train(model , iterator , optimzier , criterion , clip):
  model.train() #학습모드
  epoch_loss = 0

  for i , batch in enumerate(iterator):
    src = batch.src
    trg = batch.trg

    optimzier.zero_grad()
    # 출력 단어의 마지막 인덱스(<eos>)는 제외
    # 입력을 할 때는 <sos>부터 시작하도록 처리
    output , _ = model(src , trg[: , :-1] # 뒤집어준다)
    #output : [배치크기 , trg_len-1 , output_dim]
    # trg [배치크기 , trg_len]
    output_dim = output.shape[-1]

    output = output.contiguous().view(-1 , output_dim) 
    # 출력 단어의 인덱스 0(<sos>)은 제외
    trg = trg[:,1:].contiguous().view(-1)

    # 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
    loss = criterion(output, trg)
    loss.backward() # 기울기(gradient) 계산

    # 기울기(gradient) clipping 진행
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

    # 파라미터 업데이트
    optimizer.step()

    # 전체 손실 값 계산
    epoch_loss += loss.item()

return epoch_loss / len(iterator)

In [None]:
# 모델 평가(evaluate) 함수
def evaluate(model, iterator, criterion):
    model.eval() # 평가 모드
    epoch_loss = 0

    with torch.no_grad():
        # 전체 평가 데이터를 확인하며 학습을 멈춰둠
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg

            # 출력 단어의 마지막 인덱스(<eos>)는 제외
            # 입력을 할 때는 <sos>부터 시작하도록 처리
            output, _ = model(src, trg[:,:-1])

            # output: [배치 크기, trg_len - 1, output_dim]
            # trg: [배치 크기, trg_len]

            output_dim = output.shape[-1]

            output = output.contiguous().view(-1, output_dim)
            # 출력 단어의 인덱스 0(<sos>)은 제외
            trg = trg[:,1:].contiguous().view(-1)

            # output: [배치 크기 * trg_len - 1, output_dim]
            # trg: [배치 크기 * trg len - 1]

            # 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
            loss = criterion(output, trg)

            # 전체 손실 값 계산
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

- 학습및 검증 진행
-학습 횟수(epoch) 10번

In [None]:

import math
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
import time
import math
import random

N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() # 시작 시간 기록

    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() # 종료 시간 기록
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer_german_to_english.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')

In [None]:
# 학습된 모델 저장
from google.colab import files

files.download('transformer_german_to_english.pt')

In [None]:
!wget https://postechackr-my.sharepoint.com/:u:/g/personal/dongbinna_postech_ac_kr/EbWFiKBmscFBrbzCQxRyqwsBwcXgdKdimkdsBl2dE9VYaQ?download=1 -O transformer_german_to_english.pt

In [None]:
model.load_state_dict(torch.load('transformer_german_to_english.pt'))

test_loss = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):.3f}')

## Positional Encoding

In [None]:
import math
import matplotlib.pyplot as plt

n = 4 #단어(word) 한 시퀀스에 단어 4개 
dim = 8 # 단어를 8차원 벡터로 임베팅 시킴 

def get_angles(pos , i , dim):
  angles = 1 / math.pow(10000 , (2 *(i//2)) / dim)
  return pos * angles

def get_positional_encoding(pos , i, dim):
  if i%2 ==0:
    return math.sin(get_angles(pos ,i,dim))
  else:
    return math.cos(get_angles(pos,i,dim))

In [None]:
result = [[0] * dim for i in range(4)]

for i in range(n):
  for j in range(dim):
    result[i][j]= get_positional_encoding(i,j,dim)

아래와 같이 주기함수를 통해 위치인코딩을 잘하였다

In [None]:
plt.xlabel("Depth")
plt.ylabel("Position")
plt.pcolormesh(result , cmap = 'Blues')