<a href="https://colab.research.google.com/github/ParkEunHyeok/AI_Study/blob/main/Pytorch/Pytorch_Seq2Seq_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.legacy.datasets import Multi30k
from torchtext.legacy.data import Field, BucketIterator
import spacy
import spacy as np

import random
import math
import time

In [2]:
# 구글 드라이브 연결
import os
from google.colab import drive
drive.mount('/content/gdrive/')
path = "gdrive/My Drive/Colab Notebooks/seq2seq/pytorch_seq2seq_translation/"

Mounted at /content/gdrive/


In [3]:
# 문장을 토큰화 하는 모듈 설치
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm

Collecting en_core_web_sm==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.5/en_core_web_sm-2.2.5.tar.gz (12.0 MB)
[K     |████████████████████████████████| 12.0 MB 4.3 MB/s 
[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')
Collecting de_core_news_sm==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-2.2.5/de_core_news_sm-2.2.5.tar.gz (14.9 MB)
[K     |████████████████████████████████| 14.9 MB 4.3 MB/s 
Building wheels for collected packages: de-core-news-sm
  Building wheel for de-core-news-sm (setup.py) ... [?25l[?25hdone
  Created wheel for de-core-news-sm: filename=de_core_news_sm-2.2.5-py3-none-any.whl size=14907055 sha256=2ec15b48580490729cf6e51e76b10903941d39e39d4c4d3fca8eef453b824340
  Stored in directory: /tmp/pip-ephem-wheel-cache-uq_8wjm1/wheels/00/66/69/cb6c921610087d2cab339062345098e30a5ceb665360e7b32a
Successfu

In [4]:
import de_core_news_sm
import en_core_web_sm

# 문장 토큰화 모델 load
spacy_en = en_core_web_sm.load()
spacy_de = de_core_news_sm.load()

# 토큰화 함수,
# 입력문장의 단어를 뒤집어 성능을 향상시킴
def tokenize_de(text):
  return [tok.text for tok in spacy_de.tokenizer(text)][::-1]

def tokenize_en(text):
  return [tok.text for tok in spacy_de.tokenizer(text)]

# Field를 사용하여 구체적인 전처리 내용 명시
# source : 독일어
SRC = Field(tokenize=tokenize_de,
            init_token="<SOS>",
            eos_token="<EOS>",
            lower=True)

# target : 영어
TRG = Field(tokenize=tokenize_en,
            init_token="<SOS>",
            eos_token="<EOS>",
            lower=True)

In [5]:
# Multi30k 데이터셋 불러오기
train_data, valid_data, test_data = Multi30k.splits(exts = ('.de', '.en'),
                                                    fields = (SRC, TRG))

downloading training.tar.gz


training.tar.gz: 100%|██████████| 1.21M/1.21M [00:02<00:00, 537kB/s]


downloading validation.tar.gz


validation.tar.gz: 100%|██████████| 46.3k/46.3k [00:00<00:00, 92.3kB/s]


downloading mmt_task1_test2016.tar.gz


mmt_task1_test2016.tar.gz: 100%|██████████| 66.2k/66.2k [00:00<00:00, 88.2kB/s]


In [6]:
# 데이터의 크기 명시
print(f'Number of training examples: {len(train_data.examples)}')
print(f'Number of validation examples: {len(valid_data.examples)}')
print(f'Number of testing examples: {len(test_data.examples)}')

Number of training examples: 29000
Number of validation examples: 1014
Number of testing examples: 1000


In [7]:
# min_freq = 2는 두 번 이상 등장한 토큰을 출력
# token이 한 번만 등장했다면, <unk>로 대체
SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)

In [8]:
# iterator 생성
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 128

# 하나의 배치에 포함되는 단어의 개수가 유사하도록 만듦
# batch size = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data), batch_size=batch_size, device=device
)

In [9]:
# 주어진 Sequence를 입력받아 context vector로 인코딩
# LSTM모델은 hidden state와 cell state를 반환

class Encoder(nn.Module):
  def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
    super().__init__()

    self.hid_dim = hid_dim
    self.n_layers = n_layers

    # 입력값을 emd_dim 벡터로 인코딩
    self.embedding = nn.Embedding(input_dim, emb_dim)

    # 임베딩을 입력받아 hid_dim 크기의 hidden state, cell 출력
    self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)

    self.dropout = nn.Dropout(dropout)

  def forward(self, src):
    embedded = self.dropout(self.embedding(src))

    outputs, (hidden, cell) = self.rnn(embedded)

    return hidden, cell

In [10]:
# encode된 context vector를 입력받아 decode해 단어를 예측

class Decoder(nn.Module):
  def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
    super().__init__()

    self.output_dim = output_dim
    self.hid_dim = hid_dim
    self.n_layers = n_layers

    # context vector를 입력받아 emb_dim 출력
    self.embedding = nn.Embedding(output_dim, emb_dim)

    # embedding을 입력받아 hid_dim 크기의 hidden state, cell 출력
    self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout=dropout)

    self.fc_out = nn.Linear(hid_dim, output_dim)

    self.dropout = nn.Dropout(dropout)

  def forward(self, input, hidden, cell):
    input = input.unsqueeze(0)

    embedded = self.dropout(self.embedding(input))

    output, (hidden, cell) = self.rnn(embedded, (hidden, cell))

    # 예측값의 fc score
    prediction = self.fc_out(output.squeeze(0))

    return prediction, hidden, cell

In [11]:
# 앞서 정의한 encoder, decoder를 가지고 있는 하나의 아키텍쳐

class Seq2Seq(nn.Module):
  def __init__(self, encoder, decoder, devide):
    super().__init__()

    self.encoder = encoder
    self.decoder = decoder
    self.device = device

    # encoder와 decoder의 hid_dim이 일치하지 않는 경우 에러메세지
    assert encoder.hid_dim == decoder.hid_dim, \
        'Hidden dimensions of encoder decoder must be equal'
    # encoder와 decoder의 n_layers가 일치하지 않는 경우 에러메세지
    assert encoder.n_layers == decoder.n_layers, \
        'Encoder and decoder must have equal number of layers'

  def forward(self, src, trg, teacher_forcing_ratio=0.5):
    batch_size = trg.shape[1]
    trg_len = trg.shape[0]  # 타겟 토큰 길이 얻기
    trg_vocab_size = self.decoder.output_dim

    # decoder의 output을 저장하기 위한 텐서
    outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

    # initial hidden state
    hidden, cell = self.encoder(src)

    # 첫 번째 입력값 <SOS> 토큰
    input = trg[0, :]

    for t in range(1, trg_len): # <EOS> 제외하고 타겟 길이-1 만큼 반복
      output, hidden, cell = self.decoder(input, hidden, cell)

      # prediction 저장
      outputs[t] = output

      # 교사 강요를 사용할지 말지 결정
      teacher_force = random.random() < teacher_forcing_ratio

      # 가장 높은 확률을 갖은 갚
      top1 = output.argmax(1)

      # 교사 강요를 할 경우 다음 LSTM에 타겟 토큰 입력
      input = trg[t] if teacher_force else top1

    return outputs

In [12]:
# 하이퍼 파라미터 지정
input_dim = len(SRC.vocab)
output_dim = len(TRG.vocab)
enc_emb_dim = 256 # 임베딩 차원
dec_emb_dim = 256
hid_dim = 512 # hidden state 차원
n_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5

In [13]:
# 모델 생성
enc = Encoder(input_dim, enc_emb_dim, hid_dim, n_layers, enc_dropout)
dec = Decoder(output_dim, dec_emb_dim, hid_dim, n_layers, dec_dropout)

model = Seq2Seq(enc, dec, device).to(device)

In [37]:
# 가중치 초기화
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)

