<a href="https://colab.research.google.com/github/KeyboarderSon/DeepLearning_Pytorch/blob/main/RNN/RNN_seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 나동빈 seq2seq

#### 독일어를 영어로

spaCy 라이브러리 : 문장의 토큰화, 태깅 등의 전처리 기능을 위한 라이브러리

In [1]:
%%capture

!python -m spacy download en
!python -m spacy download de

In [2]:
import  spacy
#영어 및 독일어 토큰화
spacy_en= spacy.load('en')
spacy_de= spacy.load('de')

In [3]:
#간단하게 토큰화 기능 사용해보기
tokenized = spacy_en.tokenizer("I am a graduate student.")

for i, token in enumerate(tokenized):
  print(f"인덱스 {i}: {token.text}")

인덱스 0: I
인덱스 1: am
인덱스 2: a
인덱스 3: graduate
인덱스 4: student
인덱스 5: .


영어 및 독일어 토큰화 함수 정의

In [4]:
def tokenize_de(text):
  #토큰화한 뒤 각각 토큰을 리스트에 담은 뒤 reverse it - 입력 sequence의 reverse의 경우가 학습 성능 좋다고 함
  return [token.text for token in spacy_de.tokenizer(text)][::-1]


def tokenize_en(text):
  return [token.text for token in spacy_en.tokenizer(text)]

print(tokenize_en("I love you so much"))

['I', 'love', 'you', 'so', 'much']


field 라이브러리를 이용해 각 문장에 대한 구체적인 전처리 내용을 명시<br>
SRC 독일어 -> (TRG) 영어

In [5]:
!pip install -U torchtext==0.8.0

Collecting torchtext==0.8.0
  Downloading torchtext-0.8.0-cp37-cp37m-manylinux1_x86_64.whl (6.9 MB)
[K     |████████████████████████████████| 6.9 MB 7.5 MB/s 
Installing collected packages: torchtext
  Attempting uninstall: torchtext
    Found existing installation: torchtext 0.10.0
    Uninstalling torchtext-0.10.0:
      Successfully uninstalled torchtext-0.10.0
Successfully installed torchtext-0.8.0


In [7]:
from torchtext.data import Field, BucketIterator
SRC=Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True)
TRG=Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True)



실제 번역 데이터셋 불러오기

In [8]:
from torchtext.datasets import Multi30k

#각 문장을 Field에 정의된 내용을 기반으로 토큰화
train_dataset, valid_dataset, test_dataset = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG))

downloading training.tar.gz


training.tar.gz: 100%|██████████| 1.21M/1.21M [00:01<00:00, 957kB/s]


downloading validation.tar.gz


validation.tar.gz: 100%|██████████| 46.3k/46.3k [00:00<00:00, 166kB/s]


downloading mmt_task1_test2016.tar.gz


mmt_task1_test2016.tar.gz: 100%|██████████| 66.2k/66.2k [00:00<00:00, 157kB/s]


In [9]:
# example
print(f"training dataset 크기 : {len(train_dataset.examples)}")
print(f"validation dataset 크기 : {len(valid_dataset.examples)}")
print(f"testing dataset 크기 : {len(test_dataset.examples)}")

training dataset 크기 : 29000
validation dataset 크기 : 1014
testing dataset 크기 : 1000


In [10]:
# example
# index 30에 해당하는 하나의 문장을 출력해보자
# 번역하려는 독일어 문장-거꾸로 되어있음
print(vars(train_dataset.examples[30])['src'])
# 번역 결과로서 얻고자 하는 영어 문장
print(vars(train_dataset.examples[30])['trg'])

['.', 'steht', 'urinal', 'einem', 'an', 'kaffee', 'tasse', 'einer', 'mit', 'der', ',', 'mann', 'ein']
['a', 'man', 'standing', 'at', 'a', 'urinal', 'with', 'a', 'coffee', 'cup', '.']


```Field``` 객체의 ```build_vocab``` 메서드를 이용해 **영독 단어 사전**을 생성<br>
최소 2번 이상 등장한 단어만을 선택