model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(7855, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(5923, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=5923, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

In [38]:
# 모델의 학습가능한 파라미터 수 측정
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainableparameters')

The model has 13,922,083 trainableparameters


In [39]:
# optimizer
optimizer = optim.Adam(model.parameters())

# loss function
# pad에 해당하는 index는 무시
trg_pad_idx = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)

In [44]:
import os.path
if os.path.isfile(path+'tut1-model.pt'):
  print("loaded model.")
  model = torch.load(path+'tut1-model.pt')

loaded model.


In [45]:
# 학습을 위한 함수
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        optimizer.zero_grad()

        output = model(src,trg) # [trg len, batch size, output dim]
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim) # loss 계산을 위해 1d로 변경
        trg = trg[1:].view(-1) # loss 계산을 위해 1d로 변경

        loss = criterion(output, trg)
        loss.backward()

        # 기울기 clip
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [46]:
# evaluation function
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg

            # output: [trg len, batch size, output dim]
            output = model(src, trg, 0) # teacher forcing off
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim) # [(trg len -1) * batch size, output dim]
            trg = trg[1:].view(-1) # [(trg len -1) * batch size, output dim]

            loss = criterion(output, trg)

            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [47]:
# function to count training time
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [48]:
# 학습 시작
num_epochs = 10
clip = 1