In [12]:
#이를 입력차원으로 사용한다. 차원 수를 줄이기 위해 2번 이상 등장한 단어로 제한함
SRC.build_vocab(train_dataset, min_freq=2)
TRG.build_vocab(train_dataset, min_freq=2)

print(f"len(SRC): {len(SRC.vocab)}")
print(f"len(TRG): {len(TRG.vocab)}")

len(SRC): 7855
len(TRG): 5893


```SRC.vocab.stoi``` 
<br>토큰[어휘] : index

In [13]:
print(TRG.vocab.stoi["abcabc"])# 없는 단어의 경우 idx = 0
print(TRG.vocab.stoi[TRG.pad_token])
print(TRG.vocab.stoi["<sos>"])
print(TRG.vocab.stoi["<eos>"])
print(TRG.vocab.stoi["hello"])# 실제 사전에 존재하는 단어
print(TRG.vocab.stoi["world"])

0
1
2
3
4112
1752


한 문장에 포함된 단어가 연속적으로 LSTM에 입력되어야 한다. 따라서 하나의 batch에 포함된 문장들의 단어 개수가 유사하게 만들면 좋다. 이를 위해 bucketIterator을 사용한다.

In [None]:
import torch

device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE=128

train_iterator, valid_iterator, test_iterator=BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset),
    batch_size=BATCH_SIZE,
    device=device
)

In [None]:
# 첫번째 batch에 대해서만 출력해보자

for i, batch in enumerate(train_iterator):
  # batch는 src와 trg로 이루어져 있음
  src = batch.src
  trg = batch.trg

  print(f"첫 번째 배치 크기 : {src.shape}") #{num of token, batch size} 한 문장당 35개 단어로 된 128개 문장

  # <sos> 2, <eos> 3, 뒷부분 나머지 padding 1
  for i in range(src.shape[0]): # 한 batch에 있는 128개 문장 중 하나
    print(f"idx {i}: {src[i][0].item()}")  
  break


첫 번째 배치 크기 : torch.Size([35, 128])
idx 0: 2
idx 1: 4
idx 2: 420
idx 3: 1583
idx 4: 44
idx 5: 35
idx 6: 92
idx 7: 33
idx 8: 37
idx 9: 550
idx 10: 5
idx 11: 3
idx 12: 1
idx 13: 1
idx 14: 1
idx 15: 1
idx 16: 1
idx 17: 1
idx 18: 1
idx 19: 1
idx 20: 1
idx 21: 1
idx 22: 1
idx 23: 1
idx 24: 1
idx 25: 1
idx 26: 1
idx 27: 1
idx 28: 1
idx 29: 1
idx 30: 1
idx 31: 1
idx 32: 1
idx 33: 1
idx 34: 1


##Encoder 구조
SRC 문장 -> context vector<br>


LSTM의 입출력에 hidden state, cell state 사용<br>

**Hyperparameter**<br>
* input_dim : 하나의 단어에 대한 one hot encoding 차원(vocab 크기)<br>
* embed_dim :위 input_dim에서 압축된 차원<br>
* hidden_state<br>
* n_layers : 현재는 lstm 2번 중첩하여 사용<br>
* dropout_ratio : 정규화 효과를 위해 사용. 보통 0.5

In [None]:
import torch.nn as nn

class Encoder(nn.Module):
  def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
    super().__init__()

    # 1) embedding layer : one-hot encoding을 특정 차원의 임베딩으로 매핑하는 레이어
    self.embedding = nn.Embedding(input_dim, embed_dim)

    # 2) LSTM layer
    self.hidden_dim=hidden_dim
    self.n_layers=n_layers
    self.rnn=nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
    #input이 embeding을 거쳐 들어가기에 embed_dim만 존재,
    #return은 cell state, hidden state

    # 3) dropout
    self.dropout=nn.Dropout(dropout_ratio)

  #인코더는 소스 문장을 입력으로 받아 context vector을 반환
  def forward(self, src):# 1 batch 즉 128개의 문장이 한번에 처리되는 듯
    # src : [num_token, batch size] 각 단어의 index 정보
    # embedded : [num_token, batch size, embedding dim]
    embedded = self.dropout(self.embedding(src))

    # encoder에서는 output 사용되지 않음
    # hidden state가 context vector로 사용됨
    output, (hidden, cell) = self.rnn(embedded)
    # output :[단어개수, batch size, hidden dim] : 현재 단어의 출력 정보
    # hidden : [n_layer, batch size, hidden dim] : 현재까지의 모든 단어의 정보
    # cell : [n_layer, batch size, hidden dim] : 현재 까지의 모든 단어의 정보


    # context vector 반환
    return hidden, cell


## Decoder

* 주어진 context vector로 target 문장을 decoding 한다
* lstm은 hidden state와 cell state을 반환한다

In [None]:
class Decoder(nn.Module):
  def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
    super().__init__()

    # 1) embedding layer
    self.embedding=nn.embedding(output_dim, embed_dim)
    
    # 2) lstm layer
    self.hidden_dim=hidden_dim
    self.n_layers=n_layers
    self.rnn=nn.LSTM(embed_dim, hidden_dim, n_layes, dropout=dropout_ratio)

    # 3) fully connected layer
    self.output_dim = output_dim
    self.fc_out = nn.Linear(hidden_dim, output_dim)

    # 4) dropout
    self.dropout = nn.Dropout(dropout_ratio)

  # Decoder는 "현재까지 출력된 문장에 대한 정보"를 input으로 받아 target 문장 반환
  def forward(self, input, hidden, cell):
    # input 단어의 개수는 항상 1개 input
    # input : [단어 개수 = 1, batch size]
    input = input.unsqueeze(0)

    # embedded : [단어 개수, batch size, embedding dim]
    embedded = self.dropout(self.embedding(input))

    output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
    
    # output :[단어개수, batch size, hidden dim] : 현재 단어의 출력 정보
    # hidden : [n_layer, batch size, hidden dim] : 현재까지의 모든 단어의 정보
    # cell : [n_layer, batch size, hidden dim] : 현재 까지의 모든 단어의 정보

    # 단어 개수 1개이니 불필요한 차원 제거
    # prediction : [batch size, output dim]
    prediction = self.fc_out(output.squeeze(0))
    
    # [현재 출력 단어, 현재 까지의 모든 단어의 정보, 현재까지의 모든 단어의 정보]
    return prediction, hidden, cell

# Sequence to Sequence
* Encoder : 주어진 source sequence를 context vector로. 한 번에 값을 구함
* Decoder : 주어진 context vector을 target sequence로. 한 단어씩 넣어서 한 번씩 결과를 낸다.
* teacher forcing : Decoder의 prediction을 한번 틀리면 error propagation이 발생하니 prediction 값이 아닌 실제 ground truth를 다음 입력으로 사용하는 기법

In [None]:
class Seq2Seq(nn.Module):
  def __init__(self, encoder, decoder, device):
    super().__init__()

    self.encoder = encoder
    self.decoder = decoder
    self.device = device

  def forward(self, src, trg, teacher_forcing_ratio = 0.5):
    hidden, cell = self.encoder(src)

    # Decoder의 최종 결과를 담을 tensor 객체 만들기
    trg_len = trg.shape[0] # 단어 개수
    batch_size = trg.shape[1] # batch size
    trg_vocab_size = self.decoder.output_dim # output dimension
    outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

    input = trg[0, :]# 첫 번째 입력은 항상 <sos> 토큰

    # target 단어 개수 만큼 반복하여 Decoder에 forwarding
    for t in range(1, trg_len):
      output, hidden, cell = self.decoder(input, hidden, cell)
      outputs[t] = output
      top1 = output.argmax(1)

      teacher_force = random.random() < teacher_forcing_ratio
      input = trg[t] if teacher_force else top1

    return outputs