best_valid_loss = float('inf')

for epoch in range(num_epochs):
   
    start_time = time.time()
    
    train_loss = train(model, train_iterator, optimizer, criterion, clip)
    valid_loss = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()
    
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        # torch.save(model.state_dict(), path+'tut1-model.pt')
        torch.save(model, path+'tut1-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

Epoch: 01 | Time: 1m 13s
	Train Loss: 3.153 | Train PPL:  23.415
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 02 | Time: 1m 13s
	Train Loss: 3.146 | Train PPL:  23.232
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 03 | Time: 1m 13s
	Train Loss: 3.162 | Train PPL:  23.608
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 04 | Time: 1m 13s
	Train Loss: 3.156 | Train PPL:  23.481
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 05 | Time: 1m 13s
	Train Loss: 3.140 | Train PPL:  23.112
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 06 | Time: 1m 12s
	Train Loss: 3.161 | Train PPL:  23.584
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 07 | Time: 1m 13s
	Train Loss: 3.137 | Train PPL:  23.039
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 08 | Time: 1m 12s
	Train Loss: 3.156 | Train PPL:  23.477
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 09 | Time: 1m 13s
	Train Loss: 3.156 | Train PPL:  23.483
	 Val. Loss: 3.927 |  Val. PPL:  50.754
Epoch: 10 | Time: 1m 13s
	Train Loss: 3.140 | Train PPL

In [50]:
# best val loss일 때의 가중치를 불러옵니다.
torch.load(path+'tut1-model.pt')

# test loss를 측정합니다.
test_loss = evaluate(model, test_iterator, criterion)

print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')

| Test Loss: 3.973 | Test PPL:  53.146 |


In [51]:
# 번역(translation) 함수
def translate_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    model.eval() # 평가 모드

    if isinstance(sentence, str):
        nlp = spacy.load('de')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    # 처음에 <sos> 토큰, 마지막에 <eos> 토큰 붙이기
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    print(f"전체 소스 토큰: {tokens}")

    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    print(f"소스 문장 인덱스: {src_indexes}")

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    # 인코더(endocer)에 소스 문장을 넣어 문맥 벡터(context vector) 계산
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

    # 처음에는 <sos> 토큰 하나만 가지고 있도록 하기
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]

    for i in range(max_len):
        # 이전에 출력한 단어가 현재 단어로 입력될 수 있도록
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)

        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token) # 출력 문장에 더하기

        # <eos>를 만나는 순간 끝
        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break

    # 각 출력 단어 인덱스를 실제 단어로 변환
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]

    # 첫 번째 <sos>는 제외하고 출력 문장 반환
    return trg_tokens[1:]

In [59]:
example_idx = 10

src = vars(test_data.examples[example_idx])['src']
trg = vars(test_data.examples[example_idx])['trg']

print(f'소스 문장: {src}')
print(f'타겟 문장: {trg}')
print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

소스 문장: ['.', 'freien', 'im', 'tag', 'schönen', 'einen', 'genießen', 'sohn', 'kleiner', 'ihr', 'und', 'mutter', 'eine']
타겟 문장: ['a', 'mother', 'and', 'her', 'young', 'song', 'enjoying', 'a', 'beautiful', 'day', 'outside', '.']
전체 소스 토큰: ['<SOS>', '.', 'freien', 'im', 'tag', 'schönen', 'einen', 'genießen', 'sohn', 'kleiner', 'ihr', 'und', 'mutter', 'eine', '<EOS>']
소스 문장 인덱스: [2, 4, 88, 20, 200, 780, 19, 565, 624, 70, 134, 10, 364, 8, 3]
모델 출력 결과: a young and her son are in the water in the background . <EOS>


In [53]:
src = tokenize_de("Guten Abend.")

print(f'소스 문장: {src}')
print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

소스 문장: ['.', 'Abend', 'Guten']
전체 소스 토큰: ['<SOS>', '.', 'abend', 'guten', '<EOS>']
소스 문장 인덱스: [2, 4, 1163, 3799, 3]
모델 출력 결과: a <unk> . . <EOS>